summaryrefslogtreecommitdiff
path: root/linuxnamespaces/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'linuxnamespaces/__init__.py')
-rw-r--r--linuxnamespaces/__init__.py333
1 files changed, 333 insertions, 0 deletions
diff --git a/linuxnamespaces/__init__.py b/linuxnamespaces/__init__.py
new file mode 100644
index 0000000..29d41f6
--- /dev/null
+++ b/linuxnamespaces/__init__.py
@@ -0,0 +1,333 @@
+# Copyright 2024 Helmut Grohne <helmut@subdivi.de>
+# SPDX-License-Identifier: GPL-3
+
+"""Provide plumbing-layer functionality for working with Linux namespaces in
+Python.
+"""
+
+import bisect
+import dataclasses
+import os
+import pathlib
+import stat
+import subprocess
+import sys
+import typing
+
+from .atlocation import *
+from .syscalls import *
+
+
+def subidranges(
+ kind: typing.Literal["uid", "gid"], login: str | None = None
+) -> typing.Iterator[tuple[int, int]]:
+ """Parse a `/etc/sub?id` file for ranges allocated to the given or current
+ user. Return all ranges as (start, count) pairs.
+ """
+ if login is None:
+ login = os.getlogin()
+ with open(f"/etc/sub{kind}") as filelike:
+ for line in filelike:
+ parts = line.strip().split(":")
+ if parts[0] == login:
+ yield (int(parts[1]), int(parts[2]))
+
+
+@dataclasses.dataclass(frozen=True)
+class IDMapping:
+ """Represent one range in a user or goup id mapping."""
+
+ innerstart: int
+ outerstart: int
+ count: int
+
+ def __post_init__(self) -> None:
+ if self.outerstart < 0:
+ raise ValueError("outerstart must not be negative")
+ if self.innerstart < 0:
+ raise ValueError("innerstart must not be negative")
+ if self.count <= 0:
+ raise ValueError("count must be positive")
+ if self.outerstart + self.count >= 1 << 64:
+ raise ValueError("outerstart + count exceed 64bits")
+ if self.innerstart + self.count >= 1 << 64:
+ raise ValueError("innerstart + count exceed 64bits")
+
+
+class IDAllocation:
+ """This represents a subset of IDs (user or group). It can be used to
+ allocate a continguous range for use with a user namespace.
+ """
+
+ def __init__(self) -> None:
+ self.ranges: list[tuple[int, int]] = []
+
+ def add_range(self, start: int, count: int) -> None:
+ """Add count ids starting from start to this allocation."""
+ if start < 0 or count <= 0:
+ raise ValueError("invalid range")
+ index = bisect.bisect_right(self.ranges, (start, 0))
+ prevrange = None
+ if index > 0:
+ prevrange = self.ranges[index - 1]
+ if prevrange[0] + prevrange[1] > start:
+ raise ValueError("attempt to add overlapping range")
+ nextrange = None
+ if index < len(self.ranges):
+ nextrange = self.ranges[index]
+ if nextrange[0] < start + count:
+ raise ValueError("attempt to add overlapping range")
+ if prevrange and prevrange[0] + prevrange[1] == start:
+ if nextrange and nextrange[0] == start + count:
+ self.ranges[index - 1] = (
+ prevrange[0],
+ prevrange[1] + count + nextrange[1],
+ )
+ del self.ranges[index]
+ else:
+ self.ranges[index - 1] = (prevrange[0], prevrange[1] + count)
+ elif nextrange and nextrange[0] == start + count:
+ self.ranges[index] = (start, count + nextrange[1])
+ else:
+ self.ranges.insert(index, (start, count))
+
+ @classmethod
+ def loadsubid(
+ cls, kind: typing.Literal["uid", "gid"], login: str | None = None,
+ ) -> "IDAllocation":
+ """Load a `/etc/sub?id` file and return ids allocated to the given
+ login or current user.
+ """
+ self = cls()
+ for start, count in subidranges(kind, login):
+ self.add_range(start, count)
+ return self
+
+ def find(self, count: int) -> int:
+ """Locate count continguous ids from this allocation. The start of
+ the allocation is returned. The allocation object is left unchanged.
+ """
+ for start, available in self.ranges:
+ if available >= count:
+ return start
+ raise ValueError("could not satisfy allocation request")
+
+ def allocate(self, count: int) -> int:
+ """Allocate count contiguous ids from this allocation. The start of
+ the allocation is returned and the ids are removed from this
+ IDAllocation object.
+ """
+ for index, (start, available) in enumerate(self.ranges):
+ if available > count:
+ self.ranges[index] = (start + count, available - count)
+ return start
+ if available == count:
+ del self.ranges[index]
+ return start
+ raise ValueError("could not satisfy allocation request")
+
+ def allocatemap(self, count: int, target: int) -> IDMapping:
+ """Allocate count contiguous ids from this allocation. An IDMapping
+ with its innerstart set to target is returned. The allocation is
+ removed from this IDAllocation object.
+ """
+ return IDMapping(target, self.allocate(count), count)
+
+
+def newidmap(
+ kind: typing.Literal["uid", "gid"],
+ pid: int,
+ mapping: list[IDMapping],
+ helper: bool | None = None,
+) -> None:
+ """Apply the given uid or gid mapping to the given process. A positive pid
+ identifies a process, other values identify the currently running process.
+ Whether setuid binaries newuidmap and newgidmap are used is determined via
+ the helper argument. A None value indicate automatic detection of whether
+ a helper is required for setting up the given mapping.
+ """
+
+ assert kind in ("uid", "gid")
+ if pid <= 0:
+ pid = os.getpid()
+ if helper is None:
+ # We cannot reliably test whether we have the right EUID and we don't
+ # implement checking whether setgroups has been denied either. Please
+ # be explicit about the helper choice in such cases.
+ helper = len(mapping) > 1 or mapping[0].count > 1
+ if helper:
+ argv = [f"new{kind}map", str(pid)]
+ for idblock in mapping:
+ argv.extend(map(str, dataclasses.astuple(idblock)))
+ subprocess.check_call(argv)
+ else:
+ pathlib.Path(f"/proc/{pid}/{kind}_map").write_text(
+ "".join(
+ "%d %d %d\n" % dataclasses.astuple(idblock)
+ for idblock in mapping
+ ),
+ encoding="ascii",
+ )
+
+
+def newuidmap(pid: int, mapping: list[IDMapping], helper: bool = True) -> None:
+ """Apply a given uid mapping to the given process. Refer to newidmap for
+ details.
+ """
+ newidmap("uid", pid, mapping, helper)
+
+
+def newgidmap(pid: int, mapping: list[IDMapping], helper: bool = True) -> None:
+ """Apply a given gid mapping to the given process. Refer to newidmap for
+ details.
+ """
+ newidmap("gid", pid, mapping, helper)
+
+
+def newidmaps(
+ pid: int,
+ uidmapping: list[IDMapping],
+ gidmapping: list[IDMapping],
+ helper: bool = True,
+) -> None:
+ """Appply a given uid and gid mapping to the given process. Refer to
+ newidmap for details.
+ """
+ newgidmap(pid, gidmapping, helper)
+ newuidmap(pid, uidmapping, helper)
+
+
+class run_in_fork:
+ """Decorator for running the decorated function once in a separate process.
+ """
+ def __init__(self, function: typing.Callable[[], None]):
+ """Fork a new process that will eventually run the given function and
+ then exit.
+ """
+ self.efd = EventFD()
+ self.pid = os.fork()
+ if self.pid == 0:
+ self.efd.read()
+ self.efd.close()
+ function()
+ sys.exit(0)
+
+ def start(self) -> None:
+ """Start the decorated function. It can only be started once."""
+ if not self.efd:
+ raise ValueError("this function can only be called once")
+ self.efd.write(1)
+ self.efd.close()
+
+ def wait(self) -> None:
+ """Wait for the process running the decorated function to finish."""
+ if self.efd:
+ raise ValueError("start must be called before wait")
+ ret = os.waitpid(self.pid, 0)
+ if ret != (self.pid, 0):
+ raise ValueError("something failed")
+
+ def __call__(self) -> None:
+ """Start the decorated function and wait for its process to finish."""
+ self.start()
+ self.wait()
+
+
+def bind_mount(
+ source: AtLocationLike,
+ target: AtLocationLike,
+ recursive: bool = False,
+ readonly: bool = False,
+) -> None:
+ """Create a bind mount from source to target. Depending on whether one of
+ the locations involves a file descriptor or not, the new or old mount API
+ will be used.
+ """
+ source = AtLocation(source)
+ target = AtLocation(target)
+ try:
+ # mypy does not know that os.fspath accepts AtLocation
+ srcloc: str | bytes
+ srcloc = os.fspath(source) # type: ignore
+ tgtloc: str | bytes
+ tgtloc = os.fspath(target) # type: ignore
+ except ValueError:
+ otflags = OpenTreeFlags.OPEN_TREE_CLONE
+ if recursive:
+ otflags |= OpenTreeFlags.AT_RECURSIVE
+ with open_tree(source, otflags) as srcfd:
+ if readonly:
+ mount_setattr(srcfd, recursive, MountAttrFlags.RDONLY)
+ return move_mount(srcfd, target)
+ else:
+ mflags = MountFlags.BIND
+ if recursive:
+ mflags |= MountFlags.REC
+ if readonly:
+ mflags |= MountFlags.RDONLY
+ return mount(srcloc, tgtloc, None, mflags)
+
+
+def populate_dev(
+ origroot: AtLocationLike,
+ newroot: PathConvertible,
+ *,
+ fuse: bool = True,
+ pidns: bool = True,
+ tun: bool = True,
+) -> None:
+ """Mount a tmpfs to the dev directory beneath newroot and populate it with
+ basic devices by bind mounting them from the dev directory beneath
+ origroot. Also mount a new pts instance.
+ """
+ origdev = AtLocation(origroot) / "dev"
+ newdev = AtLocation(newroot) / "dev"
+ mount(
+ "devtmpfs",
+ newdev,
+ "tmpfs",
+ MountFlags.NOSUID | MountFlags.NOEXEC,
+ "mode=0755",
+ )
+ bind_devices = "null zero full random urandom tty".split()
+ bind_directories = []
+ if fuse:
+ bind_devices.append("fuse")
+ if pidns:
+ (newdev / "pts").mkdir()
+ mount(
+ "devpts",
+ newdev / "pts",
+ "devpts",
+ MountFlags.NOSUID | MountFlags.NOEXEC,
+ "gid=5,mode=620,ptmxmode=666",
+ )
+ (newdev / "ptmx").symlink("pts/ptmx")
+ else:
+ bind_devices.append("ptmx")
+ bind_directories.append("pts")
+ if tun:
+ (newdev / "net").mkdir()
+ bind_devices.append("net/tun")
+ for node in bind_devices:
+ (newdev / node).mknod(stat.S_IFREG)
+ bind_mount(origdev / node, newdev / node, True)
+ for node in bind_directories:
+ (newdev / node).mkdir()
+ bind_mount(origdev / node, newdev / node, True)
+
+
+def unshare_user_idmap(
+ uidmap: list[IDMapping],
+ gidmap: list[IDMapping],
+ flags: CloneFlags = CloneFlags.NEWUSER,
+) -> None:
+ """Unshare the given namespaces (must include user) and set up the given
+ id mappings.
+ """
+ pid = os.getpid()
+ @run_in_fork
+ def setup_idmaps() -> None:
+ newidmaps(pid, uidmap, gidmap)
+ unshare(flags)
+ setup_idmaps()