# Copyright 2024 Helmut Grohne
# SPDX-License-Identifier: GPL-3

"""Provide typed Python functions for a number of Linux system calls relevant
for Linux namespaces including the new mount API.
"""

import asyncio
import ctypes
import dataclasses
import enum
import errno
import logging
import os
import typing

from .atlocation import AtFlags, AtLocation, AtLocationLike, PathConvertible


logger = logging.getLogger(__name__)

# Handle to the C library of the running process. use_errno=True makes
# ctypes.get_errno() report the errno of the most recent foreign call.
LIBC_SO = ctypes.CDLL(None, use_errno=True)


class CloneFlags(enum.IntFlag):
    """This value may be supplied to
    * unshare(2) flags
    * clone3(2) clone_args.flags
    * setns(2) nstype
    """

    NONE = 0x00000000
    NEWTIME = 0x00000080
    VM = 0x00000100
    FS = 0x00000200
    FILES = 0x00000400
    SIGHAND = 0x00000800
    PIDFD = 0x00001000
    PTRACE = 0x00002000
    VFORK = 0x00004000
    PARENT = 0x00008000
    THREAD = 0x00010000
    NEWNS = 0x00020000
    SYSVSEM = 0x00040000
    SETTLS = 0x00080000
    PARENT_SETTID = 0x00100000
    CHILD_CLEARTID = 0x00200000
    DETACHED = 0x00400000
    UNTRACED = 0x00800000
    CHILD_SETTID = 0x01000000
    NEWCGROUP = 0x02000000
    NEWUTS = 0x04000000
    NEWIPC = 0x08000000
    NEWUSER = 0x10000000
    NEWPID = 0x20000000
    NEWNET = 0x40000000
    IO = 0x80000000
    # All flags that select a namespace type. setns() below only accepts
    # a subset of these.
    NS_FLAGS = (
        NEWCGROUP
        | NEWIPC
        | NEWNET
        | NEWNS
        | NEWPID
        | NEWTIME
        | NEWUSER
        | NEWUTS
    )
    # The flags accepted by the unshare() wrapper below.
    UNSHARE_FLAGS = NS_FLAGS | FILES | FS | SYSVSEM


class EventFDFlags(enum.IntFlag):
    """This value may be supplied as flags to eventfd(2)."""

    NONE = 0
    CLOEXEC = os.EFD_CLOEXEC
    NONBLOCK = os.EFD_NONBLOCK
    SEMAPHORE = os.EFD_SEMAPHORE
    ALL_FLAGS = CLOEXEC | NONBLOCK | SEMAPHORE


