# Copyright 2024 Helmut Grohne
# SPDX-License-Identifier: GPL-3

"""Provide plumbing-layer functionality for working with Linux namespaces in
Python.
"""

import bisect
import contextlib
import dataclasses
import os
import pathlib
import pwd
import stat
import subprocess
import sys
import typing

from .atlocation import *
from .syscalls import *


def subidranges(
    kind: typing.Literal["uid", "gid"], login: str | None = None
) -> typing.Iterator[tuple[int, int]]:
    """Parse a `/etc/sub?id` file for ranges allocated to the given or current
    user. Return all ranges as (start, count) pairs.

    When login is None, the name belonging to the real uid of this process is
    used. If that uid has no passwd entry, os.getlogin() is used instead.
    """
    if login is None:
        # os.getlogin() reports the user logged in on the controlling
        # terminal. That fails without a terminal and differs from the
        # current user after switching users, so prefer the passwd entry of
        # the real uid.
        try:
            login = pwd.getpwuid(os.getuid()).pw_name
        except KeyError:
            login = os.getlogin()
    with open(f"/etc/sub{kind}") as filelike:
        for line in filelike:
            # Each line is expected to look like "login:start:count".
            parts = line.strip().split(":")
            if parts[0] == login:
                yield (int(parts[1]), int(parts[2]))


@dataclasses.dataclass(frozen=True)
class IDMapping:
    """Represent one range in a user or group id mapping.

    The field order (innerstart, outerstart, count) is the order in which
    newidmap emits the values. A ValueError is raised on construction if a
    start is negative, count is not positive or a range end does not stay
    below 1 << 64.
    """

    # First id of the range as seen inside the user namespace.
    innerstart: int
    # First id of the range as seen outside the user namespace.
    outerstart: int
    # Number of consecutive ids covered by this range.
    count: int

    def __post_init__(self) -> None:
        if self.outerstart < 0:
            raise ValueError("outerstart must not be negative")
        if self.innerstart < 0:
            raise ValueError("innerstart must not be negative")
        if self.count <= 0:
            raise ValueError("count must be positive")
        if self.outerstart + self.count >= 1 << 64:
            raise ValueError("outerstart + count exceed 64bits")
        if self.innerstart + self.count >= 1 << 64:
            raise ValueError("innerstart + count exceed 64bits")


class IDAllocation:
    """This represents a subset of IDs (user or group). It can be used to
    allocate a contiguous range for use with a user namespace.
    """

    def __init__(self) -> None:
        # Sorted list of disjoint (start, count) pairs. add_range merges
        # adjacent ranges, so neighbouring entries never touch.
        self.ranges: list[tuple[int, int]] = []

    def add_range(self, start: int, count: int) -> None:
        """Add count ids starting from start to this allocation.

        A ValueError is raised if the range is invalid or overlaps with ids
        that are already part of this allocation.
        """
        if start < 0 or count <= 0:
            raise ValueError("invalid range")
        # (start, 0) sorts before any stored range beginning at start, so
        # index is the position of the first range starting at or after
        # start.
        index = bisect.bisect_right(self.ranges, (start, 0))
        prevrange = None
        if index > 0:
            prevrange = self.ranges[index - 1]
            if prevrange[0] + prevrange[1] > start:
                raise ValueError("attempt to add overlapping range")
        nextrange = None
        if index < len(self.ranges):
            nextrange = self.ranges[index]
            if nextrange[0] < start + count:
                raise ValueError("attempt to add overlapping range")
        if prevrange and prevrange[0] + prevrange[1] == start:
            if nextrange and nextrange[0] == start + count:
                # The new range closes the gap between both neighbours.
                self.ranges[index - 1] = (
                    prevrange[0],
                    prevrange[1] + count + nextrange[1],
                )
                del self.ranges[index]
            else:
                # Extend the preceding range at its end.
                self.ranges[index - 1] = (prevrange[0], prevrange[1] + count)
        elif nextrange and nextrange[0] == start + count:
            # Extend the following range at its beginning.
            self.ranges[index] = (start, count + nextrange[1])
        else:
            self.ranges.insert(index, (start, count))

    @classmethod
    def loadsubid(
        cls,
        kind: typing.Literal["uid", "gid"],
        login: str | None = None,
    ) -> "IDAllocation":
        """Load a `/etc/sub?id` file and return ids allocated to the given
        login or current user.
        """
        self = cls()
        for start, count in subidranges(kind, login):
            self.add_range(start, count)
        return self

    def find(self, count: int) -> int:
        """Locate count contiguous ids from this allocation. The start of
        the allocation is returned. The allocation object is left unchanged.
        A ValueError is raised if no range is large enough.
        """
        for start, available in self.ranges:
            if available >= count:
                return start
        raise ValueError("could not satisfy allocation request")

    def allocate(self, count: int) -> int:
        """Allocate count contiguous ids from this allocation. The start of
        the allocation is returned and the ids are removed from this
        IDAllocation object. A ValueError is raised if count is not positive
        or no range is large enough.
        """
        if count <= 0:
            # A negative count would otherwise grow the first range instead
            # of shrinking it.
            raise ValueError("count must be positive")
        for index, (start, available) in enumerate(self.ranges):
            if available > count:
                # Take the ids from the beginning of the range.
                self.ranges[index] = (start + count, available - count)
                return start
            if available == count:
                del self.ranges[index]
                return start
        raise ValueError("could not satisfy allocation request")

    def allocatemap(self, count: int, target: int = 0) -> IDMapping:
        """Allocate count contiguous ids from this allocation. An IDMapping
        with its innerstart set to target is returned. The allocation is
        removed from this IDAllocation object.
        """
        return IDMapping(target, self.allocate(count), count)


