summaryrefslogtreecommitdiff
path: root/linuxnamespaces/syscalls.py
diff options
context:
space:
mode:
authorHelmut Grohne <helmut@subdivi.de>2024-01-18 22:13:03 +0100
committerHelmut Grohne <helmut@subdivi.de>2024-01-18 22:13:03 +0100
commit034f732a1af4ce295d993e6951decc4898967dd3 (patch)
tree5a49cc7b5f5db586ae67f5276071ab525ef832f3 /linuxnamespaces/syscalls.py
downloadpython-linuxnamespaces-034f732a1af4ce295d993e6951decc4898967dd3.tar.gz
initial checkin
Diffstat (limited to 'linuxnamespaces/syscalls.py')
-rw-r--r--linuxnamespaces/syscalls.py504
1 files changed, 504 insertions, 0 deletions
diff --git a/linuxnamespaces/syscalls.py b/linuxnamespaces/syscalls.py
new file mode 100644
index 0000000..0e33a44
--- /dev/null
+++ b/linuxnamespaces/syscalls.py
@@ -0,0 +1,504 @@
+# Copyright 2024 Helmut Grohne <helmut@subdivi.de>
+# SPDX-License-Identifier: GPL-3
+
+"""Provide typed Python functions for a number of Linux system calls relevant
+for Linux namespaces including the new mount API.
+"""
+
+import ctypes
+import dataclasses
+import enum
+import os
+import typing
+
+from .atlocation import AtFlags, AtLocation, AtLocationLike, PathConvertible
+
+
+LIBC_SO = ctypes.CDLL("libc.so.6", use_errno=True)
+
+
+class CloneFlags(enum.IntFlag):
+ """This value may be supplied to
+ * unshare(2) flags
+ * clone3(2) clone_args.flags
+ * setns(2) nstype
+ """
+
+ NONE = 0x00000000
+ NEWTIME = 0x00000080
+ VM = 0x00000100
+ FS = 0x00000200
+ FILES = 0x00000400
+ SIGHAND = 0x00000800
+ PIDFD = 0x00001000
+ PTRACE = 0x00002000
+ VFORK = 0x00004000
+ PARENT = 0x00008000
+ THREAD = 0x00010000
+ NEWNS = 0x00020000
+ SYSVSEM = 0x00040000
+ SETTLS = 0x00080000
+ PARENT_SETTID = 0x00100000
+ CHILD_CLEARTID = 0x00200000
+ DETACHED = 0x00400000
+ UNTRACED = 0x00800000
+ CHILD_SETTID = 0x01000000
+ NEWCGROUP = 0x02000000
+ NEWUTS = 0x04000000
+ NEWIPC = 0x08000000
+ NEWUSER = 0x10000000
+ NEWPID = 0x20000000
+ NEWNET = 0x40000000
+ IO = 0x80000000
+ NS_FLAGS = (
+ NEWCGROUP
+ | NEWIPC
+ | NEWNET
+ | NEWNS
+ | NEWPID
+ | NEWTIME
+ | NEWUSER
+ | NEWUTS
+ )
+ UNSHARE_FLAGS = NS_FLAGS | FILES | FS | SYSVSEM
+
+
+class EventFDFlags(enum.IntFlag):
+ """This value may be supplied as flags to eventfd(2)."""
+
+ NONE = 0
+ CLOEXEC = 0o2000000
+ NONBLOCK = 0o4000
+ SEMAPHORE = 0o1
+ ALL_FLAGS = CLOEXEC | NONBLOCK | SEMAPHORE
+
+
+class MountFlags(enum.IntFlag):
+ """This value may be supplied as mountflags to mount(2)."""
+
+ NONE = 0
+ RDONLY = 1 << 0
+ NOSUID = 1 << 1
+ NODEV = 1 << 2
+ NOEXEC = 1 << 3
+ SYNCHRONOUS = 1 << 4
+ REMOUNT = 1 << 5
+ MANDLOCK = 1 << 6
+ DIRSYNC = 1 << 7
+ NOSYMFOLLOW = 1 << 8
+ # Bit 9 vanished
+ NOATIME = 1 << 10
+ NODIRATIME = 1 << 11
+ BIND = 1 << 12
+ MOVE = 1 << 13
+ REC = 1 << 14
+ SILENT = 1 << 15
+ POSIXACL = 1 << 16
+ UNBINDABLE = 1 << 17
+ PRIVATE = 1 << 18
+ SLAVE = 1 << 19
+ SHARED = 1 << 20
+ RELATIME = 1 << 21
+ KERNMOUNT = 1 << 22
+ I_VERSION = 1 << 23
+ STRICTATIME = 1 << 24
+ LAZYTIME = 1 << 25
+ SUBMOUNT = 1 << 26
+ NOREMOTELOCK = 1 << 27
+ NOSEC = 1 << 28
+ BORN = 1 << 29
+ ACTIVE = 1 << 30
+ NOUSER = 1 << 31
+
+ PROPAGATION_FLAGS = UNBINDABLE | PRIVATE | SLAVE | SHARED
+
+
+class MountSetattrFlags(enum.IntFlag):
+ """This value may be supplied as flags to mount_setattr(2)."""
+
+ NONE = 0
+ AT_SYMLINK_NOFOLLOW = 0x100
+ AT_NO_AUTOMOUNT = 0x800
+ AT_EMPTY_PATH = 0x1000
+ AT_RECURSIVE = 0x8000
+
+ @staticmethod
+ def from_atflags(flags: AtFlags) -> "MountSetattrFlags":
+ ret = MountSetattrFlags.NONE
+ if flags & AtFlags.AT_SYMLINK_NOFOLLOW:
+ ret |= MountSetattrFlags.AT_SYMLINK_NOFOLLOW
+ if flags & AtFlags.AT_NO_AUTOMOUNT:
+ ret |= MountSetattrFlags.AT_NO_AUTOMOUNT
+ if flags & AtFlags.AT_EMPTY_PATH:
+ ret |= MountSetattrFlags.AT_EMPTY_PATH
+ return ret
+
+
+class MountAttrFlags(enum.IntFlag):
+ """This value may be supplied as attr->attr_set or attr->attr_clr to
+ mount_setattr(2).
+ """
+
+ NONE = 0x000000
+ RDONLY = 0x000001 # Mount read-only.
+ NOSUID = 0x000002 # Ignore suid and sgid bits.
+ NODEV = 0x000004 # Disallow access to device special files.
+ NOEXEC = 0x000008 # Disallow program execution.
+ RELATIME = 0x000000 # - Update atime relative to mtime/ctime.
+ NOATIME = 0x000010 # - Do not update access times.
+ STRICTATIME = 0x000020 # - Always perform atime updates
+ _ATIME = 0x000070 | NOATIME | STRICTATIME
+ # Setting on how atime should be updated.
+ NODIRATIME = 0x000080 # Do not update directory access times.
+ IDMAP = 0x100000 # Idmap mount to @userns_fd in struct mount_attr.
+ NOSYMFOLLOW = 0x200000 # Do not follow symlinks.
+
+ ALL_FLAGS = (
+ RDONLY
+ | NOSYMFOLLOW
+ | NODEV
+ | NOEXEC
+ | _ATIME
+ | NODIRATIME
+ | IDMAP
+ | NOSYMFOLLOW
+ )
+
+
+class MountAttr(ctypes.Structure):
+ """This value may be supplied to mount_setattr(2) as attr."""
+
+ _fields_ = [
+ ("attr_set", ctypes.c_ulonglong),
+ ("attr_clr", ctypes.c_ulonglong),
+ ("propagation", ctypes.c_ulonglong),
+ ("userns_fd", ctypes.c_ulonglong),
+ ]
+
+
+class MoveMountFlags(enum.IntFlag):
+ """This value may be supplied to move_mount(2) as flags."""
+
+ NONE = 0x00000000
+ F_SYMLINKS = 0x00000001 # Follow symlinks on from path
+ F_AUTOMOUNTS = 0x00000002 # Follow automounts on from path
+ F_EMPTY_PATH = 0x00000004 # Empty from path permitted
+ T_SYMLINKS = 0x00000010 # Follow symlinks on to path
+ T_AUTOMOUNTS = 0x00000020 # Follow automounts on to path
+ T_EMPTY_PATH = 0x00000040 # Empty to path permitted
+ SET_GROUP = 0x00000100 # Set sharing group instead
+ ALL_FLAGS = (
+ F_SYMLINKS
+ | F_AUTOMOUNTS
+ | F_EMPTY_PATH
+ | T_SYMLINKS
+ | T_AUTOMOUNTS
+ | T_EMPTY_PATH
+ | SET_GROUP
+ )
+
+
+class OpenTreeFlags(enum.IntFlag):
+ """This value may be supplied to open_tree(2) as flags."""
+
+ NONE = 0
+ OPEN_TREE_CLONE = 0x1
+ OPEN_TREE_CLOEXEC = os.O_CLOEXEC
+ AT_SYMLINK_NOFOLLOW = 0x100
+ AT_NO_AUTOMOUNT = 0x800
+ AT_EMPTY_PATH = 0x1000
+ AT_RECURSIVE = 0x8000
+ ALL_FLAGS = (
+ OPEN_TREE_CLONE
+ | OPEN_TREE_CLOEXEC
+ | AT_SYMLINK_NOFOLLOW
+ | AT_NO_AUTOMOUNT
+ | AT_EMPTY_PATH
+ | AT_RECURSIVE
+ )
+
+
+class UmountFlags(enum.IntFlag):
+ """This value may be supplied to umount2(2) as flags."""
+
+ NONE = 0
+ FORCE = 1
+ DETACH = 2
+ EXPIRE = 4
+ NOFOLLOW = 8
+ ALL_FLAGS = FORCE | DETACH | EXPIRE | NOFOLLOW
+
+
+def call_libc(funcname: str, *args: typing.Any) -> int:
+ """Call a function from the C library with given args. This assumes that
+ the function returns an integer that is non-negative on success. On
+ failure, an OSError with errno is raised.
+ """
+ ret: int = LIBC_SO[funcname](*args)
+ if ret < 0:
+ err = ctypes.get_errno()
+ raise OSError(
+ err, f"{funcname}() failed with error {err}: {os.strerror(err)}"
+ )
+ return ret
+
+
+@dataclasses.dataclass
+class CapabilitySets:
+ """Represent the main capability sets that capget/capset deal with."""
+
+ effective: int
+ permitted: int
+ inheritable: int
+
+ @staticmethod
+ def _create_header(pid: int) -> ctypes.Array[ctypes.c_uint32]:
+ return (ctypes.c_uint32 * 2)(
+ 0x20080522, # _LINUX_CAPABILITY_VERSION_3
+ pid,
+ )
+
+ @classmethod
+ def get(cls, pid: int = 0) -> "CapabilitySets":
+ """Call capget to retrieve the current capability sets."""
+ header = cls._create_header(pid)
+ data = (ctypes.c_uint32 * 6)()
+ call_libc("capget", ctypes.byref(header), ctypes.byref(data))
+ return cls(
+ (data[3] << 32) | data[0],
+ (data[4] << 32) | data[1],
+ (data[5] << 32) | data[2],
+ )
+
+ def set(self, pid: int = 0) -> None:
+ """Call capset to set the capabilities."""
+ header = self._create_header(pid)
+ data = (ctypes.c_uint32 * 6)(
+ self.effective & 0xffffffff,
+ self.permitted & 0xffffffff,
+ self.inheritable & 0xffffffff,
+ self.effective >> 32,
+ self.permitted >> 32,
+ self.inheritable >> 32,
+ )
+ call_libc("capset", ctypes.byref(header), ctypes.byref(data))
+
+
+class EventFD:
+ """Represent a file decriptor returned from eventfd(2)."""
+
+ def __init__(
+ self, initval: int = 0, flags: EventFDFlags = EventFDFlags.NONE
+ ) -> None:
+ if flags & ~EventFDFlags.ALL_FLAGS:
+ raise ValueError("invalid flags for eventfd")
+ self.fd = call_libc("eventfd", initval, int(flags))
+
+ def read(self) -> int:
+ """Decrease the value of the eventfd using eventfd_read."""
+ if self.fd < 0:
+ raise ValueError("attempt to read from closed eventfd")
+ cvalue = ctypes.c_ulonglong()
+ call_libc("eventfd_read", self.fd, ctypes.byref(cvalue))
+ return cvalue.value
+
+ def write(self, value: int = 1) -> None:
+ """Add the given value to the eventfd using eventfd_write."""
+ if self.fd < 0:
+ raise ValueError("attempt to read from closed eventfd")
+ if value < 0 or (value >> 64):
+ raise ValueError("value for eventfd_write out of range")
+ call_libc("eventfd_write", self.fd, ctypes.c_ulonglong(value))
+
+ def fileno(self) -> int:
+ """Return the underlying file descriptor."""
+ return self.fd
+
+ def close(self) -> None:
+ """Close the underlying file descriptor."""
+ if self.fd >= 0:
+ try:
+ os.close(self.fd)
+ finally:
+ self.fd = -1
+
+ __del__ = close
+
+ def __bool__(self) -> bool:
+ """Return True unless the eventfd is closed."""
+ return self.fd >= 0
+
+ def __enter__(self) -> "EventFD":
+ """When used as a context manager, the EventFD is closed on scope exit.
+ """
+ return self
+
+ def __exit__(
+ self,
+ exc_type: typing.Any,
+ exc_value: typing.Any,
+ traceback: typing.Any,
+ ) -> None:
+ self.close()
+
+
+def mount(
+ source: PathConvertible,
+ target: PathConvertible,
+ filesystemtype: str | None,
+ flags: MountFlags = MountFlags.NONE,
+ data: str | None = None,
+) -> None:
+ """Python wrapper for mount(2)."""
+ if (flags & MountFlags.PROPAGATION_FLAGS).bit_count() > 1:
+ raise ValueError("invalid flags for mount")
+ if (
+ flags & MountFlags.PROPAGATION_FLAGS
+ and flags & ~(
+ MountFlags.PROPAGATION_FLAGS | MountFlags.REC | MountFlags.SILENT
+ )
+ ):
+ raise ValueError("invalid flags for mount")
+ call_libc(
+ "mount",
+ os.fsencode(source),
+ os.fsencode(target),
+ None if filesystemtype is None else os.fsencode(filesystemtype),
+ int(flags),
+ None if data is None else os.fsencode(data),
+ )
+
+
+def mount_setattr(
+ filesystem: AtLocationLike,
+ recursive: bool,
+ attr_set: MountAttrFlags = MountAttrFlags.NONE,
+ attr_clr: MountAttrFlags = MountAttrFlags.NONE,
+ propagation: int = 0,
+ userns_fd: int = -1,
+) -> None:
+ """Python wrapper for mount_setattr(2)."""
+ filesystem = AtLocation(filesystem)
+ flags = MountSetattrFlags.from_atflags(filesystem.flags)
+ if recursive:
+ flags |= MountSetattrFlags.AT_RECURSIVE
+ if attr_clr & MountAttrFlags.IDMAP:
+ raise ValueError("cannot clear the MOUNT_ATTR_IDMAP flag")
+ attr = MountAttr(attr_set, attr_clr, propagation, userns_fd)
+ call_libc(
+ "mount_setattr",
+ filesystem.fd,
+ os.fsencode(filesystem.location),
+ int(flags),
+ ctypes.byref(attr),
+ ctypes.sizeof(attr),
+ )
+
+
+def move_mount(
+ from_: AtLocationLike,
+ to: AtLocationLike,
+ flags: MoveMountFlags = MoveMountFlags.NONE,
+) -> None:
+ """Python wrapper for move_mount(2)."""
+ from_ = AtLocation(from_)
+ to = AtLocation(to)
+ if flags & ~MoveMountFlags.ALL_FLAGS:
+ raise ValueError("invalid flags for move_mount")
+ if from_.flags & AtFlags.AT_SYMLINK_NOFOLLOW:
+ flags &= ~MoveMountFlags.F_SYMLINKS
+ else:
+ flags |= MoveMountFlags.F_SYMLINKS
+ if from_.flags & AtFlags.AT_NO_AUTOMOUNT:
+ flags &= ~MoveMountFlags.F_AUTOMOUNTS
+ else:
+ flags |= MoveMountFlags.F_AUTOMOUNTS
+ if from_.flags & AtFlags.AT_EMPTY_PATH:
+ flags |= MoveMountFlags.F_EMPTY_PATH
+ else:
+ flags &= ~MoveMountFlags.F_EMPTY_PATH
+ if to.flags & AtFlags.AT_SYMLINK_NOFOLLOW:
+ flags &= ~MoveMountFlags.T_SYMLINKS
+ else:
+ flags |= MoveMountFlags.T_SYMLINKS
+ if to.flags & AtFlags.AT_NO_AUTOMOUNT:
+ flags &= ~MoveMountFlags.T_AUTOMOUNTS
+ else:
+ flags |= MoveMountFlags.T_AUTOMOUNTS
+ if to.flags & AtFlags.AT_EMPTY_PATH:
+ flags |= MoveMountFlags.T_EMPTY_PATH
+ else:
+ flags &= ~MoveMountFlags.T_EMPTY_PATH
+ call_libc(
+ "move_mount",
+ from_.fd,
+ os.fsencode(from_.location),
+ to.fd,
+ os.fsencode(to.location),
+ int(flags),
+ )
+
+
+def open_tree(
+ source: AtLocationLike, flags: OpenTreeFlags = OpenTreeFlags.NONE
+) -> AtLocation:
+ """Python wrapper for open_tree(2)."""
+ source = AtLocation(source)
+ if flags & ~OpenTreeFlags.ALL_FLAGS:
+ raise ValueError("invalid flags for open_tree")
+ if (
+ flags & OpenTreeFlags.AT_RECURSIVE
+ and not flags & OpenTreeFlags.OPEN_TREE_CLONE
+ ):
+ raise ValueError("invalid flags for open_tree")
+ if source.flags & AtFlags.AT_SYMLINK_NOFOLLOW:
+ flags |= OpenTreeFlags.AT_SYMLINK_NOFOLLOW
+ else:
+ flags &= ~OpenTreeFlags.AT_SYMLINK_NOFOLLOW
+ if source.flags & AtFlags.AT_NO_AUTOMOUNT:
+ flags |= OpenTreeFlags.AT_NO_AUTOMOUNT
+ else:
+ flags &= ~OpenTreeFlags.AT_NO_AUTOMOUNT
+ if source.flags & AtFlags.AT_EMPTY_PATH:
+ flags |= OpenTreeFlags.AT_EMPTY_PATH
+ else:
+ flags &= ~OpenTreeFlags.AT_EMPTY_PATH
+ return AtLocation(
+ call_libc(
+ "open_tree", source.fd, os.fsencode(source.location), int(flags)
+ )
+ )
+
+
+def pivot_root(new_root: PathConvertible, put_old: PathConvertible) -> None:
+ """Python wrapper for pivot_root(2)."""
+ call_libc("pivot_root", os.fsencode(new_root), os.fsencode(put_old))
+
+
+def setns(fd: int, nstype: CloneFlags = CloneFlags.NONE) -> None:
+ """Python wrapper for setns(2)."""
+ if fd < 0:
+ raise ValueError("invalid file descriptor")
+ if nstype & ~CloneFlags.NS_FLAGS != 0:
+ raise ValueError("invalid nstype for setns")
+ call_libc("setns", fd, int(nstype))
+
+
+def umount(
+ path: PathConvertible, flags: UmountFlags = UmountFlags.NONE
+) -> None:
+ """Python wrapper for umount(2)."""
+ if flags & ~UmountFlags.ALL_FLAGS:
+ raise ValueError("umount flags out of range")
+ if flags & UmountFlags.EXPIRE and flags & (
+ UmountFlags.FORCE | UmountFlags.DETACH
+ ):
+ raise ValueError("invalid flags for umount")
+ call_libc("umount2", os.fsencode(path), int(flags))
+
+
+def unshare(flags: CloneFlags) -> None:
+ """Python wrapper for unshare(2)."""
+ if flags & ~CloneFlags.UNSHARE_FLAGS:
+ raise ValueError("invalid flags for unshare")
+ call_libc("unshare", int(flags))