class MountFlags(enum.IntFlag):
    """This value may be supplied as mountflags to mount(2)."""

    NONE = 0
    RDONLY = 1 << 0
    NOSUID = 1 << 1
    NODEV = 1 << 2
    NOEXEC = 1 << 3
    SYNCHRONOUS = 1 << 4
    REMOUNT = 1 << 5
    MANDLOCK = 1 << 6
    DIRSYNC = 1 << 7
    NOSYMFOLLOW = 1 << 8
    # Bit 9 vanished
    NOATIME = 1 << 10
    NODIRATIME = 1 << 11
    BIND = 1 << 12
    MOVE = 1 << 13
    REC = 1 << 14
    SILENT = 1 << 15
    POSIXACL = 1 << 16
    UNBINDABLE = 1 << 17
    PRIVATE = 1 << 18
    SLAVE = 1 << 19
    SHARED = 1 << 20
    RELATIME = 1 << 21
    KERNMOUNT = 1 << 22
    I_VERSION = 1 << 23
    STRICTATIME = 1 << 24
    LAZYTIME = 1 << 25
    SUBMOUNT = 1 << 26
    NOREMOTELOCK = 1 << 27
    NOSEC = 1 << 28
    BORN = 1 << 29
    ACTIVE = 1 << 30
    NOUSER = 1 << 31

    # The propagation types are mutually exclusive: change() replaces any
    # previously set one and tostr()/mount() reject more than one of them.
    PROPAGATION_FLAGS = UNBINDABLE | PRIVATE | SLAVE | SHARED

    # Map each flag to:
    #  * The flag value
    #  * Whether the flag value is negated
    #  * Whether the flag must be negated
    #  * Whether the flag can be negated
    # The keys are looked up after stripping a leading "no" from the text.
    __flagstrmap = {
        "acl": (POSIXACL, False, False, False),
        "async": (SYNCHRONOUS, True, False, False),
        "atime": (NOATIME, True, False, True),
        "bind": (BIND, False, False, False),
        "dev": (NODEV, True, False, True),
        "diratime": (NODIRATIME, True, False, True),
        "dirsync": (DIRSYNC, False, False, False),
        "exec": (NOEXEC, True, False, True),
        "iversion": (I_VERSION, False, False, True),
        "lazytime": (LAZYTIME, False, False, True),
        "loud": (SILENT, True, False, False),
        "mand": (MANDLOCK, False, False, True),
        "private": (PRIVATE, False, False, False),
        "rbind": (BIND | REC, False, False, False),
        "relatime": (RELATIME, False, False, True),
        "remount": (REMOUNT, False, False, True),
        "ro": (RDONLY, False, False, False),
        "rprivate": (PRIVATE | REC, False, False, False),
        "rshared": (SHARED | REC, False, False, False),
        "rslave": (SLAVE | REC, False, False, False),
        "runbindable": (UNBINDABLE | REC, False, False, False),
        "rw": (RDONLY, True, False, False),
        "shared": (SHARED, False, False, False),
        "silent": (SILENT, False, False, False),
        "slave": (SLAVE, False, False, False),
        "strictatime": (STRICTATIME, False, False, True),
        "suid": (NOSUID, True, False, True),
        "symfollow": (NOSYMFOLLOW, True, False, True),
        "sync": (SYNCHRONOUS, False, False, False),
        "unbindable": (UNBINDABLE, False, False, False),
    }

    def change(self, flagsstr: str) -> "MountFlags":
        """Return modified mount flags after applying comma-separated mount
        flags represented as a str. Empty elements are skipped. Raise a
        ValueError if any given flag does not correspond to a textual mount
        flag.
        """
        ret = self
        for flagstr in flagsstr.split(","):
            if not flagstr:
                continue
            has_no = flagstr.startswith("no")
            # The fallback tuple for unknown names has mustnegate=True and
            # cannegate=False, which can never satisfy the chained
            # comparison below and therefore always yields a ValueError.
            flag, negated, mustnegate, cannegate = self.__flagstrmap.get(
                flagstr.removeprefix("no"),
                (MountFlags.NONE, False, True, False),
            )
            # With booleans, this reads: a required "no" prefix is present
            # and a present "no" prefix is permitted.
            if not mustnegate <= has_no <= cannegate:
                raise ValueError(f"not a valid mount flag: {flagstr!r}")
            if negated ^ has_no:
                ret &= ~flag
            else:
                # Setting a propagation type replaces any earlier one.
                if flag & MountFlags.PROPAGATION_FLAGS:
                    ret &= ~MountFlags.PROPAGATION_FLAGS
                ret |= flag
        return ret

    @staticmethod
    def fromstr(flagsstr: str) -> "MountFlags":
        """Construct mount flags by changing flags according to the passed
        flagsstr using the change method on an initial value with all flags
        cleared.
        """
        return MountFlags.NONE.change(flagsstr)

    # Textual names sorted by descending value such that combined values
    # (e.g. "rbind" for BIND | REC) are matched before their parts.
    __flagvals: list[tuple[int, str]] = sorted(
        [
            (RDONLY, "ro"),
            (NOSUID, "nosuid"),
            (NODEV, "nodev"),
            (NOEXEC, "noexec"),
            (SYNCHRONOUS, "sync"),
            (REMOUNT, "remount"),
            (MANDLOCK, "mand"),
            (DIRSYNC, "dirsync"),
            (NOSYMFOLLOW, "nosymfollow"),
            (NOATIME, "noatime"),
            (NODIRATIME, "nodiratime"),
            (BIND, "bind"),
            (BIND | REC, "rbind"),
            (SILENT, "silent"),
            (POSIXACL, "acl"),
            (UNBINDABLE, "unbindable"),
            (UNBINDABLE | REC, "runbindable"),
            (PRIVATE, "private"),
            (PRIVATE | REC, "rprivate"),
            (SLAVE, "slave"),
            (SLAVE | REC, "rslave"),
            (SHARED, "shared"),
            (SHARED | REC, "rshared"),
            (RELATIME, "relatime"),
            (I_VERSION, "iversion"),
            (STRICTATIME, "strictatime"),
            (LAZYTIME, "lazytime"),
        ],
        reverse=True,
    )

    def tostr(self) -> str:
        """Attempt to represent the flags in a comma-separated, textual way.
        Raise a ValueError if more than one propagation flag is set or if
        some set bit has no textual representation.
        """
        if (self & MountFlags.PROPAGATION_FLAGS).bit_count() > 1:
            raise ValueError("cannot represent conflicting propagation flags")
        parts: list[str] = []
        remain = self
        for val, text in MountFlags.__flagvals:
            # Older mypy think MountFlags.__flagvals and thus text was of type
            # MountFlags.
            assert isinstance(text, str)
            if remain & val == val:
                # Values are visited in descending order. Prepending yields
                # ascending order in the result.
                parts.insert(0, text)
                remain &= ~val
        if remain:
            raise ValueError(f"cannot represent flags {remain}")
        return ",".join(parts)