def newidmap(
    kind: typing.Literal["uid", "gid"],
    pid: int,
    mapping: list[IDMapping],
    helper: bool | None = None,
) -> None:
    """Apply the given uid or gid mapping to the given process. A positive
    pid identifies a process, other values identify the currently running
    process. Whether setuid binaries newuidmap and newgidmap are used is
    determined via the helper argument. A None value indicates automatic
    detection of whether a helper is required for setting up the given
    mapping. A ValueError is raised for a kind other than "uid" or "gid".
    """
    # kind ends up in a command name and in a /proc path, so validate it even
    # when assertions are disabled.
    if kind not in ("uid", "gid"):
        raise ValueError("kind must be either uid or gid")
    if pid <= 0:
        pid = os.getpid()
    if helper is None:
        # We cannot reliably test whether we have the right EUID and we don't
        # implement checking whether setgroups has been denied either. Please
        # be explicit about the helper choice in such cases.
        helper = len(mapping) > 1 or mapping[0].count > 1
    if helper:
        argv = [f"new{kind}map", str(pid)]
        for idblock in mapping:
            argv.extend(map(str, dataclasses.astuple(idblock)))
        subprocess.check_call(argv)
    else:
        # One "innerstart outerstart count" line per range.
        pathlib.Path(f"/proc/{pid}/{kind}_map").write_text(
            "".join(
                "%d %d %d\n" % dataclasses.astuple(idblock)
                for idblock in mapping
            ),
            encoding="ascii",
        )


def newuidmap(pid: int, mapping: list[IDMapping], helper: bool = True) -> None:
    """Apply a given uid mapping to the given process. Refer to newidmap for
    details.
    """
    newidmap("uid", pid, mapping, helper)


def newgidmap(pid: int, mapping: list[IDMapping], helper: bool = True) -> None:
    """Apply a given gid mapping to the given process. Refer to newidmap for
    details.
    """
    newidmap("gid", pid, mapping, helper)


def newidmaps(
    pid: int,
    uidmapping: list[IDMapping],
    gidmapping: list[IDMapping],
    helper: bool = True,
) -> None:
    """Apply a given uid and gid mapping to the given process. The gid
    mapping is applied before the uid mapping. Refer to newidmap for details.
    """
    newgidmap(pid, gidmapping, helper)
    newuidmap(pid, uidmapping, helper)


