From 034f732a1af4ce295d993e6951decc4898967dd3 Mon Sep 17 00:00:00 2001 From: Helmut Grohne Date: Thu, 18 Jan 2024 22:13:03 +0100 Subject: initial checkin --- linuxnamespaces/syscalls.py | 504 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 504 insertions(+) create mode 100644 linuxnamespaces/syscalls.py (limited to 'linuxnamespaces/syscalls.py') diff --git a/linuxnamespaces/syscalls.py b/linuxnamespaces/syscalls.py new file mode 100644 index 0000000..0e33a44 --- /dev/null +++ b/linuxnamespaces/syscalls.py @@ -0,0 +1,504 @@ +# Copyright 2024 Helmut Grohne +# SPDX-License-Identifier: GPL-3 + +"""Provide typed Python functions for a number of Linux system calls relevant +for Linux namespaces including the new mount API. +""" + +import ctypes +import dataclasses +import enum +import os +import typing + +from .atlocation import AtFlags, AtLocation, AtLocationLike, PathConvertible + + +LIBC_SO = ctypes.CDLL("libc.so.6", use_errno=True) + + +class CloneFlags(enum.IntFlag): + """This value may be supplied to + * unshare(2) flags + * clone3(2) clone_args.flags + * setns(2) nstype + """ + + NONE = 0x00000000 + NEWTIME = 0x00000080 + VM = 0x00000100 + FS = 0x00000200 + FILES = 0x00000400 + SIGHAND = 0x00000800 + PIDFD = 0x00001000 + PTRACE = 0x00002000 + VFORK = 0x00004000 + PARENT = 0x00008000 + THREAD = 0x00010000 + NEWNS = 0x00020000 + SYSVSEM = 0x00040000 + SETTLS = 0x00080000 + PARENT_SETTID = 0x00100000 + CHILD_CLEARTID = 0x00200000 + DETACHED = 0x00400000 + UNTRACED = 0x00800000 + CHILD_SETTID = 0x01000000 + NEWCGROUP = 0x02000000 + NEWUTS = 0x04000000 + NEWIPC = 0x08000000 + NEWUSER = 0x10000000 + NEWPID = 0x20000000 + NEWNET = 0x40000000 + IO = 0x80000000 + NS_FLAGS = ( + NEWCGROUP + | NEWIPC + | NEWNET + | NEWNS + | NEWPID + | NEWTIME + | NEWUSER + | NEWUTS + ) + UNSHARE_FLAGS = NS_FLAGS | FILES | FS | SYSVSEM + + +class EventFDFlags(enum.IntFlag): + """This value may be supplied as flags to eventfd(2).""" + + NONE = 0 + CLOEXEC = 0o2000000 + NONBLOCK = 0o4000 + SEMAPHORE = 0o1 + ALL_FLAGS = CLOEXEC | NONBLOCK | SEMAPHORE + + +class MountFlags(enum.IntFlag): + """This value may be supplied as mountflags to mount(2).""" + + NONE = 0 + RDONLY = 1 << 0 + NOSUID = 1 << 1 + NODEV = 1 << 2 + NOEXEC = 1 << 3 + SYNCHRONOUS = 1 << 4 + REMOUNT = 1 << 5 + MANDLOCK = 1 << 6 + DIRSYNC = 1 << 7 + NOSYMFOLLOW = 1 << 8 + # Bit 9 vanished + NOATIME = 1 << 10 + NODIRATIME = 1 << 11 + BIND = 1 << 12 + MOVE = 1 << 13 + REC = 1 << 14 + SILENT = 1 << 15 + POSIXACL = 1 << 16 + UNBINDABLE = 1 << 17 + PRIVATE = 1 << 18 + SLAVE = 1 << 19 + SHARED = 1 << 20 + RELATIME = 1 << 21 + KERNMOUNT = 1 << 22 + I_VERSION = 1 << 23 + STRICTATIME = 1 << 24 + LAZYTIME = 1 << 25 + SUBMOUNT = 1 << 26 + NOREMOTELOCK = 1 << 27 + NOSEC = 1 << 28 + BORN = 1 << 29 + ACTIVE = 1 << 30 + NOUSER = 1 << 31 + + PROPAGATION_FLAGS = UNBINDABLE | PRIVATE | SLAVE | SHARED + + +class MountSetattrFlags(enum.IntFlag): + """This value may be supplied as flags to mount_setattr(2).""" + + NONE = 0 + AT_SYMLINK_NOFOLLOW = 0x100 + AT_NO_AUTOMOUNT = 0x800 + AT_EMPTY_PATH = 0x1000 + AT_RECURSIVE = 0x8000 + + @staticmethod + def from_atflags(flags: AtFlags) -> "MountSetattrFlags": + ret = MountSetattrFlags.NONE + if flags & AtFlags.AT_SYMLINK_NOFOLLOW: + ret |= MountSetattrFlags.AT_SYMLINK_NOFOLLOW + if flags & AtFlags.AT_NO_AUTOMOUNT: + ret |= MountSetattrFlags.AT_NO_AUTOMOUNT + if flags & AtFlags.AT_EMPTY_PATH: + ret |= MountSetattrFlags.AT_EMPTY_PATH + return ret + + +class MountAttrFlags(enum.IntFlag): + """This value may be supplied as attr->attr_set or attr->attr_clr to + mount_setattr(2). + """ + + NONE = 0x000000 + RDONLY = 0x000001 # Mount read-only. + NOSUID = 0x000002 # Ignore suid and sgid bits. + NODEV = 0x000004 # Disallow access to device special files. + NOEXEC = 0x000008 # Disallow program execution. + RELATIME = 0x000000 # - Update atime relative to mtime/ctime. + NOATIME = 0x000010 # - Do not update access times. + STRICTATIME = 0x000020 # - Always perform atime updates + _ATIME = 0x000070 | NOATIME | STRICTATIME + # Setting on how atime should be updated. + NODIRATIME = 0x000080 # Do not update directory access times. + IDMAP = 0x100000 # Idmap mount to @userns_fd in struct mount_attr. + NOSYMFOLLOW = 0x200000 # Do not follow symlinks. + + ALL_FLAGS = ( + RDONLY + | NOSYMFOLLOW + | NODEV + | NOEXEC + | _ATIME + | NODIRATIME + | IDMAP + | NOSYMFOLLOW + ) + + +class MountAttr(ctypes.Structure): + """This value may be supplied to mount_setattr(2) as attr.""" + + _fields_ = [ + ("attr_set", ctypes.c_ulonglong), + ("attr_clr", ctypes.c_ulonglong), + ("propagation", ctypes.c_ulonglong), + ("userns_fd", ctypes.c_ulonglong), + ] + + +class MoveMountFlags(enum.IntFlag): + """This value may be supplied to move_mount(2) as flags.""" + + NONE = 0x00000000 + F_SYMLINKS = 0x00000001 # Follow symlinks on from path + F_AUTOMOUNTS = 0x00000002 # Follow automounts on from path + F_EMPTY_PATH = 0x00000004 # Empty from path permitted + T_SYMLINKS = 0x00000010 # Follow symlinks on to path + T_AUTOMOUNTS = 0x00000020 # Follow automounts on to path + T_EMPTY_PATH = 0x00000040 # Empty to path permitted + SET_GROUP = 0x00000100 # Set sharing group instead + ALL_FLAGS = ( + F_SYMLINKS + | F_AUTOMOUNTS + | F_EMPTY_PATH + | T_SYMLINKS + | T_AUTOMOUNTS + | T_EMPTY_PATH + | SET_GROUP + ) + + +class OpenTreeFlags(enum.IntFlag): + """This value may be supplied to open_tree(2) as flags.""" + + NONE = 0 + OPEN_TREE_CLONE = 0x1 + OPEN_TREE_CLOEXEC = os.O_CLOEXEC + AT_SYMLINK_NOFOLLOW = 0x100 + AT_NO_AUTOMOUNT = 0x800 + AT_EMPTY_PATH = 0x1000 + AT_RECURSIVE = 0x8000 + ALL_FLAGS = ( + OPEN_TREE_CLONE + | OPEN_TREE_CLOEXEC + | AT_SYMLINK_NOFOLLOW + | AT_NO_AUTOMOUNT + | AT_EMPTY_PATH + | AT_RECURSIVE + ) + + +class UmountFlags(enum.IntFlag): + """This value may be supplied to umount2(2) as flags.""" + + NONE = 0 + FORCE = 1 + DETACH = 2 + EXPIRE = 4 + NOFOLLOW = 8 + ALL_FLAGS = FORCE | DETACH | EXPIRE | NOFOLLOW + + +def call_libc(funcname: str, *args: typing.Any) -> int: + """Call a function from the C library with given args. This assumes that + the function returns an integer that is non-negative on success. On + failure, an OSError with errno is raised. + """ + ret: int = LIBC_SO[funcname](*args) + if ret < 0: + err = ctypes.get_errno() + raise OSError( + err, f"{funcname}() failed with error {err}: {os.strerror(err)}" + ) + return ret + + +@dataclasses.dataclass +class CapabilitySets: + """Represent the main capability sets that capget/capset deal with.""" + + effective: int + permitted: int + inheritable: int + + @staticmethod + def _create_header(pid: int) -> ctypes.Array[ctypes.c_uint32]: + return (ctypes.c_uint32 * 2)( + 0x20080522, # _LINUX_CAPABILITY_VERSION_3 + pid, + ) + + @classmethod + def get(cls, pid: int = 0) -> "CapabilitySets": + """Call capget to retrieve the current capability sets.""" + header = cls._create_header(pid) + data = (ctypes.c_uint32 * 6)() + call_libc("capget", ctypes.byref(header), ctypes.byref(data)) + return cls( + (data[3] << 32) | data[0], + (data[4] << 32) | data[1], + (data[5] << 32) | data[2], + ) + + def set(self, pid: int = 0) -> None: + """Call capset to set the capabilities.""" + header = self._create_header(pid) + data = (ctypes.c_uint32 * 6)( + self.effective & 0xffffffff, + self.permitted & 0xffffffff, + self.inheritable & 0xffffffff, + self.effective >> 32, + self.permitted >> 32, + self.inheritable >> 32, + ) + call_libc("capset", ctypes.byref(header), ctypes.byref(data)) + + +class EventFD: + """Represent a file decriptor returned from eventfd(2).""" + + def __init__( + self, initval: int = 0, flags: EventFDFlags = EventFDFlags.NONE + ) -> None: + if flags & ~EventFDFlags.ALL_FLAGS: + raise ValueError("invalid flags for eventfd") + self.fd = call_libc("eventfd", initval, int(flags)) + + def read(self) -> int: + """Decrease the value of the eventfd using eventfd_read.""" + if self.fd < 0: + raise ValueError("attempt to read from closed eventfd") + cvalue = ctypes.c_ulonglong() + call_libc("eventfd_read", self.fd, ctypes.byref(cvalue)) + return cvalue.value + + def write(self, value: int = 1) -> None: + """Add the given value to the eventfd using eventfd_write.""" + if self.fd < 0: + raise ValueError("attempt to read from closed eventfd") + if value < 0 or (value >> 64): + raise ValueError("value for eventfd_write out of range") + call_libc("eventfd_write", self.fd, ctypes.c_ulonglong(value)) + + def fileno(self) -> int: + """Return the underlying file descriptor.""" + return self.fd + + def close(self) -> None: + """Close the underlying file descriptor.""" + if self.fd >= 0: + try: + os.close(self.fd) + finally: + self.fd = -1 + + __del__ = close + + def __bool__(self) -> bool: + """Return True unless the eventfd is closed.""" + return self.fd >= 0 + + def __enter__(self) -> "EventFD": + """When used as a context manager, the EventFD is closed on scope exit. + """ + return self + + def __exit__( + self, + exc_type: typing.Any, + exc_value: typing.Any, + traceback: typing.Any, + ) -> None: + self.close() + + +def mount( + source: PathConvertible, + target: PathConvertible, + filesystemtype: str | None, + flags: MountFlags = MountFlags.NONE, + data: str | None = None, +) -> None: + """Python wrapper for mount(2).""" + if (flags & MountFlags.PROPAGATION_FLAGS).bit_count() > 1: + raise ValueError("invalid flags for mount") + if ( + flags & MountFlags.PROPAGATION_FLAGS + and flags & ~( + MountFlags.PROPAGATION_FLAGS | MountFlags.REC | MountFlags.SILENT + ) + ): + raise ValueError("invalid flags for mount") + call_libc( + "mount", + os.fsencode(source), + os.fsencode(target), + None if filesystemtype is None else os.fsencode(filesystemtype), + int(flags), + None if data is None else os.fsencode(data), + ) + + +def mount_setattr( + filesystem: AtLocationLike, + recursive: bool, + attr_set: MountAttrFlags = MountAttrFlags.NONE, + attr_clr: MountAttrFlags = MountAttrFlags.NONE, + propagation: int = 0, + userns_fd: int = -1, +) -> None: + """Python wrapper for mount_setattr(2).""" + filesystem = AtLocation(filesystem) + flags = MountSetattrFlags.from_atflags(filesystem.flags) + if recursive: + flags |= MountSetattrFlags.AT_RECURSIVE + if attr_clr & MountAttrFlags.IDMAP: + raise ValueError("cannot clear the MOUNT_ATTR_IDMAP flag") + attr = MountAttr(attr_set, attr_clr, propagation, userns_fd) + call_libc( + "mount_setattr", + filesystem.fd, + os.fsencode(filesystem.location), + int(flags), + ctypes.byref(attr), + ctypes.sizeof(attr), + ) + + +def move_mount( + from_: AtLocationLike, + to: AtLocationLike, + flags: MoveMountFlags = MoveMountFlags.NONE, +) -> None: + """Python wrapper for move_mount(2).""" + from_ = AtLocation(from_) + to = AtLocation(to) + if flags & ~MoveMountFlags.ALL_FLAGS: + raise ValueError("invalid flags for move_mount") + if from_.flags & AtFlags.AT_SYMLINK_NOFOLLOW: + flags &= ~MoveMountFlags.F_SYMLINKS + else: + flags |= MoveMountFlags.F_SYMLINKS + if from_.flags & AtFlags.AT_NO_AUTOMOUNT: + flags &= ~MoveMountFlags.F_AUTOMOUNTS + else: + flags |= MoveMountFlags.F_AUTOMOUNTS + if from_.flags & AtFlags.AT_EMPTY_PATH: + flags |= MoveMountFlags.F_EMPTY_PATH + else: + flags &= ~MoveMountFlags.F_EMPTY_PATH + if to.flags & AtFlags.AT_SYMLINK_NOFOLLOW: + flags &= ~MoveMountFlags.T_SYMLINKS + else: + flags |= MoveMountFlags.T_SYMLINKS + if to.flags & AtFlags.AT_NO_AUTOMOUNT: + flags &= ~MoveMountFlags.T_AUTOMOUNTS + else: + flags |= MoveMountFlags.T_AUTOMOUNTS + if to.flags & AtFlags.AT_EMPTY_PATH: + flags |= MoveMountFlags.T_EMPTY_PATH + else: + flags &= ~MoveMountFlags.T_EMPTY_PATH + call_libc( + "move_mount", + from_.fd, + os.fsencode(from_.location), + to.fd, + os.fsencode(to.location), + int(flags), + ) + + +def open_tree( + source: AtLocationLike, flags: OpenTreeFlags = OpenTreeFlags.NONE +) -> AtLocation: + """Python wrapper for open_tree(2).""" + source = AtLocation(source) + if flags & ~OpenTreeFlags.ALL_FLAGS: + raise ValueError("invalid flags for open_tree") + if ( + flags & OpenTreeFlags.AT_RECURSIVE + and not flags & OpenTreeFlags.OPEN_TREE_CLONE + ): + raise ValueError("invalid flags for open_tree") + if source.flags & AtFlags.AT_SYMLINK_NOFOLLOW: + flags |= OpenTreeFlags.AT_SYMLINK_NOFOLLOW + else: + flags &= ~OpenTreeFlags.AT_SYMLINK_NOFOLLOW + if source.flags & AtFlags.AT_NO_AUTOMOUNT: + flags |= OpenTreeFlags.AT_NO_AUTOMOUNT + else: + flags &= ~OpenTreeFlags.AT_NO_AUTOMOUNT + if source.flags & AtFlags.AT_EMPTY_PATH: + flags |= OpenTreeFlags.AT_EMPTY_PATH + else: + flags &= ~OpenTreeFlags.AT_EMPTY_PATH + return AtLocation( + call_libc( + "open_tree", source.fd, os.fsencode(source.location), int(flags) + ) + ) + + +def pivot_root(new_root: PathConvertible, put_old: PathConvertible) -> None: + """Python wrapper for pivot_root(2).""" + call_libc("pivot_root", os.fsencode(new_root), os.fsencode(put_old)) + + +def setns(fd: int, nstype: CloneFlags = CloneFlags.NONE) -> None: + """Python wrapper for setns(2).""" + if fd < 0: + raise ValueError("invalid file descriptor") + if nstype & ~CloneFlags.NS_FLAGS != 0: + raise ValueError("invalid nstype for setns") + call_libc("setns", fd, int(nstype)) + + +def umount( + path: PathConvertible, flags: UmountFlags = UmountFlags.NONE +) -> None: + """Python wrapper for umount(2).""" + if flags & ~UmountFlags.ALL_FLAGS: + raise ValueError("umount flags out of range") + if flags & UmountFlags.EXPIRE and flags & ( + UmountFlags.FORCE | UmountFlags.DETACH + ): + raise ValueError("invalid flags for umount") + call_libc("umount2", os.fsencode(path), int(flags)) + + +def unshare(flags: CloneFlags) -> None: + """Python wrapper for unshare(2).""" + if flags & ~CloneFlags.UNSHARE_FLAGS: + raise ValueError("invalid flags for unshare") + call_libc("unshare", int(flags)) -- cgit v1.2.3