class MountSetattrFlags(enum.IntFlag):
    """This value may be supplied as flags to mount_setattr(2)."""

    NONE = 0
    AT_SYMLINK_NOFOLLOW = 0x100
    AT_NO_AUTOMOUNT = 0x800
    AT_EMPTY_PATH = 0x1000
    AT_RECURSIVE = 0x8000

    @staticmethod
    def from_atflags(flags: AtFlags) -> "MountSetattrFlags":
        """Translate the AT_SYMLINK_NOFOLLOW, AT_NO_AUTOMOUNT and
        AT_EMPTY_PATH bits of the given AtFlags. Other bits are ignored.
        """
        ret = MountSetattrFlags.NONE
        if flags & AtFlags.AT_SYMLINK_NOFOLLOW:
            ret |= MountSetattrFlags.AT_SYMLINK_NOFOLLOW
        if flags & AtFlags.AT_NO_AUTOMOUNT:
            ret |= MountSetattrFlags.AT_NO_AUTOMOUNT
        if flags & AtFlags.AT_EMPTY_PATH:
            ret |= MountSetattrFlags.AT_EMPTY_PATH
        return ret


class MountAttrFlags(enum.IntFlag):
    """This value may be supplied as attr->attr_set or attr->attr_clr to
    mount_setattr(2).
    """

    NONE = 0x000000
    RDONLY = 0x000001  # Mount read-only.
    NOSUID = 0x000002  # Ignore suid and sgid bits.
    NODEV = 0x000004  # Disallow access to device special files.
    NOEXEC = 0x000008  # Disallow program execution.
    RELATIME = 0x000000  # - Update atime relative to mtime/ctime.
    NOATIME = 0x000010  # - Do not update access times.
    STRICTATIME = 0x000020  # - Always perform atime updates
    # Setting on how atime should be updated. This mask includes the
    # NOATIME and STRICTATIME bits.
    _ATIME = 0x000070
    NODIRATIME = 0x000080  # Do not update directory access times.
    IDMAP = 0x100000  # Idmap mount to @userns_fd in struct mount_attr.
    NOSYMFOLLOW = 0x200000  # Do not follow symlinks.
    ALL_FLAGS = (
        RDONLY
        | NOSUID
        | NODEV
        | NOEXEC
        | _ATIME
        | NODIRATIME
        | IDMAP
        | NOSYMFOLLOW
    )


class MountAttr(ctypes.Structure):
    """This value may be supplied to mount_setattr(2) as attr."""

    _fields_ = [
        ("attr_set", ctypes.c_ulonglong),
        ("attr_clr", ctypes.c_ulonglong),
        ("propagation", ctypes.c_ulonglong),
        ("userns_fd", ctypes.c_ulonglong),
    ]