class run_in_fork:
    """Decorator for running the decorated function once in a separate
    process.
    """

    def __init__(self, function: typing.Callable[[], None]):
        """Fork a new process that will eventually run the given function and
        then exit. The child never returns from this constructor: it exits
        with status 0 if the function returns and with a non-zero status if
        the function raises.
        """
        self.efd = EventFD()
        self.pid = os.fork()
        if self.pid == 0:
            status = 1
            try:
                # Presumably blocks until the parent calls start().
                self.efd.read()
                self.efd.close()
                function()
                status = 0
            except SystemExit as exc:
                # Honour an explicit sys.exit() of the function.
                if isinstance(exc.code, int):
                    status = exc.code
                else:
                    status = int(exc.code is not None)
            except BaseException as exc:
                sys.excepthook(type(exc), exc, exc.__traceback__)
            finally:
                # Never let the child unwind into the caller's code. It would
                # continue running as a duplicate of the parent process.
                os._exit(status)

    def start(self) -> None:
        """Start the decorated function. It can only be started once."""
        if not self.efd:
            raise ValueError("this function can only be called once")
        self.efd.write(1)
        self.efd.close()

    def wait(self) -> None:
        """Wait for the process running the decorated function to finish.
        A ValueError is raised if start has not been called or if the process
        did not exit successfully.
        """
        if self.efd:
            raise ValueError("start must be called before wait")
        ret = os.waitpid(self.pid, 0)
        if ret != (self.pid, 0):
            raise ValueError("something failed")

    def __call__(self) -> None:
        """Start the decorated function and wait for its process to finish."""
        self.start()
        self.wait()


def bind_mount(
    source: AtLocationLike,
    target: AtLocationLike,
    recursive: bool = False,
    readonly: bool = False,
) -> None:
    """Create a bind mount from source to target. Depending on whether one of
    the locations involves a file descriptor or not, the new or old mount API
    will be used.
    """
    source = AtLocation(source)
    target = AtLocation(target)
    try:
        srcloc = os.fspath(source)
        tgtloc = os.fspath(target)
    except ValueError:
        # At least one location cannot be expressed as a plain path: clone
        # the source tree and attach the clone to the target.
        otflags = OpenTreeFlags.OPEN_TREE_CLONE
        if recursive:
            otflags |= OpenTreeFlags.AT_RECURSIVE
        with open_tree(source, otflags) as srcfd:
            if readonly:
                mount_setattr(srcfd, recursive, MountAttrFlags.RDONLY)
            return move_mount(srcfd, target)
    else:
        mflags = MountFlags.BIND
        if recursive:
            mflags |= MountFlags.REC
        if readonly:
            # NOTE(review): mount(2) documents that flags other than MS_REC
            # are ignored when creating a bind mount and that a read-only
            # bind mount requires a separate remount. Confirm whether the
            # mount() wrapper compensates for this.
            mflags |= MountFlags.RDONLY
        return mount(srcloc, tgtloc, None, mflags)


_P = typing.ParamSpec("_P")


class _ExceptionExitCallback:
    """Helper class that invokes a callback when a context manager exits with
    a failure.
    """

    def __init__(
        self,
        callback: typing.Callable[_P, typing.Any],
        *args: _P.args,
        **kwargs: _P.kwargs,
    ) -> None:
        self.callback = callback
        self.args = args
        self.kwargs = kwargs

    def __enter__(self) -> None:
        pass

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: typing.Any,
    ) -> None:
        # Returning None keeps the exception propagating after the callback.
        if exc_type is not None:
            self.callback(*self.args, **self.kwargs)


def populate_dev(
    origroot: AtLocationLike,
    newroot: PathConvertible,
    *,
    fuse: bool = True,
    pidns: bool = True,
    tun: bool = True,
) -> None:
    """Mount a tmpfs to the dev directory beneath newroot and populate it with
    basic devices by bind mounting them from the dev directory beneath
    origroot. Also mount a new pts instance.

    The fuse and tun flags add the fuse and net/tun devices. If pidns is
    false, ptmx and pts are bind mounted from origroot instead of mounting a
    new pts instance. If populating fails, the tmpfs is detached again.
    """
    origdev = AtLocation(origroot) / "dev"
    newdev = AtLocation(newroot) / "dev"
    bind_devices = "null zero full random urandom tty".split()
    if fuse:
        bind_devices.append("fuse")
    bind_directories: list[str] = []
    mount(
        "devtmpfs",
        newdev,
        "tmpfs",
        MountFlags.NOSUID | MountFlags.NOEXEC,
        "mode=0755",
    )
    with _ExceptionExitCallback(umount, newdev, UmountFlags.DETACH):
        if pidns:
            (newdev / "pts").mkdir()
            (newdev / "pts").chmod(0o755)
            mount(
                "devpts",
                newdev / "pts",
                "devpts",
                MountFlags.NOSUID | MountFlags.NOEXEC,
                "gid=5,mode=620,ptmxmode=666",
            )
            (newdev / "ptmx").symlink("pts/ptmx")
        else:
            bind_devices.append("ptmx")
            bind_directories.append("pts")
        if tun:
            (newdev / "net").mkdir()
            (newdev / "net").chmod(0o755)
            bind_devices.append("net/tun")
        for node in bind_devices:
            # Create a regular file to serve as the bind mount target.
            (newdev / node).mknod(stat.S_IFREG)
            bind_mount(origdev / node, newdev / node, True)
        for node in bind_directories:
            (newdev / node).mkdir()
            bind_mount(origdev / node, newdev / node, True)


