summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHelmut Grohne <helmut@subdivi.de>2024-01-18 22:13:03 +0100
committerHelmut Grohne <helmut@subdivi.de>2024-01-18 22:13:03 +0100
commit034f732a1af4ce295d993e6951decc4898967dd3 (patch)
tree5a49cc7b5f5db586ae67f5276071ab525ef832f3
downloadpython-linuxnamespaces-034f732a1af4ce295d993e6951decc4898967dd3.tar.gz
initial checkin
-rw-r--r--.gitignore3
-rw-r--r--README.md10
-rw-r--r--conftest.py0
-rw-r--r--linuxnamespaces/__init__.py333
-rw-r--r--linuxnamespaces/atlocation.py362
-rw-r--r--linuxnamespaces/syscalls.py504
-rw-r--r--tests/test_simple.py164
7 files changed, 1376 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..a2eeca3
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+__pycache__
+.mypy_cache
+.pytest_cache
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..b215cd6
--- /dev/null
+++ b/README.md
@@ -0,0 +1,10 @@
+linuxnamespaces
+===============
+
+This is a plumbing-level Python module for working with Linux namespaces via
+ctypes. It leverages glibc wrappers to access the relevant system calls and
+provides typed abstractions for them.
+
+License
+-------
+GPL-3
diff --git a/conftest.py b/conftest.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/conftest.py
diff --git a/linuxnamespaces/__init__.py b/linuxnamespaces/__init__.py
new file mode 100644
index 0000000..29d41f6
--- /dev/null
+++ b/linuxnamespaces/__init__.py
@@ -0,0 +1,333 @@
+# Copyright 2024 Helmut Grohne <helmut@subdivi.de>
+# SPDX-License-Identifier: GPL-3
+
+"""Provide plumbing-layer functionality for working with Linux namespaces in
+Python.
+"""
+
+import bisect
+import dataclasses
+import os
+import pathlib
+import stat
+import subprocess
+import sys
+import typing
+
+from .atlocation import *
+from .syscalls import *
+
+
+def subidranges(
+ kind: typing.Literal["uid", "gid"], login: str | None = None
+) -> typing.Iterator[tuple[int, int]]:
+ """Parse a `/etc/sub?id` file for ranges allocated to the given or current
+ user. Return all ranges as (start, count) pairs.
+ """
+ if login is None:
+ login = os.getlogin()
+ with open(f"/etc/sub{kind}") as filelike:
+ for line in filelike:
+ parts = line.strip().split(":")
+ if parts[0] == login:
+ yield (int(parts[1]), int(parts[2]))
+
+
+@dataclasses.dataclass(frozen=True)
+class IDMapping:
+ """Represent one range in a user or goup id mapping."""
+
+ innerstart: int
+ outerstart: int
+ count: int
+
+ def __post_init__(self) -> None:
+ if self.outerstart < 0:
+ raise ValueError("outerstart must not be negative")
+ if self.innerstart < 0:
+ raise ValueError("innerstart must not be negative")
+ if self.count <= 0:
+ raise ValueError("count must be positive")
+ if self.outerstart + self.count >= 1 << 64:
+ raise ValueError("outerstart + count exceed 64bits")
+ if self.innerstart + self.count >= 1 << 64:
+ raise ValueError("innerstart + count exceed 64bits")
+
+
+class IDAllocation:
+ """This represents a subset of IDs (user or group). It can be used to
+ allocate a continguous range for use with a user namespace.
+ """
+
+ def __init__(self) -> None:
+ self.ranges: list[tuple[int, int]] = []
+
+ def add_range(self, start: int, count: int) -> None:
+ """Add count ids starting from start to this allocation."""
+ if start < 0 or count <= 0:
+ raise ValueError("invalid range")
+ index = bisect.bisect_right(self.ranges, (start, 0))
+ prevrange = None
+ if index > 0:
+ prevrange = self.ranges[index - 1]
+ if prevrange[0] + prevrange[1] > start:
+ raise ValueError("attempt to add overlapping range")
+ nextrange = None
+ if index < len(self.ranges):
+ nextrange = self.ranges[index]
+ if nextrange[0] < start + count:
+ raise ValueError("attempt to add overlapping range")
+ if prevrange and prevrange[0] + prevrange[1] == start:
+ if nextrange and nextrange[0] == start + count:
+ self.ranges[index - 1] = (
+ prevrange[0],
+ prevrange[1] + count + nextrange[1],
+ )
+ del self.ranges[index]
+ else:
+ self.ranges[index - 1] = (prevrange[0], prevrange[1] + count)
+ elif nextrange and nextrange[0] == start + count:
+ self.ranges[index] = (start, count + nextrange[1])
+ else:
+ self.ranges.insert(index, (start, count))
+
+ @classmethod
+ def loadsubid(
+ cls, kind: typing.Literal["uid", "gid"], login: str | None = None,
+ ) -> "IDAllocation":
+ """Load a `/etc/sub?id` file and return ids allocated to the given
+ login or current user.
+ """
+ self = cls()
+ for start, count in subidranges(kind, login):
+ self.add_range(start, count)
+ return self
+
+ def find(self, count: int) -> int:
+ """Locate count continguous ids from this allocation. The start of
+ the allocation is returned. The allocation object is left unchanged.
+ """
+ for start, available in self.ranges:
+ if available >= count:
+ return start
+ raise ValueError("could not satisfy allocation request")
+
+ def allocate(self, count: int) -> int:
+ """Allocate count contiguous ids from this allocation. The start of
+ the allocation is returned and the ids are removed from this
+ IDAllocation object.
+ """
+ for index, (start, available) in enumerate(self.ranges):
+ if available > count:
+ self.ranges[index] = (start + count, available - count)
+ return start
+ if available == count:
+ del self.ranges[index]
+ return start
+ raise ValueError("could not satisfy allocation request")
+
+ def allocatemap(self, count: int, target: int) -> IDMapping:
+ """Allocate count contiguous ids from this allocation. An IDMapping
+ with its innerstart set to target is returned. The allocation is
+ removed from this IDAllocation object.
+ """
+ return IDMapping(target, self.allocate(count), count)
+
+
+def newidmap(
+ kind: typing.Literal["uid", "gid"],
+ pid: int,
+ mapping: list[IDMapping],
+ helper: bool | None = None,
+) -> None:
+ """Apply the given uid or gid mapping to the given process. A positive pid
+ identifies a process, other values identify the currently running process.
+ Whether setuid binaries newuidmap and newgidmap are used is determined via
+ the helper argument. A None value indicate automatic detection of whether
+ a helper is required for setting up the given mapping.
+ """
+
+ assert kind in ("uid", "gid")
+ if pid <= 0:
+ pid = os.getpid()
+ if helper is None:
+ # We cannot reliably test whether we have the right EUID and we don't
+ # implement checking whether setgroups has been denied either. Please
+ # be explicit about the helper choice in such cases.
+ helper = len(mapping) > 1 or mapping[0].count > 1
+ if helper:
+ argv = [f"new{kind}map", str(pid)]
+ for idblock in mapping:
+ argv.extend(map(str, dataclasses.astuple(idblock)))
+ subprocess.check_call(argv)
+ else:
+ pathlib.Path(f"/proc/{pid}/{kind}_map").write_text(
+ "".join(
+ "%d %d %d\n" % dataclasses.astuple(idblock)
+ for idblock in mapping
+ ),
+ encoding="ascii",
+ )
+
+
+def newuidmap(pid: int, mapping: list[IDMapping], helper: bool = True) -> None:
+ """Apply a given uid mapping to the given process. Refer to newidmap for
+ details.
+ """
+ newidmap("uid", pid, mapping, helper)
+
+
+def newgidmap(pid: int, mapping: list[IDMapping], helper: bool = True) -> None:
+ """Apply a given gid mapping to the given process. Refer to newidmap for
+ details.
+ """
+ newidmap("gid", pid, mapping, helper)
+
+
+def newidmaps(
+ pid: int,
+ uidmapping: list[IDMapping],
+ gidmapping: list[IDMapping],
+ helper: bool = True,
+) -> None:
+ """Appply a given uid and gid mapping to the given process. Refer to
+ newidmap for details.
+ """
+ newgidmap(pid, gidmapping, helper)
+ newuidmap(pid, uidmapping, helper)
+
+
+class run_in_fork:
+ """Decorator for running the decorated function once in a separate process.
+ """
+ def __init__(self, function: typing.Callable[[], None]):
+ """Fork a new process that will eventually run the given function and
+ then exit.
+ """
+ self.efd = EventFD()
+ self.pid = os.fork()
+ if self.pid == 0:
+ self.efd.read()
+ self.efd.close()
+ function()
+ sys.exit(0)
+
+ def start(self) -> None:
+ """Start the decorated function. It can only be started once."""
+ if not self.efd:
+ raise ValueError("this function can only be called once")
+ self.efd.write(1)
+ self.efd.close()
+
+ def wait(self) -> None:
+ """Wait for the process running the decorated function to finish."""
+ if self.efd:
+ raise ValueError("start must be called before wait")
+ ret = os.waitpid(self.pid, 0)
+ if ret != (self.pid, 0):
+ raise ValueError("something failed")
+
+ def __call__(self) -> None:
+ """Start the decorated function and wait for its process to finish."""
+ self.start()
+ self.wait()
+
+
+def bind_mount(
+ source: AtLocationLike,
+ target: AtLocationLike,
+ recursive: bool = False,
+ readonly: bool = False,
+) -> None:
+ """Create a bind mount from source to target. Depending on whether one of
+ the locations involves a file descriptor or not, the new or old mount API
+ will be used.
+ """
+ source = AtLocation(source)
+ target = AtLocation(target)
+ try:
+ # mypy does not know that os.fspath accepts AtLocation
+ srcloc: str | bytes
+ srcloc = os.fspath(source) # type: ignore
+ tgtloc: str | bytes
+ tgtloc = os.fspath(target) # type: ignore
+ except ValueError:
+ otflags = OpenTreeFlags.OPEN_TREE_CLONE
+ if recursive:
+ otflags |= OpenTreeFlags.AT_RECURSIVE
+ with open_tree(source, otflags) as srcfd:
+ if readonly:
+ mount_setattr(srcfd, recursive, MountAttrFlags.RDONLY)
+ return move_mount(srcfd, target)
+ else:
+ mflags = MountFlags.BIND
+ if recursive:
+ mflags |= MountFlags.REC
+ if readonly:
+ mflags |= MountFlags.RDONLY
+ return mount(srcloc, tgtloc, None, mflags)
+
+
+def populate_dev(
+ origroot: AtLocationLike,
+ newroot: PathConvertible,
+ *,
+ fuse: bool = True,
+ pidns: bool = True,
+ tun: bool = True,
+) -> None:
+ """Mount a tmpfs to the dev directory beneath newroot and populate it with
+ basic devices by bind mounting them from the dev directory beneath
+ origroot. Also mount a new pts instance.
+ """
+ origdev = AtLocation(origroot) / "dev"
+ newdev = AtLocation(newroot) / "dev"
+ mount(
+ "devtmpfs",
+ newdev,
+ "tmpfs",
+ MountFlags.NOSUID | MountFlags.NOEXEC,
+ "mode=0755",
+ )
+ bind_devices = "null zero full random urandom tty".split()
+ bind_directories = []
+ if fuse:
+ bind_devices.append("fuse")
+ if pidns:
+ (newdev / "pts").mkdir()
+ mount(
+ "devpts",
+ newdev / "pts",
+ "devpts",
+ MountFlags.NOSUID | MountFlags.NOEXEC,
+ "gid=5,mode=620,ptmxmode=666",
+ )
+ (newdev / "ptmx").symlink("pts/ptmx")
+ else:
+ bind_devices.append("ptmx")
+ bind_directories.append("pts")
+ if tun:
+ (newdev / "net").mkdir()
+ bind_devices.append("net/tun")
+ for node in bind_devices:
+ (newdev / node).mknod(stat.S_IFREG)
+ bind_mount(origdev / node, newdev / node, True)
+ for node in bind_directories:
+ (newdev / node).mkdir()
+ bind_mount(origdev / node, newdev / node, True)
+
+
+def unshare_user_idmap(
+ uidmap: list[IDMapping],
+ gidmap: list[IDMapping],
+ flags: CloneFlags = CloneFlags.NEWUSER,
+) -> None:
+ """Unshare the given namespaces (must include user) and set up the given
+ id mappings.
+ """
+ pid = os.getpid()
+ @run_in_fork
+ def setup_idmaps() -> None:
+ newidmaps(pid, uidmap, gidmap)
+ unshare(flags)
+ setup_idmaps()
diff --git a/linuxnamespaces/atlocation.py b/linuxnamespaces/atlocation.py
new file mode 100644
index 0000000..2c827a2
--- /dev/null
+++ b/linuxnamespaces/atlocation.py
@@ -0,0 +1,362 @@
+# Copyright 2024 Helmut Grohne <helmut@subdivi.de>
+# SPDX-License-Identifier: GPL-3
+
+"""Describe a location in the filesystem by a combination of a file descriptor
+and a file name each of which can be optional. Many Linux system calls are able
+to work with a location described in this way and this module provides support
+code for doing so.
+"""
+
+import enum
+import os
+import os.path
+import pathlib
+import typing
+
+
+AT_FDCWD = -100
+
+
+PathConvertible = typing.Union[bytes, str, os.PathLike]
+
+
+class AtFlags(enum.IntFlag):
+ """Linux AT_* flags used with many different syscalls."""
+
+ NONE = 0
+ AT_SYMLINK_NOFOLLOW = 0x100
+ AT_NO_AUTOMOUNT = 0x800
+ AT_EMPTY_PATH = 0x1000
+
+
+class AtLocation:
+ """Represent a location in the filesystem suitable for use with the
+ at-family of syscalls. If flags has the AT_EMPTY_PATH bit set, the
+ location string must be empty and the file descriptor specifies the
+ filesystem object. Otherwise, the location specifies the filesystem object.
+ If it is relative, it the anchor is the file descriptor or the current
+ working directory if the file descriptor is AT_FDCWD.
+ """
+
+ fd: int
+ location: PathConvertible
+ flags: AtFlags
+
+ def __new__(
+ cls,
+ thing: typing.Union["AtLocation", int, PathConvertible],
+ location: PathConvertible | None = None,
+ flags: AtFlags = AtFlags.NONE,
+ ) -> "AtLocation":
+ """The argument thing can be many different thing. If it is an
+ AtLocation, it is copied and all other arguments must be unset. If it
+ is an integer, it is considered to be a file descriptor and the
+ location must be unset if flags contains AT_EMPTY_PATH. flags are used
+ as is except that AT_EMPTY_PATH is automatically added when given a
+ file descriptor and no location.
+ """
+ if isinstance(thing, AtLocation):
+ if location is not None or flags != AtFlags.NONE:
+ raise ValueError(
+ "cannot override location or flags for an AtLocation"
+ )
+ return thing # Don't copy.
+ obj = super(AtLocation, cls).__new__(cls)
+ if isinstance(thing, int):
+ if thing < 0 and thing != AT_FDCWD:
+ raise ValueError("fd cannot be negative")
+ obj.fd = thing
+ if location is None:
+ obj.location = ""
+ obj.flags = flags | AtFlags.AT_EMPTY_PATH
+ elif flags & AtFlags.AT_EMPTY_PATH:
+ raise ValueError(
+ "cannot set AT_EMPTY_PATH with a non-empty location"
+ )
+ else:
+ obj.location = location
+ obj.flags = flags
+ elif location is not None:
+ raise ValueError("location specified twice")
+ else:
+ obj.fd = AT_FDCWD
+ obj.location = thing
+ obj.flags = flags
+ return obj
+
+ def close(self) -> None:
+ """Close the underlying file descriptor."""
+ if self.fd >= 0:
+ os.close(self.fd)
+ self.fd = AT_FDCWD
+
+ def nosymfollow(self) -> "AtLocation":
+ """Return a copy with the AT_SYMLINK_NOFOLLOW set."""
+ return AtLocation(
+ self.fd, self.location, self.flags | AtFlags.AT_SYMLINK_NOFOLLOW
+ )
+
+ def symfollow(self) -> "AtLocation":
+ """Return a copy with AT_SYMLINK_NOFOLLOW cleared."""
+ return AtLocation(
+ self.fd, self.location, self.flags & ~AtFlags.AT_SYMLINK_NOFOLLOW
+ )
+
+ def noautomount(self) -> "AtLocation":
+ """Return a copy with AT_NO_AUTOMOUNT set."""
+ return AtLocation(
+ self.fd, self.location, self.flags | AtFlags.AT_NO_AUTOMOUNT
+ )
+
+ def automount(self) -> "AtLocation":
+ """Return a copy with AT_NO_AUTOMOUNT cleared."""
+ return AtLocation(
+ self.fd, self.location, self.flags & ~AtFlags.AT_NO_AUTOMOUNT
+ )
+
+ def joinpath(self, name: PathConvertible) -> "AtLocation":
+ """Combine an AtLocation and a path by doing the equivalent of joining
+ them with a slash as separator.
+ """
+ if self.flags & AtFlags.AT_EMPTY_PATH:
+ return AtLocation(
+ self.fd, name, self.flags & ~AtFlags.AT_EMPTY_PATH
+ )
+ if not self.location:
+ return AtLocation(self.fd, name, self.flags)
+ if isinstance(self.location, bytes) or isinstance(name, bytes):
+ return AtLocation(
+ self.fd,
+ os.path.join(os.fsencode(self.location), os.fsencode(name)),
+ self.flags,
+ )
+ return AtLocation(
+ self.fd, pathlib.Path(self.location).joinpath(name), self.flags
+ )
+
+ def __truediv__(self, name: PathConvertible) -> "AtLocation":
+ return self.joinpath(name)
+
+ def fileno(self) -> int:
+ """Return the underlying file descriptor if this is an AT_EMPTY_PATH
+ location and raise a ValueError otherwise.
+ """
+ if self.flags != AtFlags.AT_EMPTY_PATH:
+ raise ValueError("AtLocation is not simply a file descriptor")
+ assert self.fd >= 0
+ assert not self.location
+ return self.fd
+
+ @property
+ def fd_or_none(self) -> int | None:
+ """A variant of the fd attribute that replaces AT_FDCWD with None."""
+ return None if self.fd == AT_FDCWD else self.fd
+
+ def access(self, mode: int, *, effective_ids: bool = False) -> bool:
+ """Wrapper for os.access supplying path, dir_fd and follow_symlinks."""
+ if self.flags == AtFlags.AT_SYMLINK_NOFOLLOW:
+ follow_symlinks = False
+ elif self.flags == AtFlags.NONE:
+ follow_symlinks = True
+ else:
+ raise NotImplementedError(
+ "access on AtLocation only supports flag AT_SYMLINK_NOFOLLOW"
+ )
+ assert self.location
+ return os.access(
+ self.location,
+ mode,
+ dir_fd=self.fd_or_none,
+ effective_ids=effective_ids,
+ follow_symlinks=follow_symlinks,
+ )
+
+ def chdir(self) -> None:
+ """Wrapper for os.chdir or os.fchdir."""
+ if self.flags == AtFlags.AT_EMPTY_PATH:
+ return os.fchdir(self.fd)
+ if self.flags != AtFlags.NONE:
+ raise NotImplementedError(
+ "chdir on AtLocation only supports flag AT_EMPTY_PATH"
+ )
+ assert self.location
+ return os.chdir(self.location)
+
+ def chmod(self, mode: int) -> None:
+ """Wrapper for os.chmod or os.fchmod."""
+ if self.flags == AtFlags.AT_EMPTY_PATH:
+ return os.fchmod(self.fd, mode)
+ if self.flags == AtFlags.AT_SYMLINK_NOFOLLOW:
+ follow_symlinks = False
+ elif self.flags == AtFlags.NONE:
+ follow_symlinks = True
+ else:
+ raise NotImplementedError(
+ "chmod on AtLocation with unsupported flags"
+ )
+ assert self.location
+ return os.chmod(
+ self.location,
+ mode,
+ dir_fd=self.fd_or_none,
+ follow_symlinks=follow_symlinks,
+ )
+
+ def chown(self, uid: int, gid: int) -> None:
+ """Wrapper for os.chown or os.chown."""
+ if self.flags == AtFlags.AT_EMPTY_PATH:
+ return os.fchown(self.fd, uid, gid)
+ if self.flags == AtFlags.AT_SYMLINK_NOFOLLOW:
+ follow_symlinks = False
+ elif self.flags == AtFlags.NONE:
+ follow_symlinks = True
+ else:
+ raise NotImplementedError(
+ "chmod on AtLocation with unsupported flags"
+ )
+ assert self.location
+ return os.chown(
+ self.location,
+ uid,
+ gid,
+ dir_fd=self.fd_or_none,
+ follow_symlinks=follow_symlinks,
+ )
+
+ def mkdir(self, mode: int = 0o777) -> None:
+ """Wrapper for os.mkdir supplying path and dir_fd."""
+ if self.flags != AtFlags.NONE:
+ raise NotImplementedError(
+ "mkdir is not supported for an AtLocation with flags"
+ )
+ assert self.location
+ os.mkdir(self.location, mode, dir_fd=self.fd_or_none)
+
+ def mknod(self, mode: int = 0o600, device: int = 0) -> None:
+ """Wrapper for os.mknod supplying path and dir_fd."""
+ if self.flags != AtFlags.NONE:
+ raise NotImplementedError(
+ "mknod is not supported for an AtLocation with flags"
+ )
+ assert self.location
+ os.mknod(self.location, mode, device, dir_fd=self.fd_or_none)
+
+ def open(self, flags: int, mode: int = 0o777) -> int:
+ """Wrapper for os.open supplying path and dir_fd."""
+ if self.flags == AtFlags.AT_SYMLINK_NOFOLLOW:
+ flags |= os.O_NOFOLLOW
+ elif self.flags != AtFlags.NONE:
+ raise NotImplementedError(
+ "opening an AtLocation only supports flag AT_SYMLINK_NOFOLLOW"
+ )
+ assert self.location
+ return os.open(self.location, flags, mode, dir_fd=self.fd_or_none)
+
+ def readlink(self) -> str:
+ """Wrapper for os.readlink supplying path and dir_fd."""
+ if self.flags & ~AtFlags.AT_EMPTY_PATH != AtFlags.NONE:
+ raise NotImplementedError(
+ "readlink on AtLocation only support flag AT_EMPTY_PATH"
+ )
+ return os.fsdecode(
+ os.readlink(os.fspath(self.location), dir_fd=self.fd_or_none)
+ )
+
+ def rmdir(self) -> None:
+ """Wrapper for os.rmdir suppling path and dir_fd."""
+ if self.flags != AtFlags.NONE:
+ raise NotImplementedError(
+ "rmdir is not supported for an AtLocation with flags"
+ )
+ assert self.location
+ return os.rmdir(self.location, dir_fd=self.fd_or_none)
+
+ def symlink(self, linktarget: PathConvertible) -> None:
+ """Create a symlink at self pointing to linktarget. Note that this
+ method has its arguments reversed compared to the usual os.symlink,
+ because the dir_fd is applicable to the second argument there.
+ """
+ if self.flags != AtFlags.NONE:
+ raise NotImplementedError(
+ "symlink is not supported for an AtLocation with flags"
+ )
+ assert self.location
+ os.symlink(linktarget, self.location, dir_fd=self.fd_or_none)
+
+ def unlink(self) -> None:
+ """Wrapper for os.unlink suppling path and dir_fd."""
+ if self.flags != AtFlags.NONE:
+ raise NotImplementedError(
+ "unlink is not supported for an AtLocation with flags"
+ )
+ assert self.location
+ return os.unlink(self.location, dir_fd=self.fd_or_none)
+
+ def walk(
+ self,
+ topdown: bool = True,
+ onerror: typing.Callable[[OSError], typing.Any] | None = None,
+ follow_symlinks: bool = False,
+ ) -> typing.Iterator[
+ tuple[
+ "AtLocation", list["AtLocation"], list["AtLocation"], "AtLocation",
+ ]
+ ]:
+ """Resemble os.fwalk with a few differences. The returned iterator
+ yields the dirpath as an AtLocation that borrows the fd from self. The
+ dirnames and filenames become AtLocations whose location is the entry
+ name and whose fd is temporary. Finally, the dirfd also becomes an
+ AtLocations referencing the same object as the dirpath though as an
+ AT_EMPTY_PATH with temporary fd.
+ """
+ if self.flags != AtFlags.NONE:
+ raise NotImplementedError(
+ "walk is not supported for an AtLocation with flags"
+ )
+ for dirpath, dirnames, filenames, dirfd in os.fwalk(
+ self.location,
+ topdown=topdown,
+ onerror=onerror,
+ follow_symlinks=follow_symlinks,
+ dir_fd=self.fd_or_none,
+ ):
+ yield (
+ AtLocation(self.fd, dirpath),
+ [AtLocation(dirfd, dirname) for dirname in dirnames],
+ [AtLocation(dirfd, filename) for filename in filenames],
+ AtLocation(dirfd),
+ )
+
+ def __enter__(self) -> "AtLocation":
+ """When used as a context manager, the associated fd will be closed on
+ scope exit.
+ """
+ return self
+
+ def __exit__(
+ self,
+ exc_type: typing.Any,
+ exc_value: typing.Any,
+ traceback: typing.Any,
+ ) -> None:
+ """When used as a context manager, the associated fd will be closed on
+ scope exit.
+ """
+ self.close()
+
+ def __fspath__(self) -> str | bytes:
+ """Return the underlying location if it uniquely defines this object.
+ Otherwise raise a ValueError.
+ """
+ if self.fd != AT_FDCWD:
+ raise ValueError(
+ "AtLocation with fd is not convertible to plain path"
+ )
+ if self.flags != AtFlags.NONE:
+ raise ValueError(
+ "AtLocation with flags is not convertible to plain path"
+ )
+ return os.fspath(self.location)
+
+
+AtLocationLike = typing.Union[AtLocation, int, PathConvertible]
diff --git a/linuxnamespaces/syscalls.py b/linuxnamespaces/syscalls.py
new file mode 100644
index 0000000..0e33a44
--- /dev/null
+++ b/linuxnamespaces/syscalls.py
@@ -0,0 +1,504 @@
+# Copyright 2024 Helmut Grohne <helmut@subdivi.de>
+# SPDX-License-Identifier: GPL-3
+
+"""Provide typed Python functions for a number of Linux system calls relevant
+for Linux namespaces including the new mount API.
+"""
+
+import ctypes
+import dataclasses
+import enum
+import os
+import typing
+
+from .atlocation import AtFlags, AtLocation, AtLocationLike, PathConvertible
+
+
+LIBC_SO = ctypes.CDLL("libc.so.6", use_errno=True)
+
+
+class CloneFlags(enum.IntFlag):
+ """This value may be supplied to
+ * unshare(2) flags
+ * clone3(2) clone_args.flags
+ * setns(2) nstype
+ """
+
+ NONE = 0x00000000
+ NEWTIME = 0x00000080
+ VM = 0x00000100
+ FS = 0x00000200
+ FILES = 0x00000400
+ SIGHAND = 0x00000800
+ PIDFD = 0x00001000
+ PTRACE = 0x00002000
+ VFORK = 0x00004000
+ PARENT = 0x00008000
+ THREAD = 0x00010000
+ NEWNS = 0x00020000
+ SYSVSEM = 0x00040000
+ SETTLS = 0x00080000
+ PARENT_SETTID = 0x00100000
+ CHILD_CLEARTID = 0x00200000
+ DETACHED = 0x00400000
+ UNTRACED = 0x00800000
+ CHILD_SETTID = 0x01000000
+ NEWCGROUP = 0x02000000
+ NEWUTS = 0x04000000
+ NEWIPC = 0x08000000
+ NEWUSER = 0x10000000
+ NEWPID = 0x20000000
+ NEWNET = 0x40000000
+ IO = 0x80000000
+ NS_FLAGS = (
+ NEWCGROUP
+ | NEWIPC
+ | NEWNET
+ | NEWNS
+ | NEWPID
+ | NEWTIME
+ | NEWUSER
+ | NEWUTS
+ )
+ UNSHARE_FLAGS = NS_FLAGS | FILES | FS | SYSVSEM
+
+
+class EventFDFlags(enum.IntFlag):
+ """This value may be supplied as flags to eventfd(2)."""
+
+ NONE = 0
+ CLOEXEC = 0o2000000
+ NONBLOCK = 0o4000
+ SEMAPHORE = 0o1
+ ALL_FLAGS = CLOEXEC | NONBLOCK | SEMAPHORE
+
+
+class MountFlags(enum.IntFlag):
+ """This value may be supplied as mountflags to mount(2)."""
+
+ NONE = 0
+ RDONLY = 1 << 0
+ NOSUID = 1 << 1
+ NODEV = 1 << 2
+ NOEXEC = 1 << 3
+ SYNCHRONOUS = 1 << 4
+ REMOUNT = 1 << 5
+ MANDLOCK = 1 << 6
+ DIRSYNC = 1 << 7
+ NOSYMFOLLOW = 1 << 8
+ # Bit 9 vanished
+ NOATIME = 1 << 10
+ NODIRATIME = 1 << 11
+ BIND = 1 << 12
+ MOVE = 1 << 13
+ REC = 1 << 14
+ SILENT = 1 << 15
+ POSIXACL = 1 << 16
+ UNBINDABLE = 1 << 17
+ PRIVATE = 1 << 18
+ SLAVE = 1 << 19
+ SHARED = 1 << 20
+ RELATIME = 1 << 21
+ KERNMOUNT = 1 << 22
+ I_VERSION = 1 << 23
+ STRICTATIME = 1 << 24
+ LAZYTIME = 1 << 25
+ SUBMOUNT = 1 << 26
+ NOREMOTELOCK = 1 << 27
+ NOSEC = 1 << 28
+ BORN = 1 << 29
+ ACTIVE = 1 << 30
+ NOUSER = 1 << 31
+
+ PROPAGATION_FLAGS = UNBINDABLE | PRIVATE | SLAVE | SHARED
+
+
+class MountSetattrFlags(enum.IntFlag):
+ """This value may be supplied as flags to mount_setattr(2)."""
+
+ NONE = 0
+ AT_SYMLINK_NOFOLLOW = 0x100
+ AT_NO_AUTOMOUNT = 0x800
+ AT_EMPTY_PATH = 0x1000
+ AT_RECURSIVE = 0x8000
+
+ @staticmethod
+ def from_atflags(flags: AtFlags) -> "MountSetattrFlags":
+ ret = MountSetattrFlags.NONE
+ if flags & AtFlags.AT_SYMLINK_NOFOLLOW:
+ ret |= MountSetattrFlags.AT_SYMLINK_NOFOLLOW
+ if flags & AtFlags.AT_NO_AUTOMOUNT:
+ ret |= MountSetattrFlags.AT_NO_AUTOMOUNT
+ if flags & AtFlags.AT_EMPTY_PATH:
+ ret |= MountSetattrFlags.AT_EMPTY_PATH
+ return ret
+
+
+class MountAttrFlags(enum.IntFlag):
+ """This value may be supplied as attr->attr_set or attr->attr_clr to
+ mount_setattr(2).
+ """
+
+ NONE = 0x000000
+ RDONLY = 0x000001 # Mount read-only.
+ NOSUID = 0x000002 # Ignore suid and sgid bits.
+ NODEV = 0x000004 # Disallow access to device special files.
+ NOEXEC = 0x000008 # Disallow program execution.
+ RELATIME = 0x000000 # - Update atime relative to mtime/ctime.
+ NOATIME = 0x000010 # - Do not update access times.
+ STRICTATIME = 0x000020 # - Always perform atime updates
+ _ATIME = 0x000070 | NOATIME | STRICTATIME
+ # Setting on how atime should be updated.
+ NODIRATIME = 0x000080 # Do not update directory access times.
+ IDMAP = 0x100000 # Idmap mount to @userns_fd in struct mount_attr.
+ NOSYMFOLLOW = 0x200000 # Do not follow symlinks.
+
+ ALL_FLAGS = (
+ RDONLY
+ | NOSYMFOLLOW
+ | NODEV
+ | NOEXEC
+ | _ATIME
+ | NODIRATIME
+ | IDMAP
+ | NOSYMFOLLOW
+ )
+
+
+class MountAttr(ctypes.Structure):
+ """This value may be supplied to mount_setattr(2) as attr."""
+
+ _fields_ = [
+ ("attr_set", ctypes.c_ulonglong),
+ ("attr_clr", ctypes.c_ulonglong),
+ ("propagation", ctypes.c_ulonglong),
+ ("userns_fd", ctypes.c_ulonglong),
+ ]
+
+
+class MoveMountFlags(enum.IntFlag):
+ """This value may be supplied to move_mount(2) as flags."""
+
+ NONE = 0x00000000
+ F_SYMLINKS = 0x00000001 # Follow symlinks on from path
+ F_AUTOMOUNTS = 0x00000002 # Follow automounts on from path
+ F_EMPTY_PATH = 0x00000004 # Empty from path permitted
+ T_SYMLINKS = 0x00000010 # Follow symlinks on to path
+ T_AUTOMOUNTS = 0x00000020 # Follow automounts on to path
+ T_EMPTY_PATH = 0x00000040 # Empty to path permitted
+ SET_GROUP = 0x00000100 # Set sharing group instead
+ ALL_FLAGS = (
+ F_SYMLINKS
+ | F_AUTOMOUNTS
+ | F_EMPTY_PATH
+ | T_SYMLINKS
+ | T_AUTOMOUNTS
+ | T_EMPTY_PATH
+ | SET_GROUP
+ )
+
+
+class OpenTreeFlags(enum.IntFlag):
+ """This value may be supplied to open_tree(2) as flags."""
+
+ NONE = 0
+ OPEN_TREE_CLONE = 0x1
+ OPEN_TREE_CLOEXEC = os.O_CLOEXEC
+ AT_SYMLINK_NOFOLLOW = 0x100
+ AT_NO_AUTOMOUNT = 0x800
+ AT_EMPTY_PATH = 0x1000
+ AT_RECURSIVE = 0x8000
+ ALL_FLAGS = (
+ OPEN_TREE_CLONE
+ | OPEN_TREE_CLOEXEC
+ | AT_SYMLINK_NOFOLLOW
+ | AT_NO_AUTOMOUNT
+ | AT_EMPTY_PATH
+ | AT_RECURSIVE
+ )
+
+
+class UmountFlags(enum.IntFlag):
+ """This value may be supplied to umount2(2) as flags."""
+
+ NONE = 0
+ FORCE = 1
+ DETACH = 2
+ EXPIRE = 4
+ NOFOLLOW = 8
+ ALL_FLAGS = FORCE | DETACH | EXPIRE | NOFOLLOW
+
+
+def call_libc(funcname: str, *args: typing.Any) -> int:
+ """Call a function from the C library with given args. This assumes that
+ the function returns an integer that is non-negative on success. On
+ failure, an OSError with errno is raised.
+ """
+ ret: int = LIBC_SO[funcname](*args)
+ if ret < 0:
+ err = ctypes.get_errno()
+ raise OSError(
+ err, f"{funcname}() failed with error {err}: {os.strerror(err)}"
+ )
+ return ret
+
+
+@dataclasses.dataclass
+class CapabilitySets:
+ """Represent the main capability sets that capget/capset deal with."""
+
+ effective: int
+ permitted: int
+ inheritable: int
+
+ @staticmethod
+ def _create_header(pid: int) -> ctypes.Array[ctypes.c_uint32]:
+ return (ctypes.c_uint32 * 2)(
+ 0x20080522, # _LINUX_CAPABILITY_VERSION_3
+ pid,
+ )
+
+ @classmethod
+ def get(cls, pid: int = 0) -> "CapabilitySets":
+ """Call capget to retrieve the current capability sets."""
+ header = cls._create_header(pid)
+ data = (ctypes.c_uint32 * 6)()
+ call_libc("capget", ctypes.byref(header), ctypes.byref(data))
+ return cls(
+ (data[3] << 32) | data[0],
+ (data[4] << 32) | data[1],
+ (data[5] << 32) | data[2],
+ )
+
+ def set(self, pid: int = 0) -> None:
+ """Call capset to set the capabilities."""
+ header = self._create_header(pid)
+ data = (ctypes.c_uint32 * 6)(
+ self.effective & 0xffffffff,
+ self.permitted & 0xffffffff,
+ self.inheritable & 0xffffffff,
+ self.effective >> 32,
+ self.permitted >> 32,
+ self.inheritable >> 32,
+ )
+ call_libc("capset", ctypes.byref(header), ctypes.byref(data))
+
+
+class EventFD:
+ """Represent a file decriptor returned from eventfd(2)."""
+
+ def __init__(
+ self, initval: int = 0, flags: EventFDFlags = EventFDFlags.NONE
+ ) -> None:
+ if flags & ~EventFDFlags.ALL_FLAGS:
+ raise ValueError("invalid flags for eventfd")
+ self.fd = call_libc("eventfd", initval, int(flags))
+
+ def read(self) -> int:
+ """Decrease the value of the eventfd using eventfd_read."""
+ if self.fd < 0:
+ raise ValueError("attempt to read from closed eventfd")
+ cvalue = ctypes.c_ulonglong()
+ call_libc("eventfd_read", self.fd, ctypes.byref(cvalue))
+ return cvalue.value
+
+ def write(self, value: int = 1) -> None:
+ """Add the given value to the eventfd using eventfd_write."""
+ if self.fd < 0:
+ raise ValueError("attempt to read from closed eventfd")
+ if value < 0 or (value >> 64):
+ raise ValueError("value for eventfd_write out of range")
+ call_libc("eventfd_write", self.fd, ctypes.c_ulonglong(value))
+
+ def fileno(self) -> int:
+ """Return the underlying file descriptor."""
+ return self.fd
+
+ def close(self) -> None:
+ """Close the underlying file descriptor."""
+ if self.fd >= 0:
+ try:
+ os.close(self.fd)
+ finally:
+ self.fd = -1
+
+ __del__ = close
+
+ def __bool__(self) -> bool:
+ """Return True unless the eventfd is closed."""
+ return self.fd >= 0
+
+ def __enter__(self) -> "EventFD":
+ """When used as a context manager, the EventFD is closed on scope exit.
+ """
+ return self
+
+ def __exit__(
+ self,
+ exc_type: typing.Any,
+ exc_value: typing.Any,
+ traceback: typing.Any,
+ ) -> None:
+ self.close()
+
+
+def mount(
+ source: PathConvertible,
+ target: PathConvertible,
+ filesystemtype: str | None,
+ flags: MountFlags = MountFlags.NONE,
+ data: str | None = None,
+) -> None:
+ """Python wrapper for mount(2)."""
+ if (flags & MountFlags.PROPAGATION_FLAGS).bit_count() > 1:
+ raise ValueError("invalid flags for mount")
+ if (
+ flags & MountFlags.PROPAGATION_FLAGS
+ and flags & ~(
+ MountFlags.PROPAGATION_FLAGS | MountFlags.REC | MountFlags.SILENT
+ )
+ ):
+ raise ValueError("invalid flags for mount")
+ call_libc(
+ "mount",
+ os.fsencode(source),
+ os.fsencode(target),
+ None if filesystemtype is None else os.fsencode(filesystemtype),
+ int(flags),
+ None if data is None else os.fsencode(data),
+ )
+
+
+def mount_setattr(
+ filesystem: AtLocationLike,
+ recursive: bool,
+ attr_set: MountAttrFlags = MountAttrFlags.NONE,
+ attr_clr: MountAttrFlags = MountAttrFlags.NONE,
+ propagation: int = 0,
+ userns_fd: int = -1,
+) -> None:
+ """Python wrapper for mount_setattr(2)."""
+ filesystem = AtLocation(filesystem)
+ flags = MountSetattrFlags.from_atflags(filesystem.flags)
+ if recursive:
+ flags |= MountSetattrFlags.AT_RECURSIVE
+ if attr_clr & MountAttrFlags.IDMAP:
+ raise ValueError("cannot clear the MOUNT_ATTR_IDMAP flag")
+ attr = MountAttr(attr_set, attr_clr, propagation, userns_fd)
+ call_libc(
+ "mount_setattr",
+ filesystem.fd,
+ os.fsencode(filesystem.location),
+ int(flags),
+ ctypes.byref(attr),
+ ctypes.sizeof(attr),
+ )
+
+
+def move_mount(
+ from_: AtLocationLike,
+ to: AtLocationLike,
+ flags: MoveMountFlags = MoveMountFlags.NONE,
+) -> None:
+ """Python wrapper for move_mount(2)."""
+ from_ = AtLocation(from_)
+ to = AtLocation(to)
+ if flags & ~MoveMountFlags.ALL_FLAGS:
+ raise ValueError("invalid flags for move_mount")
+ if from_.flags & AtFlags.AT_SYMLINK_NOFOLLOW:
+ flags &= ~MoveMountFlags.F_SYMLINKS
+ else:
+ flags |= MoveMountFlags.F_SYMLINKS
+ if from_.flags & AtFlags.AT_NO_AUTOMOUNT:
+ flags &= ~MoveMountFlags.F_AUTOMOUNTS
+ else:
+ flags |= MoveMountFlags.F_AUTOMOUNTS
+ if from_.flags & AtFlags.AT_EMPTY_PATH:
+ flags |= MoveMountFlags.F_EMPTY_PATH
+ else:
+ flags &= ~MoveMountFlags.F_EMPTY_PATH
+ if to.flags & AtFlags.AT_SYMLINK_NOFOLLOW:
+ flags &= ~MoveMountFlags.T_SYMLINKS
+ else:
+ flags |= MoveMountFlags.T_SYMLINKS
+ if to.flags & AtFlags.AT_NO_AUTOMOUNT:
+ flags &= ~MoveMountFlags.T_AUTOMOUNTS
+ else:
+ flags |= MoveMountFlags.T_AUTOMOUNTS
+ if to.flags & AtFlags.AT_EMPTY_PATH:
+ flags |= MoveMountFlags.T_EMPTY_PATH
+ else:
+ flags &= ~MoveMountFlags.T_EMPTY_PATH
+ call_libc(
+ "move_mount",
+ from_.fd,
+ os.fsencode(from_.location),
+ to.fd,
+ os.fsencode(to.location),
+ int(flags),
+ )
+
+
+def open_tree(
+ source: AtLocationLike, flags: OpenTreeFlags = OpenTreeFlags.NONE
+) -> AtLocation:
+ """Python wrapper for open_tree(2)."""
+ source = AtLocation(source)
+ if flags & ~OpenTreeFlags.ALL_FLAGS:
+ raise ValueError("invalid flags for open_tree")
+ if (
+ flags & OpenTreeFlags.AT_RECURSIVE
+ and not flags & OpenTreeFlags.OPEN_TREE_CLONE
+ ):
+ raise ValueError("invalid flags for open_tree")
+ if source.flags & AtFlags.AT_SYMLINK_NOFOLLOW:
+ flags |= OpenTreeFlags.AT_SYMLINK_NOFOLLOW
+ else:
+ flags &= ~OpenTreeFlags.AT_SYMLINK_NOFOLLOW
+ if source.flags & AtFlags.AT_NO_AUTOMOUNT:
+ flags |= OpenTreeFlags.AT_NO_AUTOMOUNT
+ else:
+ flags &= ~OpenTreeFlags.AT_NO_AUTOMOUNT
+ if source.flags & AtFlags.AT_EMPTY_PATH:
+ flags |= OpenTreeFlags.AT_EMPTY_PATH
+ else:
+ flags &= ~OpenTreeFlags.AT_EMPTY_PATH
+ return AtLocation(
+ call_libc(
+ "open_tree", source.fd, os.fsencode(source.location), int(flags)
+ )
+ )
+
+
+def pivot_root(new_root: PathConvertible, put_old: PathConvertible) -> None:
+ """Python wrapper for pivot_root(2)."""
+ call_libc("pivot_root", os.fsencode(new_root), os.fsencode(put_old))
+
+
+def setns(fd: int, nstype: CloneFlags = CloneFlags.NONE) -> None:
+ """Python wrapper for setns(2)."""
+ if fd < 0:
+ raise ValueError("invalid file descriptor")
+ if nstype & ~CloneFlags.NS_FLAGS != 0:
+ raise ValueError("invalid nstype for setns")
+ call_libc("setns", fd, int(nstype))
+
+
+def umount(
+ path: PathConvertible, flags: UmountFlags = UmountFlags.NONE
+) -> None:
+ """Python wrapper for umount(2)."""
+ if flags & ~UmountFlags.ALL_FLAGS:
+ raise ValueError("umount flags out of range")
+ if flags & UmountFlags.EXPIRE and flags & (
+ UmountFlags.FORCE | UmountFlags.DETACH
+ ):
+ raise ValueError("invalid flags for umount")
+ call_libc("umount2", os.fsencode(path), int(flags))
+
+
+def unshare(flags: CloneFlags) -> None:
+ """Python wrapper for unshare(2)."""
+ if flags & ~CloneFlags.UNSHARE_FLAGS:
+ raise ValueError("invalid flags for unshare")
+ call_libc("unshare", int(flags))
diff --git a/tests/test_simple.py b/tests/test_simple.py
new file mode 100644
index 0000000..e0cb66e
--- /dev/null
+++ b/tests/test_simple.py
@@ -0,0 +1,164 @@
+# Copyright 2024 Helmut Grohne <helmut@subdivi.de>
+# SPDX-License-Identifier: GPL-3
+
+import functools
+import os
+import pathlib
+import socket
+import unittest
+
+import pytest
+
+import linuxnamespaces
+
+
+def allow_fork_exit(function):
+ @functools.wraps(function)
+ def wrapped(*args, **kwargs):
+ mainpid = os.getpid()
+ try:
+ return function(*args, **kwargs)
+ except SystemExit as sysexit:
+ if sysexit.code or os.getpid() == mainpid:
+ raise
+
+ # We're supposed to successfully exit from a child process. If we
+ # were to return or raise here, pytest would record success or
+ # failure. Instead we hide this process from pytest.
+ os._exit(0)
+ return pytest.mark.forked(wrapped)
+
+class IDAllocationTest(unittest.TestCase):
+ def test_idalloc(self) -> None:
+ alloc = linuxnamespaces.IDAllocation()
+ alloc.add_range(1, 2)
+ alloc.add_range(5, 4)
+ self.assertIn(alloc.find(3), (5, 6))
+ self.assertIn(alloc.allocate(3), (5, 6))
+ self.assertRaises(ValueError, alloc.find, 3)
+ self.assertRaises(ValueError, alloc.allocate, 3)
+ self.assertEqual(alloc.find(2), 1)
+
+ def test_merge(self) -> None:
+ alloc = linuxnamespaces.IDAllocation()
+ alloc.add_range(1, 2)
+ alloc.add_range(3, 2)
+ self.assertIn(alloc.allocate(3), (1, 2))
+
+
+class UnshareTest(unittest.TestCase):
+ @pytest.mark.forked
+ def test_unshare_user(self) -> None:
+ overflowuid = int(pathlib.Path("/proc/sys/fs/overflowuid").read_text())
+ idmap = linuxnamespaces.IDMapping(0, os.getuid(), 1)
+ linuxnamespaces.unshare(linuxnamespaces.CloneFlags.NEWUSER)
+ self.assertEqual(os.getuid(), overflowuid)
+ linuxnamespaces.newuidmap(-1, [idmap], False)
+ self.assertEqual(os.getuid(), 0)
+ # UID 1 is not mapped.
+ self.assertRaises(OSError, os.setuid, 1)
+
+ @allow_fork_exit
+ def test_mount_proc(self) -> None:
+ idmap = linuxnamespaces.IDMapping(0, os.getuid(), 1)
+ linuxnamespaces.unshare(
+ linuxnamespaces.CloneFlags.NEWUSER
+ | linuxnamespaces.CloneFlags.NEWNS
+ | linuxnamespaces.CloneFlags.NEWPID
+ )
+ linuxnamespaces.newuidmap(-1, [idmap], False)
+ @linuxnamespaces.run_in_fork
+ def setup() -> None:
+ self.assertEqual(os.getpid(), 1)
+ linuxnamespaces.mount("proc", "/proc", "proc")
+ setup()
+
+ @pytest.mark.forked
+ def test_sethostname(self) -> None:
+ self.assertRaises(socket.error, socket.sethostname, "example")
+ linuxnamespaces.unshare(
+ linuxnamespaces.CloneFlags.NEWUSER
+ | linuxnamespaces.CloneFlags.NEWUTS
+ )
+ socket.sethostname("example")
+
+ @pytest.mark.forked
+ def test_populate_dev(self) -> None:
+ uidmap = linuxnamespaces.IDMapping(0, os.getuid(), 1)
+ gidmap = linuxnamespaces.IDMapping(0, os.getgid(), 1)
+ linuxnamespaces.unshare(
+ linuxnamespaces.CloneFlags.NEWUSER
+ | linuxnamespaces.CloneFlags.NEWNS
+ )
+ pathlib.Path("/proc/self/setgroups").write_text("deny")
+ linuxnamespaces.newuidmap(-1, [uidmap], False)
+ linuxnamespaces.newgidmap(-1, [gidmap], False)
+ linuxnamespaces.mount("tmpfs", "/mnt", "tmpfs", data="mode=0755")
+ os.mkdir("/mnt/dev")
+ linuxnamespaces.populate_dev("/", "/mnt", pidns=False)
+ self.assertTrue(os.access("/mnt/dev/null", os.W_OK))
+ pathlib.Path("/mnt/dev/null").write_text("")
+
+
+class UnshareIdmapTest(unittest.TestCase):
+ def setUp(self) -> None:
+ super().setUp()
+ self.uidalloc = linuxnamespaces.IDAllocation.loadsubid("uid")
+ self.gidalloc = linuxnamespaces.IDAllocation.loadsubid("gid")
+ try:
+ self.uidalloc.find(65536)
+ self.gidalloc.find(65536)
+ except ValueError:
+ self.skipTest("insufficient /etc/sub?id allocation")
+
+ @allow_fork_exit
+ def test_unshare_user_idmap(self) -> None:
+ overflowuid = int(pathlib.Path("/proc/sys/fs/overflowuid").read_text())
+ uidmap = linuxnamespaces.IDMapping(
+ 0, self.uidalloc.allocate(65536), 65536
+ )
+ self.assertNotEqual(os.getuid(), uidmap.outerstart)
+ gidmap = linuxnamespaces.IDMapping(
+ 0, self.gidalloc.allocate(65536), 65536
+ )
+ pid = os.getpid()
+ @linuxnamespaces.run_in_fork
+ def setup() -> None:
+ linuxnamespaces.newgidmap(pid, [gidmap])
+ linuxnamespaces.newuidmap(pid, [uidmap])
+ linuxnamespaces.unshare(linuxnamespaces.CloneFlags.NEWUSER)
+ setup()
+ self.assertEqual(os.getuid(), overflowuid)
+ os.setuid(0)
+ self.assertEqual(os.getuid(), 0)
+ os.setuid(1)
+ self.assertEqual(os.getuid(), 1)
+
+ @allow_fork_exit
+ def test_populate_dev(self) -> None:
+ uidmap = linuxnamespaces.IDMapping(
+ 0, self.uidalloc.allocate(65536), 65536
+ )
+ self.assertNotEqual(os.getuid(), uidmap.outerstart)
+ gidmap = linuxnamespaces.IDMapping(
+ 0, self.gidalloc.allocate(65536), 65536
+ )
+ pid = os.getpid()
+ @linuxnamespaces.run_in_fork
+ def setup() -> None:
+ linuxnamespaces.newgidmap(pid, [gidmap])
+ linuxnamespaces.newuidmap(pid, [uidmap])
+ linuxnamespaces.unshare(
+ linuxnamespaces.CloneFlags.NEWUSER
+ | linuxnamespaces.CloneFlags.NEWNS
+ | linuxnamespaces.CloneFlags.NEWPID
+ )
+ setup()
+ os.setreuid(0, 0)
+ os.setregid(0, 0)
+ linuxnamespaces.mount("tmpfs", "/mnt", "tmpfs")
+ os.mkdir("/mnt/dev")
+ @linuxnamespaces.run_in_fork
+ def test() -> None:
+ linuxnamespaces.populate_dev("/", "/mnt")
+ test()