class MoveMountFlags(enum.IntFlag):
    """This value may be supplied to move_mount(2) as flags."""

    NONE = 0x00000000
    F_SYMLINKS = 0x00000001  # Follow symlinks on from path
    F_AUTOMOUNTS = 0x00000002  # Follow automounts on from path
    F_EMPTY_PATH = 0x00000004  # Empty from path permitted
    T_SYMLINKS = 0x00000010  # Follow symlinks on to path
    T_AUTOMOUNTS = 0x00000020  # Follow automounts on to path
    T_EMPTY_PATH = 0x00000040  # Empty to path permitted
    SET_GROUP = 0x00000100  # Set sharing group instead
    ALL_FLAGS = (
        F_SYMLINKS
        | F_AUTOMOUNTS
        | F_EMPTY_PATH
        | T_SYMLINKS
        | T_AUTOMOUNTS
        | T_EMPTY_PATH
        | SET_GROUP
    )


class OpenTreeFlags(enum.IntFlag):
    """This value may be supplied to open_tree(2) as flags."""

    NONE = 0
    OPEN_TREE_CLONE = 0x1
    OPEN_TREE_CLOEXEC = os.O_CLOEXEC
    AT_SYMLINK_NOFOLLOW = 0x100
    AT_NO_AUTOMOUNT = 0x800
    AT_EMPTY_PATH = 0x1000
    AT_RECURSIVE = 0x8000
    ALL_FLAGS = (
        OPEN_TREE_CLONE
        | OPEN_TREE_CLOEXEC
        | AT_SYMLINK_NOFOLLOW
        | AT_NO_AUTOMOUNT
        | AT_EMPTY_PATH
        | AT_RECURSIVE
    )


class PrctlOption(enum.IntEnum):
    """This value may be supplied to prctl(2) as option."""

    PR_SET_PDEATHSIG = 1
    PR_SET_CHILD_SUBREAPER = 36
    PR_CAP_AMBIENT = 47


class UmountFlags(enum.IntFlag):
    """This value may be supplied to umount2(2) as flags."""

    NONE = 0
    FORCE = 1
    DETACH = 2
    EXPIRE = 4
    NOFOLLOW = 8
    ALL_FLAGS = FORCE | DETACH | EXPIRE | NOFOLLOW


def call_libc(funcname: str, *args: typing.Any) -> int:
    """Call a function from the C library with given args. This assumes that
    the function returns an integer that is non-negative on success. On
    failure, an OSError with errno is raised.
    """
    logger.debug("calling libc function %s%r", funcname, args)
    ret: int = LIBC_SO[funcname](*args)
    logger.debug("%s returned %d", funcname, ret)
    if ret < 0:
        err = ctypes.get_errno()
        raise OSError(
            err, f"{funcname}() failed with error {err}: {os.strerror(err)}"
        )
    return ret


@dataclasses.dataclass
class CapabilitySets:
    """Represent the main capability sets that capget/capset deal with.
    Each set is an integer bitfield assembled from two 32 bit words.
    """

    effective: int
    permitted: int
    inheritable: int

    @staticmethod
    def _create_header(pid: int) -> ctypes.Array[ctypes.c_uint32]:
        """Build the two-word header (version, pid) passed to capget and
        capset.
        """
        return (ctypes.c_uint32 * 2)(
            0x20080522,  # _LINUX_CAPABILITY_VERSION_3
            pid,
        )

    @classmethod
    def get(cls, pid: int = 0) -> "CapabilitySets":
        """Call capget to retrieve the current capability sets."""
        header = cls._create_header(pid)
        # Words 0-2 carry the low halves of effective, permitted and
        # inheritable. Words 3-5 carry the corresponding high halves.
        data = (ctypes.c_uint32 * 6)()
        call_libc("capget", ctypes.byref(header), ctypes.byref(data))
        return cls(
            (data[3] << 32) | data[0],
            (data[4] << 32) | data[1],
            (data[5] << 32) | data[2],
        )

    def set(self, pid: int = 0) -> None:
        """Call capset to set the capabilities."""
        header = self._create_header(pid)
        # Same word layout as in get: three low halves, then three high
        # halves.
        data = (ctypes.c_uint32 * 6)(
            self.effective & 0xFFFFFFFF,
            self.permitted & 0xFFFFFFFF,
            self.inheritable & 0xFFFFFFFF,
            self.effective >> 32,
            self.permitted >> 32,
            self.inheritable >> 32,
        )
        call_libc("capset", ctypes.byref(header), ctypes.byref(data))


