diff options
author | Helmut Grohne <helmut@subdivi.de> | 2024-01-18 22:13:03 +0100 |
---|---|---|
committer | Helmut Grohne <helmut@subdivi.de> | 2024-01-18 22:13:03 +0100 |
commit | 034f732a1af4ce295d993e6951decc4898967dd3 (patch) | |
tree | 5a49cc7b5f5db586ae67f5276071ab525ef832f3 | |
download | python-linuxnamespaces-034f732a1af4ce295d993e6951decc4898967dd3.tar.gz |
initial checkin
-rw-r--r-- | .gitignore | 3 | ||||
-rw-r--r-- | README.md | 10 | ||||
-rw-r--r-- | conftest.py | 0 | ||||
-rw-r--r-- | linuxnamespaces/__init__.py | 333 | ||||
-rw-r--r-- | linuxnamespaces/atlocation.py | 362 | ||||
-rw-r--r-- | linuxnamespaces/syscalls.py | 504 | ||||
-rw-r--r-- | tests/test_simple.py | 164 |
7 files changed, 1376 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a2eeca3 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +__pycache__ +.mypy_cache +.pytest_cache diff --git a/README.md b/README.md new file mode 100644 index 0000000..b215cd6 --- /dev/null +++ b/README.md @@ -0,0 +1,10 @@ +linuxnamespaces +=============== + +This is a plumbing-level Python module for working with Linux namespaces via +ctypes. It leverages glibc wrappers to access the relevant system calls and +provides typed abstractions for them. + +License +------- +GPL-3 diff --git a/conftest.py b/conftest.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/conftest.py diff --git a/linuxnamespaces/__init__.py b/linuxnamespaces/__init__.py new file mode 100644 index 0000000..29d41f6 --- /dev/null +++ b/linuxnamespaces/__init__.py @@ -0,0 +1,333 @@ +# Copyright 2024 Helmut Grohne <helmut@subdivi.de> +# SPDX-License-Identifier: GPL-3 + +"""Provide plumbing-layer functionality for working with Linux namespaces in +Python. +""" + +import bisect +import dataclasses +import os +import pathlib +import stat +import subprocess +import sys +import typing + +from .atlocation import * +from .syscalls import * + + +def subidranges( + kind: typing.Literal["uid", "gid"], login: str | None = None +) -> typing.Iterator[tuple[int, int]]: + """Parse a `/etc/sub?id` file for ranges allocated to the given or current + user. Return all ranges as (start, count) pairs. + """ + if login is None: + login = os.getlogin() + with open(f"/etc/sub{kind}") as filelike: + for line in filelike: + parts = line.strip().split(":") + if parts[0] == login: + yield (int(parts[1]), int(parts[2])) + + +@dataclasses.dataclass(frozen=True) +class IDMapping: + """Represent one range in a user or goup id mapping.""" + + innerstart: int + outerstart: int + count: int + + def __post_init__(self) -> None: + if self.outerstart < 0: + raise ValueError("outerstart must not be negative") + if self.innerstart < 0: + raise ValueError("innerstart must not be negative") + if self.count <= 0: + raise ValueError("count must be positive") + if self.outerstart + self.count >= 1 << 64: + raise ValueError("outerstart + count exceed 64bits") + if self.innerstart + self.count >= 1 << 64: + raise ValueError("innerstart + count exceed 64bits") + + +class IDAllocation: + """This represents a subset of IDs (user or group). It can be used to + allocate a continguous range for use with a user namespace. + """ + + def __init__(self) -> None: + self.ranges: list[tuple[int, int]] = [] + + def add_range(self, start: int, count: int) -> None: + """Add count ids starting from start to this allocation.""" + if start < 0 or count <= 0: + raise ValueError("invalid range") + index = bisect.bisect_right(self.ranges, (start, 0)) + prevrange = None + if index > 0: + prevrange = self.ranges[index - 1] + if prevrange[0] + prevrange[1] > start: + raise ValueError("attempt to add overlapping range") + nextrange = None + if index < len(self.ranges): + nextrange = self.ranges[index] + if nextrange[0] < start + count: + raise ValueError("attempt to add overlapping range") + if prevrange and prevrange[0] + prevrange[1] == start: + if nextrange and nextrange[0] == start + count: + self.ranges[index - 1] = ( + prevrange[0], + prevrange[1] + count + nextrange[1], + ) + del self.ranges[index] + else: + self.ranges[index - 1] = (prevrange[0], prevrange[1] + count) + elif nextrange and nextrange[0] == start + count: + self.ranges[index] = (start, count + nextrange[1]) + else: + self.ranges.insert(index, (start, count)) + + @classmethod + def loadsubid( + cls, kind: typing.Literal["uid", "gid"], login: str | None = None, + ) -> "IDAllocation": + """Load a `/etc/sub?id` file and return ids allocated to the given + login or current user. + """ + self = cls() + for start, count in subidranges(kind, login): + self.add_range(start, count) + return self + + def find(self, count: int) -> int: + """Locate count continguous ids from this allocation. The start of + the allocation is returned. The allocation object is left unchanged. + """ + for start, available in self.ranges: + if available >= count: + return start + raise ValueError("could not satisfy allocation request") + + def allocate(self, count: int) -> int: + """Allocate count contiguous ids from this allocation. The start of + the allocation is returned and the ids are removed from this + IDAllocation object. + """ + for index, (start, available) in enumerate(self.ranges): + if available > count: + self.ranges[index] = (start + count, available - count) + return start + if available == count: + del self.ranges[index] + return start + raise ValueError("could not satisfy allocation request") + + def allocatemap(self, count: int, target: int) -> IDMapping: + """Allocate count contiguous ids from this allocation. An IDMapping + with its innerstart set to target is returned. The allocation is + removed from this IDAllocation object. + """ + return IDMapping(target, self.allocate(count), count) + + +def newidmap( + kind: typing.Literal["uid", "gid"], + pid: int, + mapping: list[IDMapping], + helper: bool | None = None, +) -> None: + """Apply the given uid or gid mapping to the given process. A positive pid + identifies a process, other values identify the currently running process. + Whether setuid binaries newuidmap and newgidmap are used is determined via + the helper argument. A None value indicate automatic detection of whether + a helper is required for setting up the given mapping. + """ + + assert kind in ("uid", "gid") + if pid <= 0: + pid = os.getpid() + if helper is None: + # We cannot reliably test whether we have the right EUID and we don't + # implement checking whether setgroups has been denied either. Please + # be explicit about the helper choice in such cases. + helper = len(mapping) > 1 or mapping[0].count > 1 + if helper: + argv = [f"new{kind}map", str(pid)] + for idblock in mapping: + argv.extend(map(str, dataclasses.astuple(idblock))) + subprocess.check_call(argv) + else: + pathlib.Path(f"/proc/{pid}/{kind}_map").write_text( + "".join( + "%d %d %d\n" % dataclasses.astuple(idblock) + for idblock in mapping + ), + encoding="ascii", + ) + + +def newuidmap(pid: int, mapping: list[IDMapping], helper: bool = True) -> None: + """Apply a given uid mapping to the given process. Refer to newidmap for + details. + """ + newidmap("uid", pid, mapping, helper) + + +def newgidmap(pid: int, mapping: list[IDMapping], helper: bool = True) -> None: + """Apply a given gid mapping to the given process. Refer to newidmap for + details. + """ + newidmap("gid", pid, mapping, helper) + + +def newidmaps( + pid: int, + uidmapping: list[IDMapping], + gidmapping: list[IDMapping], + helper: bool = True, +) -> None: + """Appply a given uid and gid mapping to the given process. Refer to + newidmap for details. + """ + newgidmap(pid, gidmapping, helper) + newuidmap(pid, uidmapping, helper) + + +class run_in_fork: + """Decorator for running the decorated function once in a separate process. + """ + def __init__(self, function: typing.Callable[[], None]): + """Fork a new process that will eventually run the given function and + then exit. + """ + self.efd = EventFD() + self.pid = os.fork() + if self.pid == 0: + self.efd.read() + self.efd.close() + function() + sys.exit(0) + + def start(self) -> None: + """Start the decorated function. It can only be started once.""" + if not self.efd: + raise ValueError("this function can only be called once") + self.efd.write(1) + self.efd.close() + + def wait(self) -> None: + """Wait for the process running the decorated function to finish.""" + if self.efd: + raise ValueError("start must be called before wait") + ret = os.waitpid(self.pid, 0) + if ret != (self.pid, 0): + raise ValueError("something failed") + + def __call__(self) -> None: + """Start the decorated function and wait for its process to finish.""" + self.start() + self.wait() + + +def bind_mount( + source: AtLocationLike, + target: AtLocationLike, + recursive: bool = False, + readonly: bool = False, +) -> None: + """Create a bind mount from source to target. Depending on whether one of + the locations involves a file descriptor or not, the new or old mount API + will be used. + """ + source = AtLocation(source) + target = AtLocation(target) + try: + # mypy does not know that os.fspath accepts AtLocation + srcloc: str | bytes + srcloc = os.fspath(source) # type: ignore + tgtloc: str | bytes + tgtloc = os.fspath(target) # type: ignore + except ValueError: + otflags = OpenTreeFlags.OPEN_TREE_CLONE + if recursive: + otflags |= OpenTreeFlags.AT_RECURSIVE + with open_tree(source, otflags) as srcfd: + if readonly: + mount_setattr(srcfd, recursive, MountAttrFlags.RDONLY) + return move_mount(srcfd, target) + else: + mflags = MountFlags.BIND + if recursive: + mflags |= MountFlags.REC + if readonly: + mflags |= MountFlags.RDONLY + return mount(srcloc, tgtloc, None, mflags) + + +def populate_dev( + origroot: AtLocationLike, + newroot: PathConvertible, + *, + fuse: bool = True, + pidns: bool = True, + tun: bool = True, +) -> None: + """Mount a tmpfs to the dev directory beneath newroot and populate it with + basic devices by bind mounting them from the dev directory beneath + origroot. Also mount a new pts instance. + """ + origdev = AtLocation(origroot) / "dev" + newdev = AtLocation(newroot) / "dev" + mount( + "devtmpfs", + newdev, + "tmpfs", + MountFlags.NOSUID | MountFlags.NOEXEC, + "mode=0755", + ) + bind_devices = "null zero full random urandom tty".split() + bind_directories = [] + if fuse: + bind_devices.append("fuse") + if pidns: + (newdev / "pts").mkdir() + mount( + "devpts", + newdev / "pts", + "devpts", + MountFlags.NOSUID | MountFlags.NOEXEC, + "gid=5,mode=620,ptmxmode=666", + ) + (newdev / "ptmx").symlink("pts/ptmx") + else: + bind_devices.append("ptmx") + bind_directories.append("pts") + if tun: + (newdev / "net").mkdir() + bind_devices.append("net/tun") + for node in bind_devices: + (newdev / node).mknod(stat.S_IFREG) + bind_mount(origdev / node, newdev / node, True) + for node in bind_directories: + (newdev / node).mkdir() + bind_mount(origdev / node, newdev / node, True) + + +def unshare_user_idmap( + uidmap: list[IDMapping], + gidmap: list[IDMapping], + flags: CloneFlags = CloneFlags.NEWUSER, +) -> None: + """Unshare the given namespaces (must include user) and set up the given + id mappings. + """ + pid = os.getpid() + @run_in_fork + def setup_idmaps() -> None: + newidmaps(pid, uidmap, gidmap) + unshare(flags) + setup_idmaps() diff --git a/linuxnamespaces/atlocation.py b/linuxnamespaces/atlocation.py new file mode 100644 index 0000000..2c827a2 --- /dev/null +++ b/linuxnamespaces/atlocation.py @@ -0,0 +1,362 @@ +# Copyright 2024 Helmut Grohne <helmut@subdivi.de> +# SPDX-License-Identifier: GPL-3 + +"""Describe a location in the filesystem by a combination of a file descriptor +and a file name each of which can be optional. Many Linux system calls are able +to work with a location described in this way and this module provides support +code for doing so. +""" + +import enum +import os +import os.path +import pathlib +import typing + + +AT_FDCWD = -100 + + +PathConvertible = typing.Union[bytes, str, os.PathLike] + + +class AtFlags(enum.IntFlag): + """Linux AT_* flags used with many different syscalls.""" + + NONE = 0 + AT_SYMLINK_NOFOLLOW = 0x100 + AT_NO_AUTOMOUNT = 0x800 + AT_EMPTY_PATH = 0x1000 + + +class AtLocation: + """Represent a location in the filesystem suitable for use with the + at-family of syscalls. If flags has the AT_EMPTY_PATH bit set, the + location string must be empty and the file descriptor specifies the + filesystem object. Otherwise, the location specifies the filesystem object. + If it is relative, it the anchor is the file descriptor or the current + working directory if the file descriptor is AT_FDCWD. + """ + + fd: int + location: PathConvertible + flags: AtFlags + + def __new__( + cls, + thing: typing.Union["AtLocation", int, PathConvertible], + location: PathConvertible | None = None, + flags: AtFlags = AtFlags.NONE, + ) -> "AtLocation": + """The argument thing can be many different thing. If it is an + AtLocation, it is copied and all other arguments must be unset. If it + is an integer, it is considered to be a file descriptor and the + location must be unset if flags contains AT_EMPTY_PATH. flags are used + as is except that AT_EMPTY_PATH is automatically added when given a + file descriptor and no location. + """ + if isinstance(thing, AtLocation): + if location is not None or flags != AtFlags.NONE: + raise ValueError( + "cannot override location or flags for an AtLocation" + ) + return thing # Don't copy. + obj = super(AtLocation, cls).__new__(cls) + if isinstance(thing, int): + if thing < 0 and thing != AT_FDCWD: + raise ValueError("fd cannot be negative") + obj.fd = thing + if location is None: + obj.location = "" + obj.flags = flags | AtFlags.AT_EMPTY_PATH + elif flags & AtFlags.AT_EMPTY_PATH: + raise ValueError( + "cannot set AT_EMPTY_PATH with a non-empty location" + ) + else: + obj.location = location + obj.flags = flags + elif location is not None: + raise ValueError("location specified twice") + else: + obj.fd = AT_FDCWD + obj.location = thing + obj.flags = flags + return obj + + def close(self) -> None: + """Close the underlying file descriptor.""" + if self.fd >= 0: + os.close(self.fd) + self.fd = AT_FDCWD + + def nosymfollow(self) -> "AtLocation": + """Return a copy with the AT_SYMLINK_NOFOLLOW set.""" + return AtLocation( + self.fd, self.location, self.flags | AtFlags.AT_SYMLINK_NOFOLLOW + ) + + def symfollow(self) -> "AtLocation": + """Return a copy with AT_SYMLINK_NOFOLLOW cleared.""" + return AtLocation( + self.fd, self.location, self.flags & ~AtFlags.AT_SYMLINK_NOFOLLOW + ) + + def noautomount(self) -> "AtLocation": + """Return a copy with AT_NO_AUTOMOUNT set.""" + return AtLocation( + self.fd, self.location, self.flags | AtFlags.AT_NO_AUTOMOUNT + ) + + def automount(self) -> "AtLocation": + """Return a copy with AT_NO_AUTOMOUNT cleared.""" + return AtLocation( + self.fd, self.location, self.flags & ~AtFlags.AT_NO_AUTOMOUNT + ) + + def joinpath(self, name: PathConvertible) -> "AtLocation": + """Combine an AtLocation and a path by doing the equivalent of joining + them with a slash as separator. + """ + if self.flags & AtFlags.AT_EMPTY_PATH: + return AtLocation( + self.fd, name, self.flags & ~AtFlags.AT_EMPTY_PATH + ) + if not self.location: + return AtLocation(self.fd, name, self.flags) + if isinstance(self.location, bytes) or isinstance(name, bytes): + return AtLocation( + self.fd, + os.path.join(os.fsencode(self.location), os.fsencode(name)), + self.flags, + ) + return AtLocation( + self.fd, pathlib.Path(self.location).joinpath(name), self.flags + ) + + def __truediv__(self, name: PathConvertible) -> "AtLocation": + return self.joinpath(name) + + def fileno(self) -> int: + """Return the underlying file descriptor if this is an AT_EMPTY_PATH + location and raise a ValueError otherwise. + """ + if self.flags != AtFlags.AT_EMPTY_PATH: + raise ValueError("AtLocation is not simply a file descriptor") + assert self.fd >= 0 + assert not self.location + return self.fd + + @property + def fd_or_none(self) -> int | None: + """A variant of the fd attribute that replaces AT_FDCWD with None.""" + return None if self.fd == AT_FDCWD else self.fd + + def access(self, mode: int, *, effective_ids: bool = False) -> bool: + """Wrapper for os.access supplying path, dir_fd and follow_symlinks.""" + if self.flags == AtFlags.AT_SYMLINK_NOFOLLOW: + follow_symlinks = False + elif self.flags == AtFlags.NONE: + follow_symlinks = True + else: + raise NotImplementedError( + "access on AtLocation only supports flag AT_SYMLINK_NOFOLLOW" + ) + assert self.location + return os.access( + self.location, + mode, + dir_fd=self.fd_or_none, + effective_ids=effective_ids, + follow_symlinks=follow_symlinks, + ) + + def chdir(self) -> None: + """Wrapper for os.chdir or os.fchdir.""" + if self.flags == AtFlags.AT_EMPTY_PATH: + return os.fchdir(self.fd) + if self.flags != AtFlags.NONE: + raise NotImplementedError( + "chdir on AtLocation only supports flag AT_EMPTY_PATH" + ) + assert self.location + return os.chdir(self.location) + + def chmod(self, mode: int) -> None: + """Wrapper for os.chmod or os.fchmod.""" + if self.flags == AtFlags.AT_EMPTY_PATH: + return os.fchmod(self.fd, mode) + if self.flags == AtFlags.AT_SYMLINK_NOFOLLOW: + follow_symlinks = False + elif self.flags == AtFlags.NONE: + follow_symlinks = True + else: + raise NotImplementedError( + "chmod on AtLocation with unsupported flags" + ) + assert self.location + return os.chmod( + self.location, + mode, + dir_fd=self.fd_or_none, + follow_symlinks=follow_symlinks, + ) + + def chown(self, uid: int, gid: int) -> None: + """Wrapper for os.chown or os.chown.""" + if self.flags == AtFlags.AT_EMPTY_PATH: + return os.fchown(self.fd, uid, gid) + if self.flags == AtFlags.AT_SYMLINK_NOFOLLOW: + follow_symlinks = False + elif self.flags == AtFlags.NONE: + follow_symlinks = True + else: + raise NotImplementedError( + "chmod on AtLocation with unsupported flags" + ) + assert self.location + return os.chown( + self.location, + uid, + gid, + dir_fd=self.fd_or_none, + follow_symlinks=follow_symlinks, + ) + + def mkdir(self, mode: int = 0o777) -> None: + """Wrapper for os.mkdir supplying path and dir_fd.""" + if self.flags != AtFlags.NONE: + raise NotImplementedError( + "mkdir is not supported for an AtLocation with flags" + ) + assert self.location + os.mkdir(self.location, mode, dir_fd=self.fd_or_none) + + def mknod(self, mode: int = 0o600, device: int = 0) -> None: + """Wrapper for os.mknod supplying path and dir_fd.""" + if self.flags != AtFlags.NONE: + raise NotImplementedError( + "mknod is not supported for an AtLocation with flags" + ) + assert self.location + os.mknod(self.location, mode, device, dir_fd=self.fd_or_none) + + def open(self, flags: int, mode: int = 0o777) -> int: + """Wrapper for os.open supplying path and dir_fd.""" + if self.flags == AtFlags.AT_SYMLINK_NOFOLLOW: + flags |= os.O_NOFOLLOW + elif self.flags != AtFlags.NONE: + raise NotImplementedError( + "opening an AtLocation only supports flag AT_SYMLINK_NOFOLLOW" + ) + assert self.location + return os.open(self.location, flags, mode, dir_fd=self.fd_or_none) + + def readlink(self) -> str: + """Wrapper for os.readlink supplying path and dir_fd.""" + if self.flags & ~AtFlags.AT_EMPTY_PATH != AtFlags.NONE: + raise NotImplementedError( + "readlink on AtLocation only support flag AT_EMPTY_PATH" + ) + return os.fsdecode( + os.readlink(os.fspath(self.location), dir_fd=self.fd_or_none) + ) + + def rmdir(self) -> None: + """Wrapper for os.rmdir suppling path and dir_fd.""" + if self.flags != AtFlags.NONE: + raise NotImplementedError( + "rmdir is not supported for an AtLocation with flags" + ) + assert self.location + return os.rmdir(self.location, dir_fd=self.fd_or_none) + + def symlink(self, linktarget: PathConvertible) -> None: + """Create a symlink at self pointing to linktarget. Note that this + method has its arguments reversed compared to the usual os.symlink, + because the dir_fd is applicable to the second argument there. + """ + if self.flags != AtFlags.NONE: + raise NotImplementedError( + "symlink is not supported for an AtLocation with flags" + ) + assert self.location + os.symlink(linktarget, self.location, dir_fd=self.fd_or_none) + + def unlink(self) -> None: + """Wrapper for os.unlink suppling path and dir_fd.""" + if self.flags != AtFlags.NONE: + raise NotImplementedError( + "unlink is not supported for an AtLocation with flags" + ) + assert self.location + return os.unlink(self.location, dir_fd=self.fd_or_none) + + def walk( + self, + topdown: bool = True, + onerror: typing.Callable[[OSError], typing.Any] | None = None, + follow_symlinks: bool = False, + ) -> typing.Iterator[ + tuple[ + "AtLocation", list["AtLocation"], list["AtLocation"], "AtLocation", + ] + ]: + """Resemble os.fwalk with a few differences. The returned iterator + yields the dirpath as an AtLocation that borrows the fd from self. The + dirnames and filenames become AtLocations whose location is the entry + name and whose fd is temporary. Finally, the dirfd also becomes an + AtLocations referencing the same object as the dirpath though as an + AT_EMPTY_PATH with temporary fd. + """ + if self.flags != AtFlags.NONE: + raise NotImplementedError( + "walk is not supported for an AtLocation with flags" + ) + for dirpath, dirnames, filenames, dirfd in os.fwalk( + self.location, + topdown=topdown, + onerror=onerror, + follow_symlinks=follow_symlinks, + dir_fd=self.fd_or_none, + ): + yield ( + AtLocation(self.fd, dirpath), + [AtLocation(dirfd, dirname) for dirname in dirnames], + [AtLocation(dirfd, filename) for filename in filenames], + AtLocation(dirfd), + ) + + def __enter__(self) -> "AtLocation": + """When used as a context manager, the associated fd will be closed on + scope exit. + """ + return self + + def __exit__( + self, + exc_type: typing.Any, + exc_value: typing.Any, + traceback: typing.Any, + ) -> None: + """When used as a context manager, the associated fd will be closed on + scope exit. + """ + self.close() + + def __fspath__(self) -> str | bytes: + """Return the underlying location if it uniquely defines this object. + Otherwise raise a ValueError. + """ + if self.fd != AT_FDCWD: + raise ValueError( + "AtLocation with fd is not convertible to plain path" + ) + if self.flags != AtFlags.NONE: + raise ValueError( + "AtLocation with flags is not convertible to plain path" + ) + return os.fspath(self.location) + + +AtLocationLike = typing.Union[AtLocation, int, PathConvertible] diff --git a/linuxnamespaces/syscalls.py b/linuxnamespaces/syscalls.py new file mode 100644 index 0000000..0e33a44 --- /dev/null +++ b/linuxnamespaces/syscalls.py @@ -0,0 +1,504 @@ +# Copyright 2024 Helmut Grohne <helmut@subdivi.de> +# SPDX-License-Identifier: GPL-3 + +"""Provide typed Python functions for a number of Linux system calls relevant +for Linux namespaces including the new mount API. +""" + +import ctypes +import dataclasses +import enum +import os +import typing + +from .atlocation import AtFlags, AtLocation, AtLocationLike, PathConvertible + + +LIBC_SO = ctypes.CDLL("libc.so.6", use_errno=True) + + +class CloneFlags(enum.IntFlag): + """This value may be supplied to + * unshare(2) flags + * clone3(2) clone_args.flags + * setns(2) nstype + """ + + NONE = 0x00000000 + NEWTIME = 0x00000080 + VM = 0x00000100 + FS = 0x00000200 + FILES = 0x00000400 + SIGHAND = 0x00000800 + PIDFD = 0x00001000 + PTRACE = 0x00002000 + VFORK = 0x00004000 + PARENT = 0x00008000 + THREAD = 0x00010000 + NEWNS = 0x00020000 + SYSVSEM = 0x00040000 + SETTLS = 0x00080000 + PARENT_SETTID = 0x00100000 + CHILD_CLEARTID = 0x00200000 + DETACHED = 0x00400000 + UNTRACED = 0x00800000 + CHILD_SETTID = 0x01000000 + NEWCGROUP = 0x02000000 + NEWUTS = 0x04000000 + NEWIPC = 0x08000000 + NEWUSER = 0x10000000 + NEWPID = 0x20000000 + NEWNET = 0x40000000 + IO = 0x80000000 + NS_FLAGS = ( + NEWCGROUP + | NEWIPC + | NEWNET + | NEWNS + | NEWPID + | NEWTIME + | NEWUSER + | NEWUTS + ) + UNSHARE_FLAGS = NS_FLAGS | FILES | FS | SYSVSEM + + +class EventFDFlags(enum.IntFlag): + """This value may be supplied as flags to eventfd(2).""" + + NONE = 0 + CLOEXEC = 0o2000000 + NONBLOCK = 0o4000 + SEMAPHORE = 0o1 + ALL_FLAGS = CLOEXEC | NONBLOCK | SEMAPHORE + + +class MountFlags(enum.IntFlag): + """This value may be supplied as mountflags to mount(2).""" + + NONE = 0 + RDONLY = 1 << 0 + NOSUID = 1 << 1 + NODEV = 1 << 2 + NOEXEC = 1 << 3 + SYNCHRONOUS = 1 << 4 + REMOUNT = 1 << 5 + MANDLOCK = 1 << 6 + DIRSYNC = 1 << 7 + NOSYMFOLLOW = 1 << 8 + # Bit 9 vanished + NOATIME = 1 << 10 + NODIRATIME = 1 << 11 + BIND = 1 << 12 + MOVE = 1 << 13 + REC = 1 << 14 + SILENT = 1 << 15 + POSIXACL = 1 << 16 + UNBINDABLE = 1 << 17 + PRIVATE = 1 << 18 + SLAVE = 1 << 19 + SHARED = 1 << 20 + RELATIME = 1 << 21 + KERNMOUNT = 1 << 22 + I_VERSION = 1 << 23 + STRICTATIME = 1 << 24 + LAZYTIME = 1 << 25 + SUBMOUNT = 1 << 26 + NOREMOTELOCK = 1 << 27 + NOSEC = 1 << 28 + BORN = 1 << 29 + ACTIVE = 1 << 30 + NOUSER = 1 << 31 + + PROPAGATION_FLAGS = UNBINDABLE | PRIVATE | SLAVE | SHARED + + +class MountSetattrFlags(enum.IntFlag): + """This value may be supplied as flags to mount_setattr(2).""" + + NONE = 0 + AT_SYMLINK_NOFOLLOW = 0x100 + AT_NO_AUTOMOUNT = 0x800 + AT_EMPTY_PATH = 0x1000 + AT_RECURSIVE = 0x8000 + + @staticmethod + def from_atflags(flags: AtFlags) -> "MountSetattrFlags": + ret = MountSetattrFlags.NONE + if flags & AtFlags.AT_SYMLINK_NOFOLLOW: + ret |= MountSetattrFlags.AT_SYMLINK_NOFOLLOW + if flags & AtFlags.AT_NO_AUTOMOUNT: + ret |= MountSetattrFlags.AT_NO_AUTOMOUNT + if flags & AtFlags.AT_EMPTY_PATH: + ret |= MountSetattrFlags.AT_EMPTY_PATH + return ret + + +class MountAttrFlags(enum.IntFlag): + """This value may be supplied as attr->attr_set or attr->attr_clr to + mount_setattr(2). + """ + + NONE = 0x000000 + RDONLY = 0x000001 # Mount read-only. + NOSUID = 0x000002 # Ignore suid and sgid bits. + NODEV = 0x000004 # Disallow access to device special files. + NOEXEC = 0x000008 # Disallow program execution. + RELATIME = 0x000000 # - Update atime relative to mtime/ctime. + NOATIME = 0x000010 # - Do not update access times. + STRICTATIME = 0x000020 # - Always perform atime updates + _ATIME = 0x000070 | NOATIME | STRICTATIME + # Setting on how atime should be updated. + NODIRATIME = 0x000080 # Do not update directory access times. + IDMAP = 0x100000 # Idmap mount to @userns_fd in struct mount_attr. + NOSYMFOLLOW = 0x200000 # Do not follow symlinks. + + ALL_FLAGS = ( + RDONLY + | NOSYMFOLLOW + | NODEV + | NOEXEC + | _ATIME + | NODIRATIME + | IDMAP + | NOSYMFOLLOW + ) + + +class MountAttr(ctypes.Structure): + """This value may be supplied to mount_setattr(2) as attr.""" + + _fields_ = [ + ("attr_set", ctypes.c_ulonglong), + ("attr_clr", ctypes.c_ulonglong), + ("propagation", ctypes.c_ulonglong), + ("userns_fd", ctypes.c_ulonglong), + ] + + +class MoveMountFlags(enum.IntFlag): + """This value may be supplied to move_mount(2) as flags.""" + + NONE = 0x00000000 + F_SYMLINKS = 0x00000001 # Follow symlinks on from path + F_AUTOMOUNTS = 0x00000002 # Follow automounts on from path + F_EMPTY_PATH = 0x00000004 # Empty from path permitted + T_SYMLINKS = 0x00000010 # Follow symlinks on to path + T_AUTOMOUNTS = 0x00000020 # Follow automounts on to path + T_EMPTY_PATH = 0x00000040 # Empty to path permitted + SET_GROUP = 0x00000100 # Set sharing group instead + ALL_FLAGS = ( + F_SYMLINKS + | F_AUTOMOUNTS + | F_EMPTY_PATH + | T_SYMLINKS + | T_AUTOMOUNTS + | T_EMPTY_PATH + | SET_GROUP + ) + + +class OpenTreeFlags(enum.IntFlag): + """This value may be supplied to open_tree(2) as flags.""" + + NONE = 0 + OPEN_TREE_CLONE = 0x1 + OPEN_TREE_CLOEXEC = os.O_CLOEXEC + AT_SYMLINK_NOFOLLOW = 0x100 + AT_NO_AUTOMOUNT = 0x800 + AT_EMPTY_PATH = 0x1000 + AT_RECURSIVE = 0x8000 + ALL_FLAGS = ( + OPEN_TREE_CLONE + | OPEN_TREE_CLOEXEC + | AT_SYMLINK_NOFOLLOW + | AT_NO_AUTOMOUNT + | AT_EMPTY_PATH + | AT_RECURSIVE + ) + + +class UmountFlags(enum.IntFlag): + """This value may be supplied to umount2(2) as flags.""" + + NONE = 0 + FORCE = 1 + DETACH = 2 + EXPIRE = 4 + NOFOLLOW = 8 + ALL_FLAGS = FORCE | DETACH | EXPIRE | NOFOLLOW + + +def call_libc(funcname: str, *args: typing.Any) -> int: + """Call a function from the C library with given args. This assumes that + the function returns an integer that is non-negative on success. On + failure, an OSError with errno is raised. + """ + ret: int = LIBC_SO[funcname](*args) + if ret < 0: + err = ctypes.get_errno() + raise OSError( + err, f"{funcname}() failed with error {err}: {os.strerror(err)}" + ) + return ret + + +@dataclasses.dataclass +class CapabilitySets: + """Represent the main capability sets that capget/capset deal with.""" + + effective: int + permitted: int + inheritable: int + + @staticmethod + def _create_header(pid: int) -> ctypes.Array[ctypes.c_uint32]: + return (ctypes.c_uint32 * 2)( + 0x20080522, # _LINUX_CAPABILITY_VERSION_3 + pid, + ) + + @classmethod + def get(cls, pid: int = 0) -> "CapabilitySets": + """Call capget to retrieve the current capability sets.""" + header = cls._create_header(pid) + data = (ctypes.c_uint32 * 6)() + call_libc("capget", ctypes.byref(header), ctypes.byref(data)) + return cls( + (data[3] << 32) | data[0], + (data[4] << 32) | data[1], + (data[5] << 32) | data[2], + ) + + def set(self, pid: int = 0) -> None: + """Call capset to set the capabilities.""" + header = self._create_header(pid) + data = (ctypes.c_uint32 * 6)( + self.effective & 0xffffffff, + self.permitted & 0xffffffff, + self.inheritable & 0xffffffff, + self.effective >> 32, + self.permitted >> 32, + self.inheritable >> 32, + ) + call_libc("capset", ctypes.byref(header), ctypes.byref(data)) + + +class EventFD: + """Represent a file decriptor returned from eventfd(2).""" + + def __init__( + self, initval: int = 0, flags: EventFDFlags = EventFDFlags.NONE + ) -> None: + if flags & ~EventFDFlags.ALL_FLAGS: + raise ValueError("invalid flags for eventfd") + self.fd = call_libc("eventfd", initval, int(flags)) + + def read(self) -> int: + """Decrease the value of the eventfd using eventfd_read.""" + if self.fd < 0: + raise ValueError("attempt to read from closed eventfd") + cvalue = ctypes.c_ulonglong() + call_libc("eventfd_read", self.fd, ctypes.byref(cvalue)) + return cvalue.value + + def write(self, value: int = 1) -> None: + """Add the given value to the eventfd using eventfd_write.""" + if self.fd < 0: + raise ValueError("attempt to read from closed eventfd") + if value < 0 or (value >> 64): + raise ValueError("value for eventfd_write out of range") + call_libc("eventfd_write", self.fd, ctypes.c_ulonglong(value)) + + def fileno(self) -> int: + """Return the underlying file descriptor.""" + return self.fd + + def close(self) -> None: + """Close the underlying file descriptor.""" + if self.fd >= 0: + try: + os.close(self.fd) + finally: + self.fd = -1 + + __del__ = close + + def __bool__(self) -> bool: + """Return True unless the eventfd is closed.""" + return self.fd >= 0 + + def __enter__(self) -> "EventFD": + """When used as a context manager, the EventFD is closed on scope exit. + """ + return self + + def __exit__( + self, + exc_type: typing.Any, + exc_value: typing.Any, + traceback: typing.Any, + ) -> None: + self.close() + + +def mount( + source: PathConvertible, + target: PathConvertible, + filesystemtype: str | None, + flags: MountFlags = MountFlags.NONE, + data: str | None = None, +) -> None: + """Python wrapper for mount(2).""" + if (flags & MountFlags.PROPAGATION_FLAGS).bit_count() > 1: + raise ValueError("invalid flags for mount") + if ( + flags & MountFlags.PROPAGATION_FLAGS + and flags & ~( + MountFlags.PROPAGATION_FLAGS | MountFlags.REC | MountFlags.SILENT + ) + ): + raise ValueError("invalid flags for mount") + call_libc( + "mount", + os.fsencode(source), + os.fsencode(target), + None if filesystemtype is None else os.fsencode(filesystemtype), + int(flags), + None if data is None else os.fsencode(data), + ) + + +def mount_setattr( + filesystem: AtLocationLike, + recursive: bool, + attr_set: MountAttrFlags = MountAttrFlags.NONE, + attr_clr: MountAttrFlags = MountAttrFlags.NONE, + propagation: int = 0, + userns_fd: int = -1, +) -> None: + """Python wrapper for mount_setattr(2).""" + filesystem = AtLocation(filesystem) + flags = MountSetattrFlags.from_atflags(filesystem.flags) + if recursive: + flags |= MountSetattrFlags.AT_RECURSIVE + if attr_clr & MountAttrFlags.IDMAP: + raise ValueError("cannot clear the MOUNT_ATTR_IDMAP flag") + attr = MountAttr(attr_set, attr_clr, propagation, userns_fd) + call_libc( + "mount_setattr", + filesystem.fd, + os.fsencode(filesystem.location), + int(flags), + ctypes.byref(attr), + ctypes.sizeof(attr), + ) + + +def move_mount( + from_: AtLocationLike, + to: AtLocationLike, + flags: MoveMountFlags = MoveMountFlags.NONE, +) -> None: + """Python wrapper for move_mount(2).""" + from_ = AtLocation(from_) + to = AtLocation(to) + if flags & ~MoveMountFlags.ALL_FLAGS: + raise ValueError("invalid flags for move_mount") + if from_.flags & AtFlags.AT_SYMLINK_NOFOLLOW: + flags &= ~MoveMountFlags.F_SYMLINKS + else: + flags |= MoveMountFlags.F_SYMLINKS + if from_.flags & AtFlags.AT_NO_AUTOMOUNT: + flags &= ~MoveMountFlags.F_AUTOMOUNTS + else: + flags |= MoveMountFlags.F_AUTOMOUNTS + if from_.flags & AtFlags.AT_EMPTY_PATH: + flags |= MoveMountFlags.F_EMPTY_PATH + else: + flags &= ~MoveMountFlags.F_EMPTY_PATH + if to.flags & AtFlags.AT_SYMLINK_NOFOLLOW: + flags &= ~MoveMountFlags.T_SYMLINKS + else: + flags |= MoveMountFlags.T_SYMLINKS + if to.flags & AtFlags.AT_NO_AUTOMOUNT: + flags &= ~MoveMountFlags.T_AUTOMOUNTS + else: + flags |= MoveMountFlags.T_AUTOMOUNTS + if to.flags & AtFlags.AT_EMPTY_PATH: + flags |= MoveMountFlags.T_EMPTY_PATH + else: + flags &= ~MoveMountFlags.T_EMPTY_PATH + call_libc( + "move_mount", + from_.fd, + os.fsencode(from_.location), + to.fd, + os.fsencode(to.location), + int(flags), + ) + + +def open_tree( + source: AtLocationLike, flags: OpenTreeFlags = OpenTreeFlags.NONE +) -> AtLocation: + """Python wrapper for open_tree(2).""" + source = AtLocation(source) + if flags & ~OpenTreeFlags.ALL_FLAGS: + raise ValueError("invalid flags for open_tree") + if ( + flags & OpenTreeFlags.AT_RECURSIVE + and not flags & OpenTreeFlags.OPEN_TREE_CLONE + ): + raise ValueError("invalid flags for open_tree") + if source.flags & AtFlags.AT_SYMLINK_NOFOLLOW: + flags |= OpenTreeFlags.AT_SYMLINK_NOFOLLOW + else: + flags &= ~OpenTreeFlags.AT_SYMLINK_NOFOLLOW + if source.flags & AtFlags.AT_NO_AUTOMOUNT: + flags |= OpenTreeFlags.AT_NO_AUTOMOUNT + else: + flags &= ~OpenTreeFlags.AT_NO_AUTOMOUNT + if source.flags & AtFlags.AT_EMPTY_PATH: + flags |= OpenTreeFlags.AT_EMPTY_PATH + else: + flags &= ~OpenTreeFlags.AT_EMPTY_PATH + return AtLocation( + call_libc( + "open_tree", source.fd, os.fsencode(source.location), int(flags) + ) + ) + + +def pivot_root(new_root: PathConvertible, put_old: PathConvertible) -> None: + """Python wrapper for pivot_root(2).""" + call_libc("pivot_root", os.fsencode(new_root), os.fsencode(put_old)) + + +def setns(fd: int, nstype: CloneFlags = CloneFlags.NONE) -> None: + """Python wrapper for setns(2).""" + if fd < 0: + raise ValueError("invalid file descriptor") + if nstype & ~CloneFlags.NS_FLAGS != 0: + raise ValueError("invalid nstype for setns") + call_libc("setns", fd, int(nstype)) + + +def umount( + path: PathConvertible, flags: UmountFlags = UmountFlags.NONE +) -> None: + """Python wrapper for umount(2).""" + if flags & ~UmountFlags.ALL_FLAGS: + raise ValueError("umount flags out of range") + if flags & UmountFlags.EXPIRE and flags & ( + UmountFlags.FORCE | UmountFlags.DETACH + ): + raise ValueError("invalid flags for umount") + call_libc("umount2", os.fsencode(path), int(flags)) + + +def unshare(flags: CloneFlags) -> None: + """Python wrapper for unshare(2).""" + if flags & ~CloneFlags.UNSHARE_FLAGS: + raise ValueError("invalid flags for unshare") + call_libc("unshare", int(flags)) diff --git a/tests/test_simple.py b/tests/test_simple.py new file mode 100644 index 0000000..e0cb66e --- /dev/null +++ b/tests/test_simple.py @@ -0,0 +1,164 @@ +# Copyright 2024 Helmut Grohne <helmut@subdivi.de> +# SPDX-License-Identifier: GPL-3 + +import functools +import os +import pathlib +import socket +import unittest + +import pytest + +import linuxnamespaces + + +def allow_fork_exit(function): + @functools.wraps(function) + def wrapped(*args, **kwargs): + mainpid = os.getpid() + try: + return function(*args, **kwargs) + except SystemExit as sysexit: + if sysexit.code or os.getpid() == mainpid: + raise + + # We're supposed to successfully exit from a child process. If we + # were to return or raise here, pytest would record success or + # failure. Instead we hide this process from pytest. + os._exit(0) + return pytest.mark.forked(wrapped) + +class IDAllocationTest(unittest.TestCase): + def test_idalloc(self) -> None: + alloc = linuxnamespaces.IDAllocation() + alloc.add_range(1, 2) + alloc.add_range(5, 4) + self.assertIn(alloc.find(3), (5, 6)) + self.assertIn(alloc.allocate(3), (5, 6)) + self.assertRaises(ValueError, alloc.find, 3) + self.assertRaises(ValueError, alloc.allocate, 3) + self.assertEqual(alloc.find(2), 1) + + def test_merge(self) -> None: + alloc = linuxnamespaces.IDAllocation() + alloc.add_range(1, 2) + alloc.add_range(3, 2) + self.assertIn(alloc.allocate(3), (1, 2)) + + +class UnshareTest(unittest.TestCase): + @pytest.mark.forked + def test_unshare_user(self) -> None: + overflowuid = int(pathlib.Path("/proc/sys/fs/overflowuid").read_text()) + idmap = linuxnamespaces.IDMapping(0, os.getuid(), 1) + linuxnamespaces.unshare(linuxnamespaces.CloneFlags.NEWUSER) + self.assertEqual(os.getuid(), overflowuid) + linuxnamespaces.newuidmap(-1, [idmap], False) + self.assertEqual(os.getuid(), 0) + # UID 1 is not mapped. + self.assertRaises(OSError, os.setuid, 1) + + @allow_fork_exit + def test_mount_proc(self) -> None: + idmap = linuxnamespaces.IDMapping(0, os.getuid(), 1) + linuxnamespaces.unshare( + linuxnamespaces.CloneFlags.NEWUSER + | linuxnamespaces.CloneFlags.NEWNS + | linuxnamespaces.CloneFlags.NEWPID + ) + linuxnamespaces.newuidmap(-1, [idmap], False) + @linuxnamespaces.run_in_fork + def setup() -> None: + self.assertEqual(os.getpid(), 1) + linuxnamespaces.mount("proc", "/proc", "proc") + setup() + + @pytest.mark.forked + def test_sethostname(self) -> None: + self.assertRaises(socket.error, socket.sethostname, "example") + linuxnamespaces.unshare( + linuxnamespaces.CloneFlags.NEWUSER + | linuxnamespaces.CloneFlags.NEWUTS + ) + socket.sethostname("example") + + @pytest.mark.forked + def test_populate_dev(self) -> None: + uidmap = linuxnamespaces.IDMapping(0, os.getuid(), 1) + gidmap = linuxnamespaces.IDMapping(0, os.getgid(), 1) + linuxnamespaces.unshare( + linuxnamespaces.CloneFlags.NEWUSER + | linuxnamespaces.CloneFlags.NEWNS + ) + pathlib.Path("/proc/self/setgroups").write_text("deny") + linuxnamespaces.newuidmap(-1, [uidmap], False) + linuxnamespaces.newgidmap(-1, [gidmap], False) + linuxnamespaces.mount("tmpfs", "/mnt", "tmpfs", data="mode=0755") + os.mkdir("/mnt/dev") + linuxnamespaces.populate_dev("/", "/mnt", pidns=False) + self.assertTrue(os.access("/mnt/dev/null", os.W_OK)) + pathlib.Path("/mnt/dev/null").write_text("") + + +class UnshareIdmapTest(unittest.TestCase): + def setUp(self) -> None: + super().setUp() + self.uidalloc = linuxnamespaces.IDAllocation.loadsubid("uid") + self.gidalloc = linuxnamespaces.IDAllocation.loadsubid("gid") + try: + self.uidalloc.find(65536) + self.gidalloc.find(65536) + except ValueError: + self.skipTest("insufficient /etc/sub?id allocation") + + @allow_fork_exit + def test_unshare_user_idmap(self) -> None: + overflowuid = int(pathlib.Path("/proc/sys/fs/overflowuid").read_text()) + uidmap = linuxnamespaces.IDMapping( + 0, self.uidalloc.allocate(65536), 65536 + ) + self.assertNotEqual(os.getuid(), uidmap.outerstart) + gidmap = linuxnamespaces.IDMapping( + 0, self.gidalloc.allocate(65536), 65536 + ) + pid = os.getpid() + @linuxnamespaces.run_in_fork + def setup() -> None: + linuxnamespaces.newgidmap(pid, [gidmap]) + linuxnamespaces.newuidmap(pid, [uidmap]) + linuxnamespaces.unshare(linuxnamespaces.CloneFlags.NEWUSER) + setup() + self.assertEqual(os.getuid(), overflowuid) + os.setuid(0) + self.assertEqual(os.getuid(), 0) + os.setuid(1) + self.assertEqual(os.getuid(), 1) + + @allow_fork_exit + def test_populate_dev(self) -> None: + uidmap = linuxnamespaces.IDMapping( + 0, self.uidalloc.allocate(65536), 65536 + ) + self.assertNotEqual(os.getuid(), uidmap.outerstart) + gidmap = linuxnamespaces.IDMapping( + 0, self.gidalloc.allocate(65536), 65536 + ) + pid = os.getpid() + @linuxnamespaces.run_in_fork + def setup() -> None: + linuxnamespaces.newgidmap(pid, [gidmap]) + linuxnamespaces.newuidmap(pid, [uidmap]) + linuxnamespaces.unshare( + linuxnamespaces.CloneFlags.NEWUSER + | linuxnamespaces.CloneFlags.NEWNS + | linuxnamespaces.CloneFlags.NEWPID + ) + setup() + os.setreuid(0, 0) + os.setregid(0, 0) + linuxnamespaces.mount("tmpfs", "/mnt", "tmpfs") + os.mkdir("/mnt/dev") + @linuxnamespaces.run_in_fork + def test() -> None: + linuxnamespaces.populate_dev("/", "/mnt") + test() |