def populate_sys(
    origroot: AtLocationLike,
    newroot: PathConvertible,
    rootcgroup: PathConvertible | None = None,
    module: bool = True,
) -> None:
    """Create a /sys hierarchy below newroot. Bind the cgroup hierarchy. The
    cgroup hierarchy will be mounted read-only if mounting the root group.

    rootcgroup selects the cgroup to bind as an absolute path; None and "/"
    both select the root group. A ValueError is raised for a relative
    rootcgroup. If module is true, sys/module from origroot is bound
    read-only as well.
    """
    newsys = AtLocation(newroot) / "sys"
    mflags = MountFlags.NOSUID | MountFlags.NOEXEC | MountFlags.NODEV
    if rootcgroup is None:
        rootcgroup = ""
        mounting_root = True
    else:
        rootcgroup = pathlib.PurePath(rootcgroup).relative_to("/")
        # PurePath objects are always truthy, so the root group has to be
        # recognized by its lack of path components.
        mounting_root = not rootcgroup.parts
    with contextlib.ExitStack() as exitstack:
        # Clone the source trees before the new sys mount exists.
        cgfd = exitstack.enter_context(
            open_tree(
                AtLocation(origroot) / "sys/fs/cgroup" / rootcgroup,
                OpenTreeFlags.OPEN_TREE_CLONE | OpenTreeFlags.AT_RECURSIVE,
            ),
        )
        if mounting_root:
            mount_setattr(cgfd, True, MountAttrFlags.RDONLY)
        if module:
            modfd = exitstack.enter_context(
                open_tree(
                    AtLocation(origroot) / "sys/module",
                    OpenTreeFlags.OPEN_TREE_CLONE | OpenTreeFlags.AT_RECURSIVE,
                ),
            )
            mount_setattr(modfd, True, MountAttrFlags.RDONLY)
        mount("sysfs", newsys, "tmpfs", mflags, "mode=0755")
        exitstack.enter_context(
            _ExceptionExitCallback(umount, newsys, UmountFlags.DETACH)
        )
        for subdir in ("fs", "fs/cgroup", "module"):
            (newsys / subdir).mkdir()
            (newsys / subdir).chmod(0o755)
        # The mount points exist now, so the tmpfs can become read-only.
        mflags |= MountFlags.REMOUNT | MountFlags.RDONLY
        mount("sysfs", newsys, "tmpfs", mflags, "mode=0755")
        move_mount(cgfd, newsys / "fs/cgroup")
        if module:
            move_mount(modfd, newsys / "module")


def unshare_user_idmap(
    uidmap: list[IDMapping],
    gidmap: list[IDMapping],
    flags: CloneFlags = CloneFlags.NEWUSER,
) -> None:
    """Unshare the given namespaces (must include user) and set up the given
    id mappings.
    """
    pid = os.getpid()

    # The helper process is forked before unsharing and applies the mappings
    # to this process once it has been started.
    @run_in_fork
    def setup_idmaps() -> None:
        newidmaps(pid, uidmap, gidmap)

    unshare(flags)
    setup_idmaps()


def unshare_user_idmap_nohelper(
    uid: int, gid: int, flags: CloneFlags = CloneFlags.NEWUSER
) -> None:
    """Unshare the given namespaces (must include user) and map the current
    user and group to the given uid and gid without using the setuid helpers.
    """
    # Look up the outer ids before unsharing.
    uidmap = IDMapping(uid, os.getuid(), 1)
    gidmap = IDMapping(gid, os.getgid(), 1)
    unshare(flags)
    pathlib.Path("/proc/self/setgroups").write_bytes(b"deny")
    newidmaps(-1, [uidmap], [gidmap], False)