class EventFD:
    """Represent a file descriptor returned from eventfd(2)."""

    # Class-level default such that close() (and thus __del__) also works
    # on an instance whose __init__ raised before assigning self.fd.
    fd: int = -1

    def __init__(
        self, initval: int = 0, flags: EventFDFlags = EventFDFlags.NONE
    ) -> None:
        """Create the eventfd. Raise a ValueError for flags outside of
        EventFDFlags.ALL_FLAGS.
        """
        if flags & ~EventFDFlags.ALL_FLAGS:
            raise ValueError("invalid flags for eventfd")
        self.fd = os.eventfd(initval, int(flags))

    def read(self) -> int:
        """Decrease the value of the eventfd using eventfd_read."""
        if self.fd < 0:
            raise ValueError("attempt to read from closed eventfd")
        return os.eventfd_read(self.fd)

    def __handle_readable(self, fd: int, fut: asyncio.Future[int]) -> None:
        """Internal helper of aread."""
        loop = fut.get_loop()
        if fut.done():
            # The future was cancelled or otherwise completed already. Do
            # not consume the eventfd value for a result nobody receives.
            loop.remove_reader(fd)
            return
        try:
            if fd != self.fd:
                raise RuntimeError("EventFD file descriptor changed")
            try:
                result = self.read()
            except OSError as err:
                if err.errno == errno.EAGAIN:
                    # Nothing to read yet. Keep the reader registered and
                    # wait for the next readiness notification.
                    return
                raise
        except Exception as exc:
            loop.remove_reader(fd)
            fut.set_exception(exc)
        else:
            loop.remove_reader(fd)
            fut.set_result(result)

    def aread(self) -> typing.Awaitable[int]:
        """Decrease the value of the eventfd asynchronously. It must have been
        constructed using EventFDFlags.NONBLOCK.
        """
        if self.fd < 0:
            raise ValueError("attempt to read from closed eventfd")
        loop = asyncio.get_running_loop()
        fut: asyncio.Future[int] = loop.create_future()
        loop.add_reader(self.fd, self.__handle_readable, self.fd, fut)
        return fut

    def write(self, value: int = 1) -> None:
        """Add the given value to the eventfd using eventfd_write."""
        if self.fd < 0:
            raise ValueError("attempt to write to closed eventfd")
        os.eventfd_write(self.fd, value)

    def fileno(self) -> int:
        """Return the underlying file descriptor."""
        return self.fd

    def close(self) -> None:
        """Close the underlying file descriptor."""
        if self.fd >= 0:
            try:
                os.close(self.fd)
            finally:
                # Mark as closed even if os.close failed.
                self.fd = -1

    __del__ = close

    def __bool__(self) -> bool:
        """Return True unless the eventfd is closed."""
        return self.fd >= 0

    def __enter__(self) -> "EventFD":
        """When used as a context manager, the EventFD is closed on scope
        exit.
        """
        return self

    def __exit__(
        self,
        exc_type: typing.Any,
        exc_value: typing.Any,
        traceback: typing.Any,
    ) -> None:
        self.close()


def mount(
    source: PathConvertible,
    target: PathConvertible,
    filesystemtype: str | None,
    flags: MountFlags = MountFlags.NONE,
    data: str | list[str] | None = None,
) -> None:
    """Python wrapper for mount(2). A list passed as data is joined with
    commas. Raise a ValueError if more than one propagation flag is given, if
    a propagation flag is combined with flags other than REC and SILENT or if
    an element of a data list contains a comma.
    """
    if (flags & MountFlags.PROPAGATION_FLAGS).bit_count() > 1:
        raise ValueError("invalid flags for mount")
    if (
        flags & MountFlags.PROPAGATION_FLAGS
        and flags
        & ~(
            MountFlags.PROPAGATION_FLAGS
            | MountFlags.REC
            | MountFlags.SILENT
        )
    ):
        raise ValueError("invalid flags for mount")
    if isinstance(data, list):
        if any("," in s for s in data):
            raise ValueError("data elements must not contain a comma")
        data = ",".join(data)
    call_libc(
        "mount",
        os.fsencode(source),
        os.fsencode(target),
        None if filesystemtype is None else os.fsencode(filesystemtype),
        int(flags),
        None if data is None else os.fsencode(data),
    )


def mount_setattr(
    filesystem: AtLocationLike,
    recursive: bool,
    attr_set: MountAttrFlags = MountAttrFlags.NONE,
    attr_clr: MountAttrFlags = MountAttrFlags.NONE,
    propagation: int = 0,
    userns_fd: int = -1,
) -> None:
    """Python wrapper for mount_setattr(2). The syscall flags are derived
    from the flags of the filesystem location and the recursive argument.
    Raise a ValueError if attr_clr contains MountAttrFlags.IDMAP.
    """
    filesystem = AtLocation(filesystem)
    flags = MountSetattrFlags.from_atflags(filesystem.flags)
    if recursive:
        flags |= MountSetattrFlags.AT_RECURSIVE
    if attr_clr & MountAttrFlags.IDMAP:
        raise ValueError("cannot clear the MOUNT_ATTR_IDMAP flag")
    attr = MountAttr(attr_set, attr_clr, propagation, userns_fd)
    call_libc(
        "mount_setattr",
        filesystem.fd,
        os.fsencode(filesystem.location),
        int(flags),
        ctypes.byref(attr),
        ctypes.sizeof(attr),
    )


def move_mount(
    from_: AtLocationLike,
    to: AtLocationLike,
    flags: MoveMountFlags = MoveMountFlags.NONE,
) -> None:
    """Python wrapper for move_mount(2). The F_* and T_* symlink, automount
    and empty path bits of flags are overridden from the flags of the from_
    and to locations respectively.
    """
    from_ = AtLocation(from_)
    to = AtLocation(to)
    if flags & ~MoveMountFlags.ALL_FLAGS:
        raise ValueError("invalid flags for move_mount")
    # AtFlags express "do not follow" whereas MoveMountFlags express
    # "follow", hence the inverted mapping for symlinks and automounts.
    if from_.flags & AtFlags.AT_SYMLINK_NOFOLLOW:
        flags &= ~MoveMountFlags.F_SYMLINKS
    else:
        flags |= MoveMountFlags.F_SYMLINKS
    if from_.flags & AtFlags.AT_NO_AUTOMOUNT:
        flags &= ~MoveMountFlags.F_AUTOMOUNTS
    else:
        flags |= MoveMountFlags.F_AUTOMOUNTS
    if from_.flags & AtFlags.AT_EMPTY_PATH:
        flags |= MoveMountFlags.F_EMPTY_PATH
    else:
        flags &= ~MoveMountFlags.F_EMPTY_PATH
    if to.flags & AtFlags.AT_SYMLINK_NOFOLLOW:
        flags &= ~MoveMountFlags.T_SYMLINKS
    else:
        flags |= MoveMountFlags.T_SYMLINKS
    if to.flags & AtFlags.AT_NO_AUTOMOUNT:
        flags &= ~MoveMountFlags.T_AUTOMOUNTS
    else:
        flags |= MoveMountFlags.T_AUTOMOUNTS
    if to.flags & AtFlags.AT_EMPTY_PATH:
        flags |= MoveMountFlags.T_EMPTY_PATH
    else:
        flags &= ~MoveMountFlags.T_EMPTY_PATH
    call_libc(
        "move_mount",
        from_.fd,
        os.fsencode(from_.location),
        to.fd,
        os.fsencode(to.location),
        int(flags),
    )


def open_tree(
    source: AtLocationLike, flags: OpenTreeFlags = OpenTreeFlags.NONE
) -> AtLocation:
    """Python wrapper for open_tree(2). The AT_SYMLINK_NOFOLLOW,
    AT_NO_AUTOMOUNT and AT_EMPTY_PATH bits of flags are overridden from the
    flags of the source location. The returned file descriptor is wrapped in
    an AtLocation. Raise a ValueError for unknown flags or when AT_RECURSIVE
    is given without OPEN_TREE_CLONE.
    """
    source = AtLocation(source)
    if flags & ~OpenTreeFlags.ALL_FLAGS:
        raise ValueError("invalid flags for open_tree")
    if (
        flags & OpenTreeFlags.AT_RECURSIVE
        and not flags & OpenTreeFlags.OPEN_TREE_CLONE
    ):
        raise ValueError("invalid flags for open_tree")
    if source.flags & AtFlags.AT_SYMLINK_NOFOLLOW:
        flags |= OpenTreeFlags.AT_SYMLINK_NOFOLLOW
    else:
        flags &= ~OpenTreeFlags.AT_SYMLINK_NOFOLLOW
    if source.flags & AtFlags.AT_NO_AUTOMOUNT:
        flags |= OpenTreeFlags.AT_NO_AUTOMOUNT
    else:
        flags &= ~OpenTreeFlags.AT_NO_AUTOMOUNT
    if source.flags & AtFlags.AT_EMPTY_PATH:
        flags |= OpenTreeFlags.AT_EMPTY_PATH
    else:
        flags &= ~OpenTreeFlags.AT_EMPTY_PATH
    return AtLocation(
        call_libc(
            "open_tree", source.fd, os.fsencode(source.location), int(flags)
        )
    )


def pivot_root(new_root: PathConvertible, put_old: PathConvertible) -> None:
    """Python wrapper for pivot_root(2)."""
    call_libc("pivot_root", os.fsencode(new_root), os.fsencode(put_old))


def prctl(
    option: PrctlOption | int,
    arg2: int = 0,
    arg3: int = 0,
    arg4: int = 0,
    arg5: int = 0,
) -> int:
    """Python wrapper for prctl(2)."""
    return call_libc("prctl", int(option), arg2, arg3, arg4, arg5)


def prctl_raise_ambient_capabilities(capabilities: int) -> None:
    """Raise all ambient capabilities in the given bitfield. If multiple bits
    are set, this results in multiple prctl(2) syscalls. Raise a ValueError
    for a negative bitfield.
    """
    if capabilities < 0:
        # A negative Python int has infinitely many set bits, so the loop
        # below would never run out of capabilities to raise.
        raise ValueError("capabilities must not be negative")
    while capabilities:
        # Isolate the lowest set bit and remove it from the bitfield.
        cap = capabilities & (~capabilities + 1)
        capabilities ^= cap
        prctl(
            PrctlOption.PR_CAP_AMBIENT,
            2,  # PR_CAP_AMBIENT_RAISE
            cap.bit_length() - 1,
        )


def prctl_set_child_subreaper(enabled: bool = True) -> None:
    """Enable or disable being a child subreaper."""
    prctl(PrctlOption.PR_SET_CHILD_SUBREAPER, int(enabled))


def prctl_set_pdeathsig(signum: int) -> None:
    """Set the parent-death signal of the calling process."""
    if signum < 0:
        raise ValueError("invalid signal number")
    prctl(PrctlOption.PR_SET_PDEATHSIG, signum)


def setns(fd: int, nstype: CloneFlags = CloneFlags.NONE) -> None:
    """Python wrapper for setns(2). Raise a ValueError for a negative fd or
    an nstype outside of CloneFlags.NS_FLAGS.
    """
    if fd < 0:
        raise ValueError("invalid file descriptor")
    if nstype & ~CloneFlags.NS_FLAGS != 0:
        raise ValueError("invalid nstype for setns")
    call_libc("setns", fd, int(nstype))


def umount(
    path: PathConvertible, flags: UmountFlags = UmountFlags.NONE
) -> None:
    """Python wrapper for umount2(2). Raise a ValueError for unknown flags
    or when EXPIRE is combined with FORCE or DETACH.
    """
    if flags & ~UmountFlags.ALL_FLAGS:
        raise ValueError("umount flags out of range")
    if flags & UmountFlags.EXPIRE and flags & (
        UmountFlags.FORCE | UmountFlags.DETACH
    ):
        raise ValueError("invalid flags for umount")
    call_libc("umount2", os.fsencode(path), int(flags))


def unshare(flags: CloneFlags) -> None:
    """Python wrapper for unshare(2). Raise a ValueError for flags outside
    of CloneFlags.UNSHARE_FLAGS.
    """
    if flags & ~CloneFlags.UNSHARE_FLAGS:
        raise ValueError("invalid flags for unshare")
    call_libc("unshare", int(flags))