-rw-r--r--  .gitignore                             2
-rw-r--r--  README.md                              2
-rwxr-xr-x  examples/cgroup.py                    19
-rwxr-xr-x  examples/chhostname.py                60
-rwxr-xr-x  examples/chrootfuse.py                22
-rwxr-xr-x  examples/chroottar.py                  6
-rwxr-xr-x  examples/unschroot.py                353
-rwxr-xr-x  examples/unschroot_fs.py             486
-rwxr-xr-x  examples/unschroot_proc.py           991
-rwxr-xr-x  examples/userchroot.py                 2
-rwxr-xr-x  examples/withallsubuids.py             8
-rw-r--r--  linuxnamespaces/__init__.py          395
-rw-r--r--  linuxnamespaces/atlocation.py         49
-rw-r--r--  linuxnamespaces/filedescriptor.py     35
-rw-r--r--  linuxnamespaces/idmap.py             250
-rw-r--r--  linuxnamespaces/syscalls.py          358
-rw-r--r--  linuxnamespaces/systemd/__init__.py   53
-rw-r--r--  linuxnamespaces/systemd/dbussy.py     12
-rw-r--r--  linuxnamespaces/tarutils.py           28
-rw-r--r--  pyproject.toml                        11
-rw-r--r--  tests/test_atlocation.py               7
-rw-r--r--  tests/test_simple.py                  47
22 files changed, 2455 insertions, 741 deletions
diff --git a/.gitignore b/.gitignore
index a2eeca3..2c9537e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,5 @@
__pycache__
+.coverage
.mypy_cache
+.hypothesis
.pytest_cache
diff --git a/README.md b/README.md
index b215cd6..9acba17 100644
--- a/README.md
+++ b/README.md
@@ -5,6 +5,8 @@ This is a plumbing-level Python module for working with Linux namespaces via
ctypes. It leverages glibc wrappers to access the relevant system calls and
provides typed abstractions for them.
+To see how these are composed, consider looking into the `examples/` directory.
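+
+A minimal sketch of the plumbing API (the same calls that
+`examples/chhostname.py` uses; adjust the flags to your needs):
+
+    import os
+    import linuxnamespaces
+
+    linuxnamespaces.unshare_user_idmap(
+        [linuxnamespaces.IDMapping.identity(os.getuid())],
+        [linuxnamespaces.IDMapping.identity(os.getgid())],
+        linuxnamespaces.CloneFlags.NEWUSER | linuxnamespaces.CloneFlags.NEWNS,
+    )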
+
License
-------
GPL-3
diff --git a/examples/cgroup.py b/examples/cgroup.py
index 219dc62..2a423d3 100755
--- a/examples/cgroup.py
+++ b/examples/cgroup.py
@@ -18,19 +18,8 @@ import linuxnamespaces
import linuxnamespaces.systemd
-def get_cgroup(pid: int = -1) -> pathlib.PurePath:
- """Look up the cgroup that the given pid or the running process belongs
- to.
- """
- return pathlib.PurePath(
- pathlib.Path(
- f"/proc/{pid}/cgroup" if pid > 0 else "/proc/self/cgroup"
- ).read_text().split(":", 2)[2].strip()
- )
-
-
def main() -> None:
- mycgroup = get_cgroup()
+ mycgroup = linuxnamespaces.get_cgroup()
if not os.access(
pathlib.Path("/sys/fs/cgroup") / mycgroup.relative_to("/"),
os.W_OK,
@@ -45,7 +34,7 @@ def main() -> None:
properties={"Delegate": True},
),
)
- mycgroup = get_cgroup()
+ mycgroup = linuxnamespaces.get_cgroup()
except NotImplementedError:
linuxnamespaces.systemd.reexec_as_transient_unit(
properties={"Delegate": True}
@@ -56,8 +45,8 @@ def main() -> None:
| linuxnamespaces.CloneFlags.NEWCGROUP
)
linuxnamespaces.unshare_user_idmap(
- [linuxnamespaces.IDMapping(os.getuid(), os.getuid(), 1)],
- [linuxnamespaces.IDMapping(os.getgid(), os.getgid(), 1)],
+ [linuxnamespaces.IDMapping.identity(os.getuid())],
+ [linuxnamespaces.IDMapping.identity(os.getgid())],
namespaces,
)
linuxnamespaces.populate_sys("/", "/", namespaces, mycgroup)
diff --git a/examples/chhostname.py b/examples/chhostname.py
new file mode 100755
index 0000000..bf174e6
--- /dev/null
+++ b/examples/chhostname.py
@@ -0,0 +1,60 @@
+#!/usr/bin/python3
+# Copyright 2024 Helmut Grohne <helmut@subdivi.de>
+# SPDX-License-Identifier: GPL-3
+
+"""Unshare a UTS (and user and mount) namespace and change the hostname."""
+
+import os
+import pathlib
+import socket
+import sys
+import tempfile
+
+if __file__.split("/")[-2:-1] == ["examples"]:
+ sys.path.insert(0, "/".join(__file__.split("/")[:-2]))
+
+import linuxnamespaces
+
+
+def change_file(location: pathlib.Path, content: bytes | str) -> None:
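+    # Files under /etc are typically not writable for an unprivileged user
+    # even inside the namespace, so instead of writing them, bind-mount a
+    # private temporary file over them; the change is confined to the new
+    # mount namespace.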
+ if isinstance(content, str):
+ content = content.encode("ascii")
+ try:
+ st = location.stat()
+ except FileNotFoundError as err:
+ raise ValueError(
+ f"cannot change non-existent file: {location!r}"
+ ) from err
+ if st.st_size == len(content) and location.read_bytes() == content:
+ return
+ with tempfile.NamedTemporaryFile() as tfile:
+ tfile.write(content)
+ # In Python >= 3.12, we should set delete_on_close=False rather than
+ # closing the underlying file object behind tempfile's back.
+ tfile.file.close()
+ linuxnamespaces.bind_mount(tfile.name, location)
+
+
+def main() -> None:
+ hostname = sys.argv[1]
+ linuxnamespaces.unshare_user_idmap(
+ [linuxnamespaces.IDMapping.identity(os.getuid())],
+ [linuxnamespaces.IDMapping.identity(os.getgid())],
+ linuxnamespaces.CloneFlags.NEWUSER
+ | linuxnamespaces.CloneFlags.NEWNS
+ | linuxnamespaces.CloneFlags.NEWUTS,
+ )
+ socket.sethostname(hostname)
+ etc = pathlib.Path("/etc")
+ change_file(etc / "hostname", f"{hostname}\n")
+ change_file(
+ etc / "hosts",
+ f"""127.0.0.1 {hostname} localhost
+::1 {hostname} localhost
+""",
+ )
+ os.execlp(os.environ["SHELL"], os.environ["SHELL"])
+
+
+if __name__ == "__main__":
+ main()
diff --git a/examples/chrootfuse.py b/examples/chrootfuse.py
index 93a9da0..d8ca965 100755
--- a/examples/chrootfuse.py
+++ b/examples/chrootfuse.py
@@ -24,13 +24,13 @@ import linuxnamespaces
def main() -> None:
parser = argparse.ArgumentParser()
- parser.add_argument("--fstype", choices=["ext4", "squashfs"])
+ parser.add_argument("--fstype", choices=["erofs", "ext4", "squashfs"])
parser.add_argument("fsimage", type=pathlib.Path)
args = parser.parse_args()
assert args.fsimage.exists()
if args.fstype is None:
args.fstype = args.fsimage.suffix.removeprefix(".")
- if args.fstype not in ("ext4", "squashfs"):
+ if args.fstype not in ("erofs", "ext4", "squashfs"):
print("Cannot determine filesystem type for image.")
sys.exit(1)
uidmap = linuxnamespaces.IDAllocation.loadsubid("uid").allocatemap(65536)
@@ -46,6 +46,7 @@ def main() -> None:
childsock.close()
os.set_inheritable(fds[0], True)
driver = {
+ "erofs": "erofsfuse",
"ext4": "fuse2fs",
"squashfs": "squashfuse",
}[args.fstype]
@@ -62,6 +63,7 @@ def main() -> None:
socket.send_fds(mainsock, [b"\0"], [fusefd])
mainsock.close()
readonly = {
+ "erofs": True,
"ext4": False,
"squashfs": True,
}[args.fstype]
@@ -72,18 +74,18 @@ def main() -> None:
linuxnamespaces.MountFlags.RDONLY
if readonly
else linuxnamespaces.MountFlags.NONE,
- [
- "fd=%d" % fusefd,
- "rootmode=040755",
- "user_id=0",
- "group_id=0",
- "allow_other",
- ],
+ {
+ "fd": fusefd,
+ "rootmode": "040755",
+ "user_id": 0,
+ "group_id": 0,
+ "allow_other": None,
+ },
)
os.chdir("/mnt")
linuxnamespaces.bind_mount("/proc", "proc", recursive=True)
linuxnamespaces.bind_mount("/sys", "sys", recursive=True)
- linuxnamespaces.populate_dev("/", ".", pidns=False, tun=False)
+ linuxnamespaces.populate_dev("/", ".", pts="host", tun=False)
if readonly:
linuxnamespaces.mount(
"tmpfs", "tmp", "tmpfs", linuxnamespaces.MountFlags.NODEV
diff --git a/examples/chroottar.py b/examples/chroottar.py
index 3c38a97..cf0f87e 100755
--- a/examples/chroottar.py
+++ b/examples/chroottar.py
@@ -9,6 +9,7 @@ a user and mount namespace.
import argparse
import os
import pathlib
+import re
import socket
import sys
import tempfile
@@ -74,14 +75,15 @@ def main() -> None:
os.setregid(0, 0)
os.setgroups([])
for tmem in tarf:
- if tmem.name.removeprefix("./").startswith("dev/"):
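+            # Strip leading "/", "./" and "../" segments so that absolute and
+            # relative archive members under dev/ are skipped alike.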
+ name = re.sub(r"^/*(\.{1,2}/+)*", "", tmem.name)
+ if name.startswith("dev/"):
continue
tarf.extract(tmem, numeric_owner=True)
linuxnamespaces.bind_mount(".", "/mnt", recursive=True)
os.chdir("/mnt")
linuxnamespaces.bind_mount("/proc", "proc", recursive=True)
linuxnamespaces.bind_mount("/sys", "sys", recursive=True)
- linuxnamespaces.populate_dev("/", ".", pidns=False, tun=False)
+ linuxnamespaces.populate_dev("/", ".", pts="host", tun=False)
linuxnamespaces.pivot_root(".", ".")
linuxnamespaces.umount(".", linuxnamespaces.UmountFlags.DETACH)
if args.command:
diff --git a/examples/unschroot.py b/examples/unschroot.py
deleted file mode 100755
index 3d1c900..0000000
--- a/examples/unschroot.py
+++ /dev/null
@@ -1,353 +0,0 @@
-#!/usr/bin/python3
-# Copyright 2024 Helmut Grohne <helmut@subdivi.de>
-# SPDX-License-Identifier: GPL-3
-
-"""Emulate schroot using namespaces sufficiently well that sbuild can deal with
-it but not any better. It assumes that ~/.cache/sbuild contains tars suitable
-for sbuild --chroot-mode=unshare. Additionally, those tars are expected to
-contain the non-essential passwd package. The actual sessions are stored in
-~/.cache/unschroot. For using it with sbuild, your sbuildrc should contain:
-
- $chroot_mode = "schroot";
- $schroot = "/path/to/unschroot";
-"""
-
-
-import argparse
-import functools
-import grp
-import itertools
-import os
-import pathlib
-import pwd
-import shutil
-import signal
-import socket
-import stat
-import sys
-import tempfile
-import typing
-
-if __file__.split("/")[-2:-1] == ["examples"]:
- sys.path.insert(0, "/".join(__file__.split("/")[:-2]))
-
-import linuxnamespaces
-import linuxnamespaces.tarutils
-
-
-class TarFile(
- linuxnamespaces.tarutils.ZstdTarFile, linuxnamespaces.tarutils.XAttrTarFile
-):
- pass
-
-
-class Chroot:
- # Ignore $HOME as sbuild sets to something invalid
- home = pathlib.Path(pwd.getpwuid(os.getuid()).pw_dir)
- cache_sbuild = home / ".cache/sbuild"
- cache_unschroot = home / ".cache/unschroot"
-
- def __init__(self, path: pathlib.Path, aliases: set[str] | None = None):
- self.path = path
- self.aliases = set() if aliases is None else aliases
-
- @functools.cached_property
- def namespace(self) -> str:
- if self.path.is_file():
- return "Chroot"
- if self.path.is_dir():
- return "Session"
- raise ValueError("invalid chroot object")
-
- @functools.cached_property
- def name(self) -> str:
- suffix = "-sbuild" if self.namespace == "Chroot" else ""
- return self.path.name.split(".", 1)[0] + suffix
-
- def infostr(self) -> str:
- lines = [
- f"--- {self.namespace} ---",
- f"Name {self.name}",
- ]
- if self.namespace == "Chroot":
- lines.extend(["Type file", f"File {self.path}"])
- if self.namespace == "Session":
- lines.extend(
- [
- f"Location {self.path}",
- "Session Purged true",
- "Type unshare",
- ]
- )
- lines.append("Aliases " + " ".join(sorted(self.aliases)))
- return "".join(map("%s\n".__mod__, lines))
-
- @classmethod
- def searchchroot(cls, name: str) -> "Chroot":
- name = name.removeprefix("chroot:")
- name = name.removesuffix("-sbuild")
- for path in cls.cache_sbuild.iterdir():
- if path.name.startswith(name + ".t"):
- return cls(path)
- raise KeyError(name)
-
- @classmethod
- def searchsession(cls, name: str) -> "Chroot":
- name = name.removeprefix("session:")
- path = cls.cache_unschroot / name
- if not path.is_dir():
- raise KeyError(name)
- return cls(path)
-
- @classmethod
- def newsession(cls) -> "Chroot":
- cls.cache_unschroot.mkdir(parents=True, exist_ok=True)
- return Chroot(
- pathlib.Path(
- tempfile.mkdtemp(prefix="chroot", dir=cls.cache_unschroot)
- ),
- )
-
- @classmethod
- def scan_sbuild(cls) -> typing.Iterator["Chroot"]:
- if cls.cache_sbuild.is_dir():
- chroots = []
- aliases: dict[str, set[str]] = {}
- for path in cls.cache_sbuild.iterdir():
- if path.is_symlink():
- alias = path.name.split(".", 1)[0] + "-sbuild"
- aliases.setdefault(str(path.readlink()), set()).add(alias)
- elif path.is_file():
- chroots.append(path)
- for path in chroots:
- yield cls(path, aliases.get(path.name, set()))
-
- @classmethod
- def scan_unschroot(cls) -> typing.Iterator["Chroot"]:
- if cls.cache_unschroot.is_dir():
- yield from map(cls, cls.cache_unschroot.iterdir())
-
-
-def do_info(args: argparse.Namespace) -> None:
- """Show information about selected chroots"""
- chroots: typing.Iterable[Chroot]
- if args.chroot:
- try:
- chroots = [Chroot.searchchroot(args.chroot)]
- except KeyError:
- chroots = [Chroot.searchsession(args.chroot)]
- else:
- chroots = itertools.chain(
- Chroot.scan_sbuild(), Chroot.scan_unschroot()
- )
- sys.stdout.write("\n".join(chroot.infostr() for chroot in chroots))
-
-
-def do_begin_session(args: argparse.Namespace) -> None:
- """Begin a session; returns the session ID"""
- source = Chroot.searchchroot(args.chroot)
- session = Chroot.newsession()
- uidmap = linuxnamespaces.IDAllocation.loadsubid("uid").allocatemap(65536)
- gidmap = linuxnamespaces.IDAllocation.loadsubid("gid").allocatemap(65536)
- mainsock, childsock = socket.socketpair()
- with TarFile.open(source.path, "r:*") as tarf:
- pid = os.fork()
- if pid == 0:
- mainsock.close()
- os.chdir(session.path)
- linuxnamespaces.unshare(
- linuxnamespaces.CloneFlags.NEWUSER
- | linuxnamespaces.CloneFlags.NEWNS,
- )
- childsock.send(b"\0")
- childsock.recv(1)
- childsock.close()
- os.setgid(0)
- os.setuid(0)
- for tmem in tarf:
- if not tmem.name.startswith(("dev/", "./dev/")):
- tarf.extract(tmem, numeric_owner=True)
- etc_hosts = pathlib.Path("./etc/hosts")
- if not etc_hosts.exists():
- etc_hosts.write_text(
- """127.0.0.1 localhost
-127.0.1.1 %s
-::1 localhost ip6-localhost ip6-loopback
-"""
- % socket.gethostname(),
- )
- sys.exit(0)
- childsock.close()
- mainsock.recv(1)
- linuxnamespaces.newidmaps(pid, [uidmap], [gidmap])
- linuxnamespaces.unshare_user_idmap(
- [uidmap, linuxnamespaces.IDMapping(65536, os.getuid(), 1)],
- [gidmap, linuxnamespaces.IDMapping(65536, os.getgid(), 1)],
- )
- os.chown(session.path, 0, 0)
- session.path.chmod(0o755)
- mainsock.send(b"\0")
- mainsock.close()
- _, ret = os.waitpid(pid, 0)
- print(session.name)
- sys.exit(ret)
-
-
-def exec_perl_dumb_init(pid: int) -> typing.NoReturn:
- """Roughly implement dumb-init in perl: Wait for all children until we
- receive an exit from the given pid and forward its status.
- """
- os.execlp(
- "perl",
- "perl",
- "-e",
- "$r=255<<8;" # exit 255 when we run out of children
- "do{"
- "$p=wait;"
- f"$r=$?,$p=0 if $p=={pid};"
- "}while($p>0);"
- "exit(0<$r<256?128|$r:$r>>8);", # sig -> 128+sig; exit -> exit
- )
-
-
-def do_run_session(args: argparse.Namespace) -> None:
- """Run an existing session"""
- session = Chroot.searchsession(args.chroot)
- uidmap = linuxnamespaces.IDAllocation.loadsubid("uid").allocatemap(65536)
- gidmap = linuxnamespaces.IDAllocation.loadsubid("gid").allocatemap(65536)
- mainsock, childsock = socket.socketpair()
- pid = os.fork()
- pidfd: int
- if pid == 0:
- mainsock.close()
- for fd in (1, 2):
- if stat.S_ISFIFO(os.fstat(fd).st_mode):
- os.fchmod(fd, 0o666)
- os.chdir(session.path)
- ns = (
- linuxnamespaces.CloneFlags.NEWUSER
- | linuxnamespaces.CloneFlags.NEWNS
- | linuxnamespaces.CloneFlags.NEWPID
- )
- if args.isolate_network:
- ns |= linuxnamespaces.CloneFlags.NEWNET
- linuxnamespaces.unshare(ns)
- childsock.send(b"\0")
- childsock.recv(1)
- if os.fork() != 0:
- sys.exit(0)
- assert os.getpid() == 1
- with linuxnamespaces.FileDescriptor(os.pidfd_open(1, 0)) as pidfd:
- socket.send_fds(childsock, [b"\0"], [pidfd])
- os.setgid(0)
- os.setuid(0)
- linuxnamespaces.bind_mount(".", "/mnt", recursive=True)
- os.chdir("/mnt")
- linuxnamespaces.populate_sys("/", ".", ns, devices=True)
- linuxnamespaces.populate_proc("/", ".", ns)
- linuxnamespaces.populate_dev(
- "/", ".", tun=bool(ns & linuxnamespaces.CloneFlags.NEWNET)
- )
- linuxnamespaces.pivot_root(".", ".")
- linuxnamespaces.umount(".", linuxnamespaces.UmountFlags.DETACH)
- os.chdir("/")
- if ns & linuxnamespaces.CloneFlags.NEWNET:
- linuxnamespaces.enable_loopback_if()
- if args.user.isdigit():
- spw = pwd.getpwuid(int(args.user))
- else:
- spw = pwd.getpwnam(args.user)
- supplementary = [
- sgr.gr_gid for sgr in grp.getgrall() if spw.pw_name in sgr.gr_mem
- ]
-
- childsock.recv(1)
- childsock.close()
- rfd, wfd = linuxnamespaces.FileDescriptor.pipe(inheritable=False)
- pid = os.fork()
- if pid == 0:
- wfd.close()
- if args.directory:
- os.chdir(args.directory)
- os.setgroups(supplementary)
- os.setgid(spw.pw_gid)
- os.setuid(spw.pw_uid)
- if "PATH" not in os.environ:
- if spw.pw_uid == 0:
- os.environ["PATH"] = "/usr/sbin:/sbin:/usr/bin:/bin"
- else:
- os.environ["PATH"] = "/usr/bin:/bin"
- if not args.command:
- args.command.append("bash")
- # Wait until Python has handed off to Perl.
- os.read(rfd, 1)
- os.execvp(args.command[0], args.command)
- else:
- rfd.close()
- linuxnamespaces.prctl_set_pdeathsig(signal.SIGKILL)
- os.close(0)
- # It is important that we now exec to get rid of our previous
- # execution context that carries pieces such as memory maps from
- # different namespaces that could allow escalating privileges. The
- # exec will close wfd and allow the target process to exec.
- exec_perl_dumb_init(pid)
- childsock.close()
- mainsock.recv(1)
- linuxnamespaces.newidmaps(pid, [uidmap], [gidmap])
- linuxnamespaces.prctl_set_child_subreaper(True)
- mainsock.send(b"\0")
- _data, fds, _flags, _address = socket.recv_fds(mainsock, 1, 1)
- pidfd = fds[0]
- os.waitpid(pid, 0)
- linuxnamespaces.prctl_set_child_subreaper(False)
- mainsock.send(b"\0")
- wres = os.waitid(os.P_PIDFD, pidfd, os.WEXITED)
- assert wres is not None
- sys.exit(wres.si_status)
-
-
-def do_end_session(args: argparse.Namespace) -> None:
- """End an existing session"""
- session = Chroot.searchsession(args.chroot)
- uidmap = linuxnamespaces.IDAllocation.loadsubid("uid").allocatemap(65536)
- gidmap = linuxnamespaces.IDAllocation.loadsubid("gid").allocatemap(65536)
- linuxnamespaces.unshare_user_idmap(
- [uidmap, linuxnamespaces.IDMapping(65536, os.getuid(), 1)],
- [gidmap, linuxnamespaces.IDMapping(65536, os.getgid(), 1)],
- )
- shutil.rmtree(session.path)
-
-
-def main() -> None:
- parser = argparse.ArgumentParser()
- group = parser.add_mutually_exclusive_group(required=True)
- for comm in ("info", "begin-session", "run-session", "end-session"):
- func = globals()["do_" + comm.replace("-", "_")]
- group.add_argument(
- f"-{comm[0]}",
- f"--{comm}",
- dest="subcommand",
- action="store_const",
- const=func,
- help=func.__doc__,
- )
- parser.add_argument(
- "-c",
- "--chroot",
- dest="chroot",
- action="store",
- help="Use specified chroot",
- )
- parser.add_argument("-d", "--directory", action="store")
- parser.add_argument("-p", "--preserve-environment", action="store_true")
- parser.add_argument("-q", "--quiet", action="store_true")
- parser.add_argument("-u", "--user", action="store", default=os.getlogin())
- parser.add_argument("--isolate-network", action="store_true")
- parser.add_argument("command", nargs="*")
- args = parser.parse_args()
- assert args.subcommand is not None
- args.subcommand(args)
-
-
-if __name__ == "__main__":
- main()
diff --git a/examples/unschroot_fs.py b/examples/unschroot_fs.py
new file mode 100755
index 0000000..68e2320
--- /dev/null
+++ b/examples/unschroot_fs.py
@@ -0,0 +1,486 @@
+#!/usr/bin/python3
+# Copyright 2024 Helmut Grohne <helmut@subdivi.de>
+# SPDX-License-Identifier: GPL-3
+
+"""Emulate schroot using namespaces sufficiently well that sbuild can deal with
+it but not any better. It assumes that ~/.cache/sbuild contains tars suitable
+for sbuild --chroot-mode=unshare. Additionally, those tars are expected to
+contain the non-essential passwd package. The actual sessions are stored in
+~/.cache/unschroot. For using it with sbuild, your sbuildrc should contain:
+
+ $chroot_mode = "schroot";
+ $schroot = "/path/to/unschroot";
+
+State and sessions are retained via the filesystem in ~/.cache/unschroot
+between calls with no background processes or persistent namespaces.
+"""
+
+
+import argparse
+import grp
+import os
+import pathlib
+import pwd
+import shutil
+import signal
+import socket
+import stat
+import sys
+import tempfile
+import typing
+
+if __file__.split("/")[-2:-1] == ["examples"]:
+ sys.path.insert(0, "/".join(__file__.split("/")[:-2]))
+
+import linuxnamespaces
+import linuxnamespaces.tarutils
+
+
+class TarFile(
+ linuxnamespaces.tarutils.ZstdTarFile, linuxnamespaces.tarutils.XAttrTarFile
+):
+ pass
+
+
+def write_etc_hosts(root: os.PathLike[str] | str) -> None:
+ etc_hosts = pathlib.Path(root) / "etc/hosts"
+ if not etc_hosts.exists():
+ etc_hosts.write_text(
+ """127.0.0.1 localhost
+127.0.1.1 %s
+::1 localhost ip6-localhost ip6-loopback
+"""
+ % socket.gethostname(),
+ encoding="ascii",
+ )
+
+
+def load_subids() -> (
+ tuple[linuxnamespaces.IDMapping, linuxnamespaces.IDMapping]
+):
+ return (
+ linuxnamespaces.IDAllocation.loadsubid("uid").allocatemap(65536),
+ linuxnamespaces.IDAllocation.loadsubid("gid").allocatemap(65536),
+ )
+
+
+# Ignore $HOME as sbuild sets it to something invalid
+HOME = pathlib.Path(pwd.getpwuid(os.getuid()).pw_dir)
+CACHE_SBUILD = HOME / ".cache/sbuild"
+CACHE_UNSCHROOT = HOME / ".cache/unschroot"
+CACHE_DIRECTORY_CHROOTS = HOME / ".cache/directory_chroots"
+
+
+class ChrootBase:
+ namespace: str
+ name: str
+
+ def __init__(self) -> None:
+ self.aliases: set[str] = set()
+
+ def infodata(self) -> dict[str, str]:
+ return {
+ "Name": self.name,
+ "Aliases": " ".join(sorted(self.aliases)),
+ }
+
+ def infostr(self) -> str:
+ return f"--- {self.namespace} ---\n" + "".join(
+ map("%s %s\n".__mod__, self.infodata().items())
+ )
+
+
+class SourceChroot(ChrootBase):
+ namespace = "Chroot"
+
+ def newsession(self) -> "SessionChroot":
+ raise NotImplementedError
+
+
+class SessionChroot(ChrootBase):
+ namespace = "Session"
+
+ def infodata(self) -> dict[str, str]:
+ data = super().infodata()
+ data["Session Purged"] = "true"
+ data["Type"] = "unshare"
+ return data
+
+ def mount(self) -> pathlib.Path:
+ raise NotImplementedError
+
+
+class TarSourceChroot(SourceChroot):
+ def __init__(self, path: pathlib.Path):
+ super().__init__()
+ self.path = path
+ self.name = path.name.split(".", 1)[0] + "-sbuild"
+
+ def infodata(self) -> dict[str, str]:
+ data = super().infodata()
+ data["Type"] = "file"
+ data["File"] = str(self.path)
+ return data
+
+ def newsession(self) -> "TarSessionChroot":
+ CACHE_UNSCHROOT.mkdir(parents=True, exist_ok=True)
+ session = TarSessionChroot(
+ pathlib.Path(tempfile.mkdtemp(prefix="tar-", dir=CACHE_UNSCHROOT)),
+ )
+
+ uidmap, gidmap = load_subids()
+ mainsock, childsock = socket.socketpair()
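+        # Handshake: the child unshares user and mount namespaces and waits;
+        # the parent then writes the subordinate id maps (newidmaps) and a
+        # helper fork chowns the session directory, before the child is
+        # signalled to extract the tar as the in-namespace root user.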
+ with TarFile.open(self.path, "r:*") as tarf:
+ pid = os.fork()
+ if pid == 0:
+ mainsock.close()
+ os.chdir(session.path)
+ linuxnamespaces.unshare(
+ linuxnamespaces.CloneFlags.NEWUSER
+ | linuxnamespaces.CloneFlags.NEWNS,
+ )
+ childsock.send(b"\0")
+ childsock.recv(1)
+ childsock.close()
+ os.setgid(0)
+ os.setuid(0)
+ for tmem in tarf:
+ if not tmem.name.startswith(("dev/", "./dev/")):
+ tarf.extract(tmem, numeric_owner=True)
+ write_etc_hosts(".")
+ sys.exit(0)
+ childsock.close()
+ mainsock.recv(1)
+ pid2 = os.fork()
+ if pid2 == 0:
+ linuxnamespaces.unshare_user_idmap(
+ [uidmap, linuxnamespaces.IDMapping(65536, os.getuid(), 1)],
+ [gidmap, linuxnamespaces.IDMapping(65536, os.getgid(), 1)],
+ )
+ os.chown(session.path, 0, 0)
+ session.path.chmod(0o755)
+ sys.exit(0)
+ linuxnamespaces.newidmaps(pid, [uidmap], [gidmap])
+ _, ret = os.waitpid(pid2, 0)
+ assert ret == 0
+ mainsock.send(b"\0")
+ mainsock.close()
+ _, ret = os.waitpid(pid, 0)
+ assert ret == 0
+ return session
+
+
+class TarSessionChroot(SessionChroot):
+ def __init__(self, path: pathlib.Path):
+ super().__init__()
+ self.path = path
+ self.name = path.name
+
+ def mount(self) -> pathlib.Path:
+ linuxnamespaces.bind_mount(self.path, "/mnt", recursive=True)
+ return pathlib.Path("/mnt")
+
+
+class DirectorySourceChroot(SourceChroot):
+ def __init__(self, path: pathlib.Path):
+ super().__init__()
+ self.path = path
+ self.name = path.name + "-sbuild"
+
+ def infodata(self) -> dict[str, str]:
+ data = super().infodata()
+ data["Type"] = "directory"
+ data["Directory"] = str(self.path)
+ return data
+
+ def newsession(self) -> "DirectorySessionChroot":
+ CACHE_UNSCHROOT.mkdir(parents=True, exist_ok=True)
+ path = pathlib.Path(
+ tempfile.mkdtemp(
+ prefix=f"overlay-{self.name}-", dir=CACHE_UNSCHROOT
+ ),
+ )
+ session = DirectorySessionChroot(self, path)
+ uidmap, gidmap = load_subids()
+ pid = os.fork()
+ if pid == 0:
+ linuxnamespaces.unshare_user_idmap(
+ [uidmap, linuxnamespaces.IDMapping(65536, os.getuid(), 1)],
+ [gidmap, linuxnamespaces.IDMapping(65536, os.getgid(), 1)],
+ )
+ os.setgid(0)
+ os.setuid(0)
+ os.chown(path, 0, 0)
+ path.chmod(0o755)
+ (path / "upper").mkdir()
+ (path / "work").mkdir()
+ if not (self.path / "etc/hosts").exists():
+ (path / "upper/etc").mkdir()
+ write_etc_hosts(path / "upper")
+ sys.exit(0)
+ _, ret = os.waitpid(pid, 0)
+ assert ret == 0
+ return session
+
+
+class DirectorySessionChroot(SessionChroot):
+ def __init__(self, source: DirectorySourceChroot, path: pathlib.Path):
+ super().__init__()
+ self.source = source
+ self.path = path
+ self.name = path.name
+
+ def infodata(self) -> dict[str, str]:
+ data = super().infodata()
+ data["Type"] = "directory"
+ data["Directory"] = str(self.source.path)
+ # It's a gross lie, but sbuild does not work without. It has to
+        # It's a gross lie, but sbuild does not work without it. It has to
+        # actually exist and should not occur inside build logs.
+ return data
+
+ def mount(self) -> pathlib.Path:
+ mnt = "/mnt"
+ linuxnamespaces.mount(
+ "overlay",
+ mnt,
+ "overlay",
+ data={
+ "lowerdir": str(self.source.path),
+ "upperdir": str(self.path / "upper"),
+ "workdir": str(self.path / "work"),
+ "userxattr": None,
+ },
+ )
+ return pathlib.Path(mnt)
+
+
+def scan_chroots() -> dict[str, ChrootBase]:
+ chrootmap: dict[str, ChrootBase] = {}
+ chroot: ChrootBase
+ for loc, cls in (
+ (CACHE_SBUILD, TarSourceChroot),
+ (CACHE_DIRECTORY_CHROOTS, DirectorySourceChroot),
+ ):
+ if loc.is_dir():
+ chroots = []
+ aliases: dict[str, set[str]] = {}
+ for path in loc.iterdir():
+ if path.is_symlink():
+ alias = path.name.split(".", 1)[0] + "-sbuild"
+ aliases.setdefault(str(path.readlink()), set()).add(alias)
+ else:
+ chroots.append(path)
+ for path in chroots:
+ chroot = cls(path)
+ chrootaliases = aliases.get(path.name, set())
+ chroot.aliases.update(chrootaliases)
+ if chroot.name not in chrootmap:
+ chrootmap[chroot.name] = chroot
+ for alias in chrootaliases:
+ if alias not in chrootmap:
+ chrootmap[alias] = chroot
+
+ if CACHE_UNSCHROOT.is_dir():
+ for path in CACHE_UNSCHROOT.iterdir():
+ if path.name.startswith("tar-"):
+ chroot = TarSessionChroot(path)
+ if chroot.name not in chrootmap:
+ chrootmap[chroot.name] = chroot
+ elif path.name.startswith("overlay-"):
+ base = "-".join(path.name.split("-")[1:-1])
+ if base not in chrootmap:
+ continue
+ source = chrootmap[base]
+ assert isinstance(source, DirectorySourceChroot)
+ chroot = DirectorySessionChroot(source, path)
+ if chroot.name not in chrootmap:
+ chrootmap[chroot.name] = chroot
+
+ return chrootmap
+
+
+def do_info(args: argparse.Namespace) -> None:
+ """Show information about selected chroots"""
+ chrootmap = scan_chroots()
+ chroots: typing.Iterable[ChrootBase]
+ if args.chroot:
+ chroots = [
+ chrootmap[
+ args.chroot.removeprefix("chroot:").removeprefix("session:")
+ ],
+ ]
+ else:
+ chroots = chrootmap.values()
+ sys.stdout.write("\n".join(chroot.infostr() for chroot in chroots))
+
+
+def do_begin_session(args: argparse.Namespace) -> None:
+ """Begin a session; returns the session ID"""
+ chrootmap = scan_chroots()
+ source = chrootmap[args.chroot.removeprefix("chroot:")]
+ assert isinstance(source, SourceChroot)
+ session = source.newsession()
+ print(session.name)
+
+
+def exec_perl_dumb_init(pid: int) -> typing.NoReturn:
+ """Roughly implement dumb-init in perl: Wait for all children until we
+ receive an exit from the given pid and forward its status.
+ """
+ os.execlp(
+ "perl",
+ "perl",
+ "-e",
+ "$r=255<<8;" # exit 255 when we run out of children
+ "do{"
+ "$p=wait;"
+ f"$r=$?,$p=0 if $p=={pid};"
+ "}while($p>0);"
+ "exit(0<$r<256?128|$r:$r>>8);", # sig -> 128+sig; exit -> exit
+ )
+
+
+def do_run_session(args: argparse.Namespace) -> None:
+ """Run an existing session"""
+ chrootmap = scan_chroots()
+ session = chrootmap[args.chroot]
+ assert isinstance(session, SessionChroot)
+ uidmap, gidmap = load_subids()
+ mainsock, childsock = socket.socketpair()
+ pid = os.fork()
+ pidfd: int
+ if pid == 0:
+ mainsock.close()
+ for fd in (1, 2):
+ if stat.S_ISFIFO(os.fstat(fd).st_mode):
+ os.fchmod(fd, 0o666)
+ ns = (
+ linuxnamespaces.CloneFlags.NEWUSER
+ | linuxnamespaces.CloneFlags.NEWNS
+ | linuxnamespaces.CloneFlags.NEWPID
+ )
+ if args.isolate_network:
+ ns |= linuxnamespaces.CloneFlags.NEWNET
+ linuxnamespaces.unshare(ns)
+ childsock.send(b"\0")
+ childsock.recv(1)
+ if os.fork() != 0:
+ sys.exit(0)
+ assert os.getpid() == 1
+ with linuxnamespaces.FileDescriptor(os.pidfd_open(1, 0)) as pidfd:
+ socket.send_fds(childsock, [b"\0"], [pidfd])
+ os.setgid(0)
+ os.setuid(0)
+ root = session.mount()
+ os.chdir(root)
+ linuxnamespaces.populate_sys("/", ".", ns, devices=True)
+ linuxnamespaces.populate_proc("/", ".", ns)
+ linuxnamespaces.populate_dev(
+ "/", ".", tun=bool(ns & linuxnamespaces.CloneFlags.NEWNET)
+ )
+ linuxnamespaces.pivot_root(".", ".")
+ linuxnamespaces.umount(".", linuxnamespaces.UmountFlags.DETACH)
+ os.chdir("/")
+ if ns & linuxnamespaces.CloneFlags.NEWNET:
+ linuxnamespaces.enable_loopback_if()
+ if args.user.isdigit():
+ spw = pwd.getpwuid(int(args.user))
+ else:
+ spw = pwd.getpwnam(args.user)
+ supplementary = [
+ sgr.gr_gid for sgr in grp.getgrall() if spw.pw_name in sgr.gr_mem
+ ]
+
+ childsock.recv(1)
+ childsock.close()
+ rfd, wfd = linuxnamespaces.FileDescriptor.pipe(inheritable=False)
+ pid = os.fork()
+ if pid == 0:
+ wfd.close()
+ if args.directory:
+ os.chdir(args.directory)
+ os.setgroups(supplementary)
+ os.setgid(spw.pw_gid)
+ os.setuid(spw.pw_uid)
+ if "PATH" not in os.environ:
+ if spw.pw_uid == 0:
+ os.environ["PATH"] = "/usr/sbin:/sbin:/usr/bin:/bin"
+ else:
+ os.environ["PATH"] = "/usr/bin:/bin"
+ if not args.command:
+ args.command.append("bash")
+ # Wait until Python has handed off to Perl.
+ os.read(rfd, 1)
+ os.execvp(args.command[0], args.command)
+ else:
+ rfd.close()
+ linuxnamespaces.prctl_set_pdeathsig(signal.SIGKILL)
+ os.close(0)
+ # It is important that we now exec to get rid of our previous
+ # execution context that carries pieces such as memory maps from
+ # different namespaces that could allow escalating privileges. The
+ # exec will close wfd and allow the target process to exec.
+ exec_perl_dumb_init(pid)
+ childsock.close()
+ mainsock.recv(1)
+ linuxnamespaces.newidmaps(pid, [uidmap], [gidmap])
+ linuxnamespaces.prctl_set_child_subreaper(True)
+ mainsock.send(b"\0")
+ _data, fds, _flags, _address = socket.recv_fds(mainsock, 1, 1)
+ pidfd = fds[0]
+ os.waitpid(pid, 0)
+ linuxnamespaces.prctl_set_child_subreaper(False)
+ mainsock.send(b"\0")
+ wres = os.waitid(os.P_PIDFD, pidfd, os.WEXITED)
+ assert wres is not None
+ sys.exit(wres.si_status)
+
+
+def do_end_session(args: argparse.Namespace) -> None:
+ """End an existing session"""
+ chrootmap = scan_chroots()
+ session = chrootmap[args.chroot]
+ assert isinstance(session, (TarSessionChroot, DirectorySessionChroot))
+ uidmap = linuxnamespaces.IDAllocation.loadsubid("uid").allocatemap(65536)
+ gidmap = linuxnamespaces.IDAllocation.loadsubid("gid").allocatemap(65536)
+ linuxnamespaces.unshare_user_idmap(
+ [uidmap, linuxnamespaces.IDMapping(65536, os.getuid(), 1)],
+ [gidmap, linuxnamespaces.IDMapping(65536, os.getgid(), 1)],
+ )
+ shutil.rmtree(session.path)
+
+
+def main() -> None:
+ parser = argparse.ArgumentParser()
+ group = parser.add_mutually_exclusive_group(required=True)
+ for comm in ("info", "begin-session", "run-session", "end-session"):
+ func = globals()["do_" + comm.replace("-", "_")]
+ group.add_argument(
+ f"-{comm[0]}",
+ f"--{comm}",
+ dest="subcommand",
+ action="store_const",
+ const=func,
+ help=func.__doc__,
+ )
+ parser.add_argument(
+ "-c",
+ "--chroot",
+ dest="chroot",
+ action="store",
+ help="Use specified chroot",
+ )
+ parser.add_argument("-d", "--directory", action="store")
+ parser.add_argument("-p", "--preserve-environment", action="store_true")
+ parser.add_argument("-q", "--quiet", action="store_true")
+ parser.add_argument("-u", "--user", action="store", default=os.getlogin())
+ parser.add_argument("--isolate-network", action="store_true")
+ parser.add_argument("command", nargs="*")
+ args = parser.parse_args()
+ assert args.subcommand is not None
+ args.subcommand(args)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/examples/unschroot_proc.py b/examples/unschroot_proc.py
new file mode 100755
index 0000000..271a898
--- /dev/null
+++ b/examples/unschroot_proc.py
@@ -0,0 +1,991 @@
+#!/usr/bin/python3
+# Copyright 2025 Helmut Grohne <helmut@subdivi.de>
+# SPDX-License-Identifier: GPL-3
+
+"""Emulate schroot using namespaces and a background session process
+sufficiently well that sbuild can deal with it but not any better. For using
+it with sbuild, your sbuildrc should contain:
+
+ $chroot_mode = "schroot";
+ $schroot = "/path/to/unschroot";
+
+It may automatically discover chroots from ~/.cache/sbuild in the layout that
+sbuild's unshare mode consumes, but you may also create a
+~/.config/unschroot.ini to apply more detailed configuration.
+
+State and sessions are retained via a background process and namespaces as well
+as a varlink socket below $RUNTIME_DIR/unschroot.
+"""
+
+import argparse
+import asyncio
+import collections.abc
+import configparser
+import contextlib
+import errno
+import functools
+import os
+import pathlib
+import pwd
+import signal
+import socket
+import stat
+import sys
+import tempfile
+import typing
+import uuid
+
+import platformdirs
+
+import asyncvarlink
+import asyncvarlink.serviceinterface
+import linuxnamespaces
+import linuxnamespaces.tarutils
+
+
+class TarFile(
+ linuxnamespaces.tarutils.ZstdTarFile, linuxnamespaces.tarutils.XAttrTarFile
+):
+ """A TarFile subclass that handles both zstd compressed archives and
+ extended attributes.
+ """
+
+
+# Ignore $HOME as sbuild sets it to something invalid
+HOME = pathlib.Path(pwd.getpwuid(os.getuid()).pw_dir)
+UNSCHROOT_CONFIG = HOME / ".config/unschroot.ini"
+CACHE_SBUILD = HOME / ".cache/sbuild"
+CACHE_DIRECTORY_CHROOTS = HOME / ".cache/directory_chroots"
+
+
+_P = typing.ParamSpec("_P")
+_T = typing.TypeVar("_T")
+
+
+def async_as_sync(
+ func: typing.Callable[_P, typing.Coroutine[typing.Any, typing.Any, _T]]
+) -> typing.Callable[_P, _T]:
+    """Turn an async function into a sync one by running it in its own loop."""
+
+ @functools.wraps(func)
+ def wrapped(*args: _P.args, **kwargs: _P.kwargs) -> _T:
+ return asyncio.run(func(*args, **kwargs))
+
+ return wrapped
+
+
+def run_here(func: typing.Callable[[], _T]) -> _T:
+    """A decorator that runs the given function once, right where it is
+    defined, and replaces the function with its call result.
+ """
+ return func()
+
+
+def runtime_path() -> pathlib.Path:
+ """Return the location where IPC sockets are to be stored."""
+ return platformdirs.user_runtime_path("unschroot")
+
+
+def path_below(path: str) -> pathlib.Path:
+ """Take a relative or absolute path, anchor it in / and return a relative
+ version of it.
+ """
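+    # Illustrative behaviour: path_below("/var/log") == Path("var/log") and
+    # path_below("proc/../dev") == Path("dev").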
+ parts: list[str] = []
+ for part in pathlib.PurePath("/", path).relative_to("/").parts:
+ if part == ".." and parts:
+ parts.pop()
+ else:
+ parts.append(part)
+ return pathlib.Path(*parts)
+
+
+class ChrootBase:
+ """Base class for chroots from a schroot way of looking at them."""
+
+ namespace: str
+
+ def __init__(self, name: str):
+ self.name = name
+
+ def aliases(self) -> set[str]:
+ """Return the set of alternative names recorded for this chroot."""
+ raise NotImplementedError
+
+ def infodata(self) -> dict[str, str]:
+ """Return a mapping with information needed for schroot -i."""
+ return {
+ "Name": self.name,
+ "Aliases": " ".join(sorted(self.aliases())),
+ }
+
+ def infostr(self) -> str:
+ """Construct the schroot -i output for this chroot."""
+ return f"--- {self.namespace} ---\n" + "".join(
+ map("%s %s\n".__mod__, self.infodata().items())
+ )
+
+
+class SourceChroot(ChrootBase):
+ """Represent a schroot source chroot to be instantiated into a session."""
+
+ namespace = "Chroot"
+
+ def __init__(self, name: str, config: collections.abc.Mapping[str, str]):
+ super().__init__(name)
+ self.config = config
+
+ def aliases(self) -> set[str]:
+ try:
+ aliasstr = self.config["aliases"]
+ except KeyError:
+ return set()
+ return set(map(str.strip, aliasstr.split(",")))
+
+ def infodata(self) -> dict[str, str]:
+ data = super().infodata()
+ if (
+ self.config.get("rootfstype") in ("none", None)
+ and self.config.get("rootfsextract")
+ ):
+ data["Type"] = "file"
+ data["File"] = self.config["rootfsextract"]
+ elif (
+ self.config["rootfstype"] == "bind"
+ and self.config.get("rootfsdir")
+ ):
+ data["Type"] = "directory"
+ data["Directory"] = self.config["rootfsdir"]
+ elif (
+ self.config["rootfstype"].startswith("fuse.")
+ and self.config.get("rootfsdev")
+ ):
+ data["Type"] = "file"
+ data["File"] = self.config["rootfsdev"]
+ else:
+ assert False, f"unexpected chroot configuration {self.config!r}"
+ return data
+
+ async def mount(
+ self,
+ proxy: asyncvarlink.VarlinkInterfaceProxy,
+ backingdir: str | None,
+ ) -> None:
+ """Create the root filesystem and chdir the supervisor to it."""
+ match self.config.get("backingstore"):
+ case "tmpfs" | None:
+ await proxy.Chdir(path="/opt")
+ await proxy.MountTmpfs(options={"mode": "0755"})
+ case "directory":
+ await proxy.Chdir(path=backingdir)
+ case _:
+ raise NotImplementedError("unsupported backingstore")
+ await proxy.Mkdir(path="lower")
+ match self.config.get("rootfstype", "").split("."):
+ case ["none"] | [""]:
+ pass
+ case ["bind"]:
+ await proxy.BindMount(
+ source=str(
+ pathlib.Path(self.config["rootfsdir"]).expanduser()
+ ),
+ target="lower",
+ readonly=configparser.ConfigParser.BOOLEAN_STATES[
+ self.config.get("overlayfs", "false").lower()
+ ],
+ )
+ case ["fuse", subtype]:
+ driver = {
+ "erofs": "erofsfuse",
+ "ext4": "fuse2fs",
+ "squashfs": "squashfuse",
+ }[subtype]
+ device = pathlib.Path(self.config["rootfsdev"])
+ with (
+ await proxy.MountFuse(
+ source=str(device),
+ target="lower",
+ options={
+ "rootmode": "040755",
+ "user_id": 0,
+ "group_id": 0,
+ "allow_other": None,
+ },
+ fstype=subtype,
+ ) as mountres,
+ mountres["fusefd"] as fusefd,
+ ):
+ @linuxnamespaces.run_in_fork.now
+ def _() -> None:
+ close_all_but([0, 1, 2, fusefd])
+ os.execvp(
+ driver,
+ [
+ driver,
+ str(device.expanduser()),
+ f"/dev/fd/{fusefd.fileno()}",
+ ],
+ )
+ case _:
+ raise NotImplementedError("unsupported rootfstype")
+ if self.config.get("overlayfs"):
+ assert self.config.get("rootfstype") not in ("none", None)
+ await proxy.Mkdir(path="work")
+ await proxy.Mkdir(path="upper")
+ await proxy.Mkdir(path="mnt")
+ # Mount to a subdir such that we may chdir("..")
+ await proxy.Mkdir(path="mnt/mnt")
+ await proxy.MountOverlayfs(
+ lower="lower",
+ upper="upper",
+ work="work",
+ target="mnt/mnt",
+ )
+ await proxy.Chdir(path="mnt/mnt")
+ if self.config.get("rootfsextract"):
+ tar = pathlib.Path(self.config["rootfsextract"])
+ extra = {}
+ if tar.suffix in (".tzst", ".tzstd", ".zst", ".zstd"):
+ extra["comptype"] = "zst"
+ with tar.expanduser().open("rb") as tarf:
+ await proxy.ExtractTar(tar=tarf, **extra)
+
+
+class SessionChroot(ChrootBase):
+    """Represent a schroot session. Its name is the basename of the IPC
+ socket.
+ """
+ namespace = "Session"
+
+ def aliases(self) -> set[str]:
+ return set()
+
+ def infodata(self) -> dict[str, str]:
+ data = super().infodata()
+ data["Session Purged"] = "true"
+ data["Type"] = "unshare"
+ # It's a gross lie. It has to exist as a directory or sbuild won't
+ # work.
+ data["Location"] = "/opt"
+ return data
+
+
+def load_config() -> configparser.ConfigParser:
+ config = configparser.ConfigParser(interpolation=None, delimiters=("=",))
+ config.read([UNSCHROOT_CONFIG])
+ return config
+
+
+def scan_chroots() -> dict[str, ChrootBase]:
+ """Scan chroots:
+ * ~/.config/unschroot.ini trumps
+ * ~/.cache/sbuild automatic tar-based
+ * ~/.cache/directory_chroots automatic overlay
+ * sessions
+ """
+ config = load_config()
+
+ chrootmap: dict[str, ChrootBase] = {}
+ chroot: ChrootBase
+ chrootconfig: collections.abc.Mapping[str, str]
+
+ for name, chrootconfig in config.items():
+ if name == "DEFAULT":
+ continue
+ chroot = SourceChroot(name, chrootconfig)
+ chrootmap[name] = chroot
+
+ for loc in (CACHE_SBUILD, CACHE_DIRECTORY_CHROOTS):
+ if loc.is_dir():
+ chroots = []
+ aliases: dict[str, set[str]] = {}
+ for path in loc.iterdir():
+ if path.is_symlink():
+ alias = path.name.split(".", 1)[0]
+ aliases.setdefault(str(path.readlink()), set()).add(alias)
+ else:
+ chroots.append(path)
+ for path in chroots:
+ if loc == CACHE_SBUILD:
+ chrootconfig = {
+ "rootfstype": "none",
+ "rootfsextract": str(path),
+ }
+ else:
+ chrootconfig = {
+ "rootfstype": "directory",
+ "rootfsdir": str(path),
+ }
+ chrootaliases = aliases.get(path.name, set())
+                if chrootaliases:
+ chrootconfig["aliases"] = ",".join(sorted(chrootaliases))
+ chroot = SourceChroot(path.name.split(".", 1)[0], chrootconfig)
+ if chroot.name not in chrootmap:
+ chrootmap[chroot.name] = chroot
+ for chroot in list(chrootmap.values()):
+ for alias in chroot.aliases():
+ if alias not in chrootmap:
+ chrootmap[alias] = chroot
+ rtdir = runtime_path()
+ if rtdir.is_dir():
+ for sock in rtdir.iterdir():
+ if sock.name not in chrootmap:
+ chrootmap[sock.name] = SessionChroot(sock.name)
+ return chrootmap
+
+
+def getpwchroot(
+ user: str | int, rootdir: pathlib.Path = pathlib.Path("/")
+) -> pwd.struct_passwd:
+ """Look up the passwd record for a given user (name or uid) in the passwd
+    database of the given root directory. Deliberately use only the plain
+    files, avoiding LDAP and NIS, to allow working with a chroot. Similar to
+    getpwnam and getpwuid, raise a KeyError if the user cannot be found.
+ """
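+    # For example, getpwchroot("root", pathlib.Path(".")) resolves "root"
+    # against ./etc/passwd of the container filesystem (see Exec below).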
+ if isinstance(user, str) and user.isdigit():
+ user = int(user)
+ with (rootdir / "etc/passwd").open("r", encoding="utf8") as passwdf:
+ for line in passwdf:
+ parts = line.split(":")
+ # Skip over invalid records and bad data
+ if len(parts) != 7:
+ continue
+ try:
+ uid = int(parts[2])
+ gid = int(parts[3])
+ except ValueError:
+ continue
+ if uid < 0 or gid < 0 or uid > 0x1FFFFFFF or gid > 0x1FFFFFFF:
+ continue
+ if user == parts[0] if isinstance(user, str) else user == uid:
+ return pwd.struct_passwd((*parts[:2], uid, gid, *parts[4:]))
+ raise KeyError(user)
+
+
+def remap_fds(fds: list[linuxnamespaces.FileDescriptor | None]) -> None:
+    """Change the current process's file descriptors such that each of the
+    given file descriptors is renumbered to its list index and every other
+    file descriptor is closed.
+ """
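+    # Illustrative: remap_fds([rfd, wfd, wfd]) leaves the process with exactly
+    # stdin=rfd and stdout=stderr=wfd open.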
+ # Renumber fds such that every entry is at least as large as its index.
+ nextfd = max(max(filter(None, fds), default=0) + 1, len(fds))
+ for targetfd, sourcefd in enumerate(list(fds)):
+ if sourcefd is not None and sourcefd < targetfd:
+ fds[targetfd] = sourcefd.dup2(nextfd)
+ nextfd += 1
+ for targetfd, sourcefd in enumerate(fds):
+ if sourcefd is None:
+ try:
+ os.close(targetfd)
+ except OSError as err:
+                if err.errno != errno.EBADF:
+ raise
+ elif sourcefd != targetfd:
+ sourcefd.dup2(targetfd)
+ os.closerange(len(fds), 0x7FFFFFFF)
+
+
+def close_all_but(
+ fds: typing.Iterable[linuxnamespaces.FileDescriptorLike],
+) -> None:
+    """Close all file descriptors but the ones given."""
+ nextfd = 0
+ for fd in sorted(map(linuxnamespaces.FileDescriptor, fds)):
+ if nextfd >= fd:
+ nextfd += 1
+ else:
+ os.closerange(nextfd, fd)
+ nextfd = fd + 1
+ if nextfd < 0x7FFFFFFF:
+ os.closerange(nextfd, 0x7FFFFFFF)
+
+
+def clean_directory(
+ directory: pathlib.Path, statres: os.stat_result | None = None
+) -> None:
+ """Recursively delete/umount the given directory."""
+ if statres is None:
+ statres = directory.stat()
+ for entry in list(directory.iterdir()):
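+        # An entry on a different device than its parent directory is a mount
+        # point: detach-unmount it (repeatedly, for stacked mounts) until it
+        # is on the parent's filesystem, then remove it like a regular entry.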
+ while True:
+ est = entry.lstat()
+ if statres.st_dev == est.st_dev:
+ break
+ linuxnamespaces.umount(entry, linuxnamespaces.UmountFlags.DETACH)
+ if stat.S_ISDIR(est.st_mode):
+ clean_directory(entry, est)
+ else:
+ entry.unlink()
+ directory.rmdir()
+
+
+class ContainerError(
+ asyncvarlink.TypedVarlinkErrorReply, interface="de.subdivi.unschroot"
+):
+ class Parameters:
+ message: str
+
+
+class ContainerSupervisor(asyncvarlink.VarlinkInterface):
+ name = "de.subdivi.unschroot.Container"
+
+ def __init__(self) -> None:
+ self.terminate_future = asyncio.get_running_loop().create_future()
+ self.cleanup_directories: list[pathlib.Path] = []
+
+ @asyncvarlink.varlinkmethod
+ def Terminate(self) -> None:
+ """Terminate the container."""
+ if not self.terminate_future.done():
+ self.terminate_future.set_result(None)
+ for directory in self.cleanup_directories:
+ clean_directory(directory)
+
+ @asyncvarlink.varlinkmethod
+ def AddCleanup(self, *, directory: str) -> None:
+ """Register the given directory for deletion at termination."""
+ self.cleanup_directories.append(pathlib.Path(directory))
+
+ @asyncvarlink.varlinkmethod
+ def MountTmpfs(
+ self,
+ target: str = "/",
+ options: dict[str, str | None] | None = None,
+ ) -> None:
+ """Mount a tmpfs to the given location. The target path is
+ understood as rooted in the backing store."""
+ target_path = path_below(target)
+ linuxnamespaces.mount(
+ "tmpfs",
+ target_path,
+ "tmpfs",
+ data=options,
+ )
+ if target_path == pathlib.Path("."):
+ os.chdir(os.getcwd())
+
+ @asyncvarlink.varlinkmethod
+ async def ExtractTar(
+ self, tar: asyncvarlink.FileDescriptor, comptype: str = "*"
+ ) -> None:
+ """Extract an opened tar archive into the working directory."""
+ # tarfile is synchronous, but this method is async. Rather than block,
+ # fork a process and wait for it.
+ @linuxnamespaces.async_run_in_fork.now
+ def extraction_process() -> None:
+ mode = "r:zst" if comptype == "zst" else "r|*"
+ with TarFile.open(
+ fileobj=os.fdopen(tar.fileno(), "rb"), mode=mode
+ ) as tarf:
+ for tmem in tarf:
+ tarf.extract(tmem, numeric_owner=True)
+ await extraction_process.wait()
+
+ @asyncvarlink.varlinkmethod
+ def Mkdir(self, path: str, mode: int = 0o755) -> None:
+ """Create a directory with given mode."""
+ path_below(path).mkdir(mode=mode)
+
+ @asyncvarlink.varlinkmethod
+ def WriteFile(self, path: str, content: str, mode: int = 0o644) -> None:
+ """Create a file with given content and mode."""
+ dest = path_below(path)
+ dest.write_text(content, encoding="utf-8")
+ dest.chmod(mode)
+
+ @asyncvarlink.varlinkmethod
+ def BindMount(
+ self, source: str, target: str, readonly: bool = False
+ ) -> None:
+ """Bind mount the source location to the target location. The target
+ location is anchored at the container root.
+ """
+ target_path = path_below(target)
+ linuxnamespaces.bind_mount(source, target_path, readonly=readonly)
+ if target_path == pathlib.Path("."):
+ os.chdir(os.getcwd())
+
+ @asyncvarlink.varlinkmethod
+ def Chdir(self, path: str) -> None:
+ """Change the working directory. The working directory defines the
+ container root filesystem for many other methods.
+ """
+ os.chdir(path)
+
+ @asyncvarlink.varlinkmethod
+ def Unshare(self, namespaces: int) -> None:
+ """Invoke the unshare syscall to create new namespaces."""
+ linuxnamespaces.unshare(linuxnamespaces.CloneFlags(namespaces))
+
+ class ForkResult(typing.TypedDict):
+ pid: int
+ pidfd: asyncvarlink.FileDescriptor
+ socket: asyncvarlink.FileDescriptor
+
+ @asyncvarlink.varlinkmethod
+ async def Fork(self) -> ForkResult:
+ """Create a child process to be configured as container payload. The
+ result includes its pidfd and a varlink socket for communication.
+ """
+ parent_sock, child_sock = socket.socketpair()
+ try:
+ pid = os.fork()
+ if pid == 0:
+ try:
+ asyncio.set_event_loop(None)
+ parent_sock.close()
+
+ @run_here
+ @async_as_sync
+ async def _() -> None:
+ interface = ContainerSupervisor()
+ protocol = asyncvarlink.VarlinkInterfaceServerProtocol(
+ create_registry(interface)
+ )
+ protocol.connection_lost = lambda _: os._exit(0)
+ protocol.eof_received = lambda: os._exit(0)
+ with contextlib.closing(
+ asyncvarlink.VarlinkTransport(
+ asyncio.get_running_loop(),
+ child_sock,
+ child_sock,
+ protocol,
+ )
+ ):
+ await interface.terminate_future
+ except SystemExit as err:
+ os._exit(err.code)
+ except:
+ os._exit(1)
+ assert False, "unreachable"
+ pidfd = os.pidfd_open(pid)
+ except:
+ parent_sock.close()
+ raise
+ finally:
+ child_sock.close()
+ return ContainerSupervisor.ForkResult(
+ pid=pid,
+ pidfd=asyncvarlink.FileDescriptor(pidfd),
+ socket=asyncvarlink.FileDescriptor(parent_sock),
+ )
+
+ @asyncvarlink.varlinkmethod
+ def Newidmaps(self, *, pid: int) -> None:
+ """Perform an identity mapping of uids and gids on the target process
+ with no helpers. The mapping process is assumed to be sufficiently
+ privileged.
+ """
+ identitymap = [linuxnamespaces.IDMapping(0, 0, 65536)]
+ linuxnamespaces.newidmaps(pid, identitymap, identitymap, helper=False)
+
+ @asyncvarlink.varlinkmethod(return_parameter="pidfd")
+ def Exec(
+ self,
+ *,
+ command: list[str],
+ fds: list[asyncvarlink.FileDescriptor | None] | None = None,
+ enable_loopback_if: bool = False,
+ user: str | None = None,
+ cwd: str | None = None,
+ ) -> asyncvarlink.FileDescriptor:
+        """Turn the current supervisor process (which should be run from a
+        fork) into the container payload. It is actually another fork that
+        ends up doing the exec, after the caller invokes Terminate on this
+        setup process.
+ """
+ if user is None:
+ uid, gid = 0, 0
+ else:
+ try:
+ record = getpwchroot(user, pathlib.Path("."))
+ except KeyError as err:
+ raise ContainerError(
+ message=f"user {user} does not exist"
+ ) from err
+ uid = record.pw_uid
+ gid = record.pw_gid
+
+ # In order for pivot_root to work, the new root must be a mount point,
+ # but as we unshared both a user and mount namespace, the working
+ # directory no longer is a mount point.
+ linuxnamespaces.bind_mount(".", ".", recursive=True)
+ os.chdir("../" + os.path.basename(os.getcwd()))
+ # In order to be able to mount a real proc later, the original /proc
+ # must be visible somewhere inside our container. Temporarily mount it
+ # to /bin.
+ linuxnamespaces.bind_mount("/proc", "bin", recursive=True)
+ linuxnamespaces.mount(
+ "devpts",
+ "dev/pts",
+ "devpts",
+ linuxnamespaces.MountFlags.NOSUID
+ | linuxnamespaces.MountFlags.NOEXEC,
+ "gid=5,mode=0620,ptmxmode=0666",
+ )
+ linuxnamespaces.pivot_root(".", ".")
+ linuxnamespaces.umount(".", linuxnamespaces.UmountFlags.DETACH)
+ if enable_loopback_if:
+ linuxnamespaces.enable_loopback_if()
+
+ # This pipe will be accessible to the container, but it's only used for
+ # synchronization.
+ rpipe, wpipe = linuxnamespaces.FileDescriptor.pipe(inheritable=False)
+
+ @linuxnamespaces.run_in_fork.now
+ def init_process() -> None:
+ wpipe.close()
+
+ # Now that we have forked and thus entered our PID namespace, we
+ # may mount /proc.
+ linuxnamespaces.mount(
+ "proc",
+ "proc",
+ "proc",
+ linuxnamespaces.MountFlags.NOSUID
+ | linuxnamespaces.MountFlags.NODEV
+ | linuxnamespaces.MountFlags.NOEXEC,
+ )
+ # Get rid of the mount that granted us mounting /proc.
+ linuxnamespaces.umount("bin", linuxnamespaces.UmountFlags.DETACH)
+
+ # Drop privileges.
+ if gid != 0:
+ os.setgid(gid)
+ if uid != 0:
+ os.setuid(uid)
+ else:
+ orig_path = os.environ.get("PATH", "")
+ if not orig_path:
+ os.environ["PATH"] = "/usr/sbin:/sbin:/usr/bin:/bin"
+ elif ":/usr/sbin:" not in f":{orig_path}:":
+ os.environ["PATH"] = orig_path + ":/usr/sbin"
+ if cwd:
+ os.chdir(cwd)
+
+ # Wait for parent exit and reparenting.
+ os.read(rpipe, 1)
+ rpipe.close()
+
+ remap_fds(
+ [
+ linuxnamespaces.FileDescriptor(fd) if fd else None
+ for fd in (fds or [])
+ ]
+ )
+
+ # The container may change this, but it's still useful for
+ # robustness when it does not.
+ linuxnamespaces.prctl_set_pdeathsig(signal.SIGKILL)
+ os.execvp(command[0], command)
+
+ # The caller should call Terminate next. Doing so will close the wpipe
+ # and thus allow the child process to proceed.
+ return asyncvarlink.FileDescriptor(os.pidfd_open(init_process.pid))
+
+ @asyncvarlink.varlinkmethod(return_parameter="status")
+ async def Waitpidfd(self, *, pidfd: asyncvarlink.FileDescriptor) -> int:
+ """Wait for the process identified by the given pidfd to exit and
+ returns its exit code.
+ """
+ res = await linuxnamespaces.async_waitpidfd(pidfd.fileno(), os.WEXITED)
+ assert res is not None
+ return res.si_status
+
+ @asyncvarlink.varlinkmethod
+ def Setpriv(
+ self,
+ uid: int | None = None,
+ gid: int | None = None,
+ groups: list[int] | None = None,
+ dumpable: bool | None = None,
+ ) -> None:
+ """Change the uid/gid/supplementary groups and the dumpable flag."""
+ if groups is not None:
+ os.setgroups(groups)
+ if gid is not None:
+ os.setgid(gid)
+ if uid is not None:
+ os.setuid(uid)
+ if dumpable is not None:
+ linuxnamespaces.prctl_set_dumpable(dumpable)
+
+ @asyncvarlink.varlinkmethod
+ def MountSpecials(self) -> None:
+ """Mount /dev without /dev/pts and /sys."""
+ linuxnamespaces.populate_dev("/", ".", pts="defer")
+ linuxnamespaces.populate_sys(
+ "/", ".", namespaces=linuxnamespaces.CloneFlags.NONE
+ )
+
+ @asyncvarlink.varlinkmethod
+ def MountOverlayfs(
+ self,
+ *,
+ lower: str,
+ upper: str,
+ work: str,
+ target: str,
+ ) -> None:
+ """Mount an overlay filesystem."""
+ linuxnamespaces.mount(
+ "overlay",
+ target,
+ "overlay",
+ data={
+ "lowerdir": lower,
+ "upperdir": upper,
+ "workdir": work,
+ "userxattr": None,
+ },
+ )
+
+ @asyncvarlink.varlinkmethod(return_parameter="fusefd")
+ def MountFuse(
+ self,
+ target: str = "/",
+ options: dict[str, str | int | None] | None = None,
+ source: str = "none",
+ fstype: str | None = None,
+ ) -> asyncvarlink.FileDescriptor:
+ """Mount a fuse filesystem and return the controlling file descriptor.
+ """
+ target_path = pathlib.Path(
+ (pathlib.PurePath("/") / target).relative_to("/")
+ )
+ if options is None:
+ options = {}
+ flags = linuxnamespaces.MountFlags.NONE
+ if options.get("ro", True) is None:
+ flags |= linuxnamespaces.MountFlags.RDONLY
+ del options["ro"]
+ fusefd = asyncvarlink.FileDescriptor(os.open("/dev/fuse", os.O_RDWR))
+ try:
+ options["fd"] = fusefd.fileno()
+ linuxnamespaces.mount(
+ source,
+ target_path,
+ "fuse" if fstype is None else f"fuse.{fstype}",
+ flags,
+ data=options,
+ )
+ except:
+ fusefd.close()
+ raise
+ return fusefd
+
+
+def create_registry(
+ *interfaces: asyncvarlink.VarlinkInterface,
+) -> asyncvarlink.VarlinkInterfaceRegistry:
+ registry = asyncvarlink.VarlinkInterfaceRegistry()
+ registry.register_interface(
+ asyncvarlink.serviceinterface.VarlinkServiceInterface(
+ "subdivi.de", "unschroot", "0.0", "url", registry
+ ),
+ )
+ for interface in interfaces:
+ registry.register_interface(interface)
+ return registry
+
+
+def do_info(args: argparse.Namespace) -> None:
+ """Show information about selected chroots"""
+ chrootmap = scan_chroots()
+ chroots: typing.Iterable[ChrootBase]
+ if args.chroot:
+ chroots = [
+ chrootmap[
+ args.chroot.removeprefix("chroot:").removeprefix("session:")
+ ],
+ ]
+ else:
+ chroots = chrootmap.values()
+ sys.stdout.write("\n".join(chroot.infostr() for chroot in chroots))
+
+
+async def supervisor_main(session: str, csock: socket.socket) -> None:
+ # We do double forking, collect secondary children for waitid.
+ linuxnamespaces.prctl_set_child_subreaper()
+
+ async with contextlib.AsyncExitStack() as stack:
+ interface = ContainerSupervisor()
+ registry = create_registry(interface)
+ setup_transport = asyncvarlink.VarlinkTransport(
+ asyncio.get_running_loop(),
+ csock,
+ csock,
+ registry.protocol_factory(),
+ )
+ stack.callback(setup_transport.close)
+ sockpath = runtime_path() / session
+ sockpath.parent.mkdir(mode=0o700, exist_ok=True)
+ server = stack.enter_async_context(
+ await asyncvarlink.create_unix_server(
+ registry.protocol_factory, sockpath
+ ),
+ )
+ stack.callback(server.close)
+ await interface.terminate_future
+
+
+def do_begin_session(args: argparse.Namespace) -> None:
+ """Begin a session; returns the session ID"""
+ session = args.session_name or str(uuid.uuid4())
+ chrootmap = scan_chroots()
+ source = chrootmap[args.chroot.removeprefix("chroot:")]
+ assert isinstance(source, SourceChroot)
+
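+    # Map the subordinate id range plus the invoking user's own uid/gid (at
+    # 65536) so that files owned by the real user do not appear as the
+    # overflow (nobody) id inside the new user namespace.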
+ uidmap = [
+ linuxnamespaces.IDAllocation.loadsubid("uid").allocatemap(65536),
+ linuxnamespaces.IDMapping(65536, os.getuid(), 1),
+ ]
+ gidmap = [
+ linuxnamespaces.IDAllocation.loadsubid("gid").allocatemap(65536),
+ linuxnamespaces.IDMapping(65536, os.getgid(), 1),
+ ]
+ # Create an extra socket to avoid ENOENT when connecting.
+ psock, csock = socket.socketpair(
+ socket.AF_UNIX, socket.SOCK_STREAM | socket.SOCK_NONBLOCK
+ )
+
+ @linuxnamespaces.run_in_fork.now
+ def supervisor_process() -> None:
+ # This child is the container supervisor process reachable via IPC.
+ # It will not be part of the PID namespaces, but it'll fork into them.
+ psock.close()
+ close_all_but([2, csock])
+ asyncio.run(supervisor_main(session, csock))
+
+ csock.close()
+
+ @run_here
+ @async_as_sync
+ async def _() -> None:
+ assert isinstance(source, SourceChroot) # assert again for mypy
+ protocol = asyncvarlink.VarlinkClientProtocol()
+ with contextlib.closing(
+ asyncvarlink.VarlinkTransport(
+ asyncio.get_running_loop(), psock, psock, protocol
+ ),
+ ):
+ await asyncio.sleep(0)
+ proxy = protocol.make_proxy(ContainerSupervisor)
+ common_namespaces = (
+ linuxnamespaces.CloneFlags.NEWUSER
+ | linuxnamespaces.CloneFlags.NEWNS
+ )
+ await proxy.Unshare(namespaces=int(common_namespaces))
+ linuxnamespaces.newidmaps(supervisor_process.pid, uidmap, gidmap)
+ tdir: str | None = None
+ if source.config.get("backingstore") == "directory":
+            tdir = tempfile.mkdtemp(prefix="unschroot")
+ await proxy.AddCleanup(directory=tdir)
+ await proxy.Setpriv(uid=0, gid=0, groups=[0], dumpable=True)
+ await source.mount(proxy, tdir)
+ await proxy.MountSpecials()
+ print(session)
+
+
+@async_as_sync
+async def do_run_session(args: argparse.Namespace) -> None:
+ """Run an existing session"""
+ sockpath = runtime_path() / args.chroot
+ transport, protocol = await asyncvarlink.connect_unix_varlink(
+ asyncvarlink.VarlinkClientProtocol,
+ sockpath,
+ )
+ with contextlib.ExitStack() as stack:
+ stack.callback(transport.close)
+ assert isinstance(protocol, asyncvarlink.VarlinkClientProtocol)
+ proxy = protocol.make_proxy(ContainerSupervisor)
+ namespaces = (
+ linuxnamespaces.CloneFlags.NEWUSER
+ | linuxnamespaces.CloneFlags.NEWNS
+ | linuxnamespaces.CloneFlags.NEWIPC
+ | linuxnamespaces.CloneFlags.NEWPID
+ )
+ if args.isolate_network:
+ namespaces |= linuxnamespaces.CloneFlags.NEWNET
+
+ with await proxy.Fork() as proc:
+ vsock = proc["socket"].take()
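+            # The forked child serves the same varlink interface on the
+            # returned socket, so construct a second proxy for talking to it.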
+ protocol2 = asyncvarlink.VarlinkClientProtocol()
+ stack.callback(
+ asyncvarlink.VarlinkTransport(
+ asyncio.get_running_loop(), vsock, vsock, protocol2
+ ).close
+ )
+ proxy2 = protocol2.make_proxy(ContainerSupervisor)
+ await asyncio.sleep(0) # wait for connection_made
+ await proxy2.Unshare(namespaces=namespaces)
+ await proxy.Newidmaps(pid=proc["pid"])
+ proc2 = stack.enter_context(
+ await proxy2.Exec(
+ command=args.command,
+ fds=[
+ asyncvarlink.FileDescriptor(0),
+ asyncvarlink.FileDescriptor(1),
+ asyncvarlink.FileDescriptor(2),
+ ],
+ enable_loopback_if=args.isolate_network,
+ user=args.user,
+ cwd=args.directory,
+ ),
+ )
+ stack.enter_context(proc2["pidfd"])
+ await protocol2.call(
+ asyncvarlink.VarlinkMethodCall(
+ "de.subdivi.unschroot.Container.Terminate", {}, oneway=True
+ )
+ )
+ sys.exit((await proxy.Waitpidfd(pidfd=proc2["pidfd"]))["status"])
+
+
+@async_as_sync
+async def do_end_session(args: argparse.Namespace) -> None:
+ """End an existing session"""
+ sockpath = runtime_path() / args.chroot
+ transport, protocol = await asyncvarlink.connect_unix_varlink(
+ asyncvarlink.VarlinkClientProtocol,
+ sockpath,
+ )
+ with contextlib.closing(transport):
+ assert isinstance(protocol, asyncvarlink.VarlinkClientProtocol)
+ proxy = protocol.make_proxy(ContainerSupervisor)
+ await proxy.Terminate()
+
+
+def main() -> None:
+ parser = argparse.ArgumentParser()
+ group = parser.add_mutually_exclusive_group(required=True)
+ for comm in ("info", "begin-session", "run-session", "end-session"):
+ func = globals()["do_" + comm.replace("-", "_")]
+ group.add_argument(
+ f"-{comm[0]}",
+ f"--{comm}",
+ dest="subcommand",
+ action="store_const",
+ const=func,
+ help=func.__doc__,
+ )
+ parser.add_argument(
+ "-c",
+ "--chroot",
+ dest="chroot",
+ action="store",
+ help="Use specified chroot",
+ )
+ parser.add_argument("-d", "--directory", action="store")
+ parser.add_argument("-n", "--session-name", action="store", default=None)
+ parser.add_argument("-p", "--preserve-environment", action="store_true")
+ parser.add_argument("-q", "--quiet", action="store_true")
+ parser.add_argument("-u", "--user", action="store", default=os.getlogin())
+ parser.add_argument("--isolate-network", action="store_true")
+ parser.add_argument("command", nargs="*")
+ args = parser.parse_args()
+ assert args.subcommand is not None
+ args.subcommand(args)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/examples/userchroot.py b/examples/userchroot.py
index 2caea33..4006dc6 100755
--- a/examples/userchroot.py
+++ b/examples/userchroot.py
@@ -39,7 +39,7 @@ def main() -> None:
linuxnamespaces.bind_mount(chrootdir, "/mnt", recursive=True)
linuxnamespaces.bind_mount("/proc", "/mnt/proc", recursive=True)
linuxnamespaces.bind_mount("/sys", "/mnt/sys", recursive=True)
- linuxnamespaces.populate_dev("/", "/mnt", pidns=False)
+ linuxnamespaces.populate_dev("/", "/mnt", pts="host")
os.chdir("/mnt")
linuxnamespaces.pivot_root(".", ".")
linuxnamespaces.umount(".", linuxnamespaces.UmountFlags.DETACH)
diff --git a/examples/withallsubuids.py b/examples/withallsubuids.py
index 3bed3bc..c247002 100755
--- a/examples/withallsubuids.py
+++ b/examples/withallsubuids.py
@@ -4,7 +4,9 @@
"""Map all available ranges from /etc/subuid and /etc/subgid as identity and
run a given command with all capabilities (including CAP_DAC_OVERRIDE)
-inherited.
+inherited. This is vaguely similar to running:
+
+ unshare --map-auto --map-user=$(id -u) --map-group=$(id -g) --keep-caps
"""
import os
@@ -19,7 +21,7 @@ import linuxnamespaces
def main() -> None:
# Construct an identity mapping of all available user/group ids
uidmap = [
- linuxnamespaces.IDMapping(os.getuid(), os.getuid(), 1),
+ linuxnamespaces.IDMapping.identity(os.getuid()),
*(
linuxnamespaces.IDMapping(start, start, count)
for start, count
@@ -27,7 +29,7 @@ def main() -> None:
),
]
gidmap = [
- linuxnamespaces.IDMapping(os.getgid(), os.getgid(), 1),
+ linuxnamespaces.IDMapping.identity(os.getgid()),
*(
linuxnamespaces.IDMapping(start, start, count)
for start, count
diff --git a/linuxnamespaces/__init__.py b/linuxnamespaces/__init__.py
index 8c1def3..83358b6 100644
--- a/linuxnamespaces/__init__.py
+++ b/linuxnamespaces/__init__.py
@@ -6,253 +6,61 @@ Python.
"""
import asyncio
-import bisect
import contextlib
-import dataclasses
import errno
import fcntl
+import logging
import os
import pathlib
import socket
import stat
import struct
-import subprocess
import typing
from .filedescriptor import *
+from .idmap import *
from .atlocation import *
from .syscalls import *
-def subidranges(
- kind: typing.Literal["uid", "gid"], login: str | None = None
-) -> typing.Iterator[tuple[int, int]]:
- """Parse a `/etc/sub?id` file for ranges allocated to the given or current
- user. Return all ranges as (start, count) pairs.
- """
- if login is None:
- login = os.getlogin()
- with open(f"/etc/sub{kind}") as filelike:
- for line in filelike:
- parts = line.strip().split(":")
- if parts[0] == login:
- yield (int(parts[1]), int(parts[2]))
-
-
-@dataclasses.dataclass(frozen=True)
-class IDMapping:
- """Represent one range in a user or group id mapping."""
-
- innerstart: int
- outerstart: int
- count: int
-
- def __post_init__(self) -> None:
- if self.outerstart < 0:
- raise ValueError("outerstart must not be negative")
- if self.innerstart < 0:
- raise ValueError("innerstart must not be negative")
- if self.count <= 0:
- raise ValueError("count must be positive")
- if self.outerstart + self.count >= 1 << 64:
- raise ValueError("outerstart + count exceed 64bits")
- if self.innerstart + self.count >= 1 << 64:
- raise ValueError("innerstart + count exceed 64bits")
-
-
-class IDAllocation:
- """This represents a subset of IDs (user or group). It can be used to
- allocate a contiguous range for use with a user namespace.
- """
-
- def __init__(self) -> None:
- self.ranges: list[tuple[int, int]] = []
-
- def add_range(self, start: int, count: int) -> None:
- """Add count ids starting from start to this allocation."""
- if start < 0 or count <= 0:
- raise ValueError("invalid range")
- index = bisect.bisect_right(self.ranges, (start, 0))
- prevrange = None
- if index > 0:
- prevrange = self.ranges[index - 1]
- if prevrange[0] + prevrange[1] > start:
- raise ValueError("attempt to add overlapping range")
- nextrange = None
- if index < len(self.ranges):
- nextrange = self.ranges[index]
- if nextrange[0] < start + count:
- raise ValueError("attempt to add overlapping range")
- if prevrange and prevrange[0] + prevrange[1] == start:
- if nextrange and nextrange[0] == start + count:
- self.ranges[index - 1] = (
- prevrange[0],
- prevrange[1] + count + nextrange[1],
- )
- del self.ranges[index]
- else:
- self.ranges[index - 1] = (prevrange[0], prevrange[1] + count)
- elif nextrange and nextrange[0] == start + count:
- self.ranges[index] = (start, count + nextrange[1])
- else:
- self.ranges.insert(index, (start, count))
-
- @classmethod
- def loadsubid(
- cls, kind: typing.Literal["uid", "gid"], login: str | None = None,
- ) -> "IDAllocation":
- """Load a `/etc/sub?id` file and return ids allocated to the given
- login or current user.
- """
- self = cls()
- for start, count in subidranges(kind, login):
- self.add_range(start, count)
- return self
-
- def find(self, count: int) -> int:
- """Locate count contiguous ids from this allocation. The start of
- the allocation is returned. The allocation object is left unchanged.
- """
- for start, available in self.ranges:
- if available >= count:
- return start
- raise ValueError("could not satisfy allocation request")
-
- def allocate(self, count: int) -> int:
- """Allocate count contiguous ids from this allocation. The start of
- the allocation is returned and the ids are removed from this
- IDAllocation object.
- """
- for index, (start, available) in enumerate(self.ranges):
- if available > count:
- self.ranges[index] = (start + count, available - count)
- return start
- if available == count:
- del self.ranges[index]
- return start
- raise ValueError("could not satisfy allocation request")
-
- def allocatemap(self, count: int, target: int = 0) -> IDMapping:
- """Allocate count contiguous ids from this allocation. An IDMapping
- with its innerstart set to target is returned. The allocation is
- removed from this IDAllocation object.
- """
- return IDMapping(target, self.allocate(count), count)
-
- def reserve(self, start: int, count: int) -> None:
- """Reserve (and remove) the given range from this allocation. If the
- range is not fully contained in this allocation, a ValueError is
- raised.
- """
- if count < 0:
- raise ValueError("negative count")
- index = bisect.bisect_right(self.ranges, (start, float("inf"))) - 1
- if index < 0:
- raise ValueError("range to reserve not found")
- cur_start, cur_count = self.ranges[index]
- assert cur_start <= start
- if cur_start == start:
- # Requested range starts at range boundary
- if cur_count < count:
- raise ValueError("range to reserve not found")
- if cur_count == count:
- # Requested range matches a range exactly
- del self.ranges[index]
- else:
- # Requested range is a head of the matched range
- self.ranges[index] = (start + count, cur_count - count)
- elif cur_start + cur_count >= start + count:
- # Requested range fits into a matched range
- self.ranges[index] = (cur_start, start - cur_start)
- if cur_start + cur_count > start + count:
- # Requested range punches a hole into a matched range
- self.ranges.insert(
- index + 1,
- (start + count, cur_start + cur_count - (start + count)),
- )
- # else: Requested range is a tail of a matched range
- else:
- raise ValueError("range to reserve not found")
-
-
-def newidmap(
- kind: typing.Literal["uid", "gid"],
- pid: int,
- mapping: list[IDMapping],
- helper: bool | None = None,
-) -> None:
- """Apply the given uid or gid mapping to the given process. A positive pid
- identifies a process, other values identify the currently running process.
- Whether setuid binaries newuidmap and newgidmap are used is determined via
- the helper argument. A None value indicate automatic detection of whether
- a helper is required for setting up the given mapping.
- """
-
- assert kind in ("uid", "gid")
- if pid <= 0:
- pid = os.getpid()
- if helper is None:
- # We cannot reliably test whether we have the right EUID and we don't
- # implement checking whether setgroups has been denied either. Please
- # be explicit about the helper choice in such cases.
- helper = len(mapping) > 1 or mapping[0].count > 1
- if helper:
- argv = [f"new{kind}map", str(pid)]
- for idblock in mapping:
- argv.extend(map(str, dataclasses.astuple(idblock)))
- subprocess.check_call(argv)
- else:
- pathlib.Path(f"/proc/{pid}/{kind}_map").write_text(
- "".join(
- "%d %d %d\n" % dataclasses.astuple(idblock)
- for idblock in mapping
- ),
- encoding="ascii",
- )
-
-
-def newuidmap(pid: int, mapping: list[IDMapping], helper: bool = True) -> None:
- """Apply a given uid mapping to the given process. Refer to newidmap for
- details.
- """
- newidmap("uid", pid, mapping, helper)
-
-
-def newgidmap(pid: int, mapping: list[IDMapping], helper: bool = True) -> None:
- """Apply a given gid mapping to the given process. Refer to newidmap for
- details.
- """
- newidmap("gid", pid, mapping, helper)
-
-
-def newidmaps(
- pid: int,
- uidmapping: list[IDMapping],
- gidmapping: list[IDMapping],
- helper: bool = True,
-) -> None:
- """Apply a given uid and gid mapping to the given process. Refer to
- newidmap for details.
- """
- newgidmap(pid, gidmapping, helper)
- newuidmap(pid, uidmapping, helper)
+_logger = logging.getLogger(__name__)
class run_in_fork:
"""Decorator for running the decorated function once in a separate process.
"""
- def __init__(self, function: typing.Callable[[], None]):
- """Fork a new process that will eventually run the given function and
- then exit.
+ def __init__(
+ self, function: typing.Callable[[], None], start: bool = False
+ ):
+ """Fork a new process that will run the given function and then exit.
+ If start is true, run it immediately, otherwise the start or __call__
+ method should be used.
"""
- self.efd = EventFD()
+ self.efd = None if start else EventFD()
self.pid = os.fork()
if self.pid == 0:
- self.efd.read()
- self.efd.close()
- function()
- os._exit(0)
+ code = 0
+ try:
+ if self.efd is not None:
+ self.efd.read()
+ self.efd.close()
+ self.efd = None
+ function()
+ except SystemExit as err:
+ code = err.code
+ except:
+ _logger.exception(
+ "uncaught exception in run_in_fork %r", function
+ )
+ code = 1
+ os._exit(code)
+
+ @classmethod
+ def now(cls, function: typing.Callable[[], None]) -> typing.Self:
+ """Fork a new process that will immediately run the given function and
+ then exit."""
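+        # Handy as a decorator: @run_in_fork.now runs the decorated function
+        # in the forked child right away.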
+ return cls(function, start=True)
def start(self) -> None:
"""Start the decorated function. It can only be started once."""
@@ -260,6 +68,7 @@ class run_in_fork:
raise ValueError("this function can only be called once")
self.efd.write(1)
self.efd.close()
+ self.efd = None
def wait(self) -> None:
"""Wait for the process running the decorated function to finish."""
@@ -270,8 +79,11 @@ class run_in_fork:
raise ValueError("something failed")
def __call__(self) -> None:
- """Start the decorated function and wait for its process to finish."""
- self.start()
+ """Start the decorated function if needed and wait for its process to
+ finish.
+ """
+ if self.efd:
+ self.start()
self.wait()
@@ -282,9 +94,12 @@ class async_run_in_fork:
synchronous and it must not access the event loop of the main process.
"""
- def __init__(self, function: typing.Callable[[], None]):
- """Fork a new process that will eventually run the given function and
- then exit.
+ def __init__(
+ self, function: typing.Callable[[], None], start: bool = False
+ ):
+ """Fork a new process that will run the given function and then exit.
+ If start is true, run it immediately, otherwise the start or __call__
+ method should be used.
"""
loop = asyncio.get_running_loop()
with asyncio.get_child_watcher() as watcher:
@@ -293,15 +108,33 @@ class async_run_in_fork:
"active child watcher required for creating a process"
)
self.future = loop.create_future()
- self.efd = EventFD()
+ self.efd = None if start else EventFD()
self.pid = os.fork()
if self.pid == 0:
- self.efd.read()
- self.efd.close()
- function()
- os._exit(0)
+ code = 0
+ try:
+ if self.efd:
+ self.efd.read()
+ self.efd.close()
+ self.efd = None
+ asyncio.set_event_loop(None)
+ function()
+ except SystemExit as err:
+ code = err.code
+ except:
+ _logger.exception(
+ "uncaught exception in run_in_fork %r", function
+ )
+ code = 1
+ os._exit(code)
watcher.add_child_handler(self.pid, self._child_callback)
+ @classmethod
+ def now(cls, function: typing.Callable[[], None]) -> typing.Self:
+ """Fork a new process that will immediately run the given function and
+ then exit."""
+ return cls(function, start=True)
+
def _child_callback(self, pid: int, returncode: int) -> None:
if self.pid != pid:
return
@@ -313,6 +146,7 @@ class async_run_in_fork:
raise ValueError("this function can only be called once")
self.efd.write(1)
self.efd.close()
+ self.efd = None
async def wait(self) -> None:
"""Wait for the process running the decorated function to finish."""
@@ -323,8 +157,11 @@ class async_run_in_fork:
raise ValueError("something failed")
async def __call__(self) -> None:
- """Start the decorated function and wait for its process to finish."""
- self.start()
+ """Start the decorated function if needed and wait for its process to
+ finish.
+ """
+ if self.efd:
+ self.start()
await self.wait()
@@ -348,7 +185,7 @@ def bind_mount(
srcloc = os.fspath(source)
tgtloc = os.fspath(target)
except ValueError:
- otflags = OpenTreeFlags.OPEN_TREE_CLONE
+ otflags = OpenTreeFlags.CLONE
if recursive:
otflags |= OpenTreeFlags.AT_RECURSIVE
with open_tree(source, otflags) as srcfd:
@@ -362,6 +199,17 @@ def bind_mount(
mount(srcloc, tgtloc, None, mflags)
+def get_cgroup(pid: int = -1) -> pathlib.PurePath:
+ """Look up the cgroup that the given pid or the current process belongs
+ to.
+ """
+ return pathlib.PurePath(
+ pathlib.Path(
+ f"/proc/{pid}/cgroup" if pid > 0 else "/proc/self/cgroup"
+ ).read_text().split(":", 2)[2].strip()
+ )
+
+
_P = typing.ParamSpec("_P")
class _ExceptionExitCallback:
@@ -397,7 +245,7 @@ def populate_dev(
newroot: PathConvertible,
*,
fuse: bool = True,
- pidns: bool = True,
+ pts: typing.Literal["defer", "host", "new", "absent"] = "new",
tun: bool = True,
) -> None:
"""Mount a tmpfs to the dev directory beneath newroot and populate it with
@@ -407,6 +255,12 @@ def populate_dev(
Even though a CAP_SYS_ADMIN-enabled process can umount components of the
     /dev hierarchy, they cannot gain privileges in doing so as no
hierarchies are restricted via tmpfs mounts or read-only bind mounts.
+
+ The /dev/fuse and /dev/net/tun devices are optional and can be enabled or
+ disabled as desired. /dev/pts (and /dev/ptmx) can be shared with the host
+ or mounted as a new instance. Since a PID namespace is usually required for
+ mounting a new instance, it can also be deferred to a later manual mount.
+ If not desired, it can be left absent.
"""
origdev = AtLocation(origroot) / "dev"
newdev = AtLocation(newroot) / "dev"
@@ -423,31 +277,31 @@ def populate_dev(
for fn in "null zero full random urandom tty".split():
files.add(fn)
bind_mounts[fn] = exitstack.enter_context(
- open_tree(origdev / fn, OpenTreeFlags.OPEN_TREE_CLONE)
+ open_tree(origdev / fn, OpenTreeFlags.CLONE)
)
if fuse:
files.add("fuse")
bind_mounts["fuse"] = exitstack.enter_context(
- open_tree(origdev / "fuse", OpenTreeFlags.OPEN_TREE_CLONE)
+ open_tree(origdev / "fuse", OpenTreeFlags.CLONE)
)
- if pidns:
- symlinks["ptmx"] = "pts/ptmx"
- else:
+ if pts == "host":
bind_mounts["pts"] = exitstack.enter_context(
open_tree(
origdev / "pts",
- OpenTreeFlags.AT_RECURSIVE | OpenTreeFlags.OPEN_TREE_CLONE,
+ OpenTreeFlags.AT_RECURSIVE | OpenTreeFlags.CLONE,
)
)
files.add("ptmx")
bind_mounts["ptmx"] = exitstack.enter_context(
- open_tree(origdev / "ptmx", OpenTreeFlags.OPEN_TREE_CLONE)
+ open_tree(origdev / "ptmx", OpenTreeFlags.CLONE)
)
+ elif pts != "absent":
+ symlinks["ptmx"] = "pts/ptmx"
if tun:
directories.add("net")
files.add("net/tun")
bind_mounts["net/tun"] = exitstack.enter_context(
- open_tree(origdev / "net/tun", OpenTreeFlags.OPEN_TREE_CLONE)
+ open_tree(origdev / "net/tun", OpenTreeFlags.CLONE)
)
mount(
"devtmpfs",
@@ -472,7 +326,7 @@ def populate_dev(
(newdev / fn).mknod(stat.S_IFREG)
for fn, target in symlinks.items():
(newdev / fn).symlink_to(target)
- if pidns:
+ if pts == "new":
mount(
"devpts",
newdev / "pts",
@@ -514,7 +368,7 @@ def populate_proc(
if namespaces & CloneFlags.NEWNET == CloneFlags.NEWNET:
psn = open_tree(
newproc / "sys/net",
- OpenTreeFlags.OPEN_TREE_CLONE | OpenTreeFlags.AT_RECURSIVE,
+ OpenTreeFlags.CLONE | OpenTreeFlags.AT_RECURSIVE,
)
bind_mount(newproc / "sys", newproc / "sys", True, True)
if psn is not None:
@@ -570,7 +424,7 @@ def populate_sys(
bindfd = exitstack.enter_context(
open_tree(
AtLocation(origroot) / "sys" / source,
- OpenTreeFlags.OPEN_TREE_CLONE | OpenTreeFlags.AT_RECURSIVE,
+ OpenTreeFlags.CLONE | OpenTreeFlags.AT_RECURSIVE,
),
)
if rdonly:
@@ -612,8 +466,13 @@ def unshare_user_idmap(
unshare(flags)
setup_idmaps()
+
def unshare_user_idmap_nohelper(
- uid: int, gid: int, flags: CloneFlags = CloneFlags.NEWUSER
+ uid: int,
+ gid: int,
+ flags: CloneFlags = CloneFlags.NEWUSER,
+ *,
+ proc: AtLocationLike | None = None,
) -> None:
"""Unshare the given namespaces (must include user) and
map the current user and group to the given uid and gid
@@ -622,14 +481,20 @@ def unshare_user_idmap_nohelper(
uidmap = IDMapping(uid, os.getuid(), 1)
gidmap = IDMapping(gid, os.getgid(), 1)
unshare(flags)
- pathlib.Path("/proc/self/setgroups").write_bytes(b"deny")
- newidmaps(-1, [uidmap], [gidmap], False)
+ proc = AtLocation("/proc" if proc is None else proc)
+ (proc / "self/setgroups").write_bytes(b"deny")
+ newidmaps(-1, [uidmap], [gidmap], False, proc=proc)
class _AsyncFilesender:
bs = 65536
- def __init__(self, from_fd: int, to_fd: int, count: int | None = None):
+ def __init__(
+ self,
+ from_fd: FileDescriptor,
+ to_fd: FileDescriptor,
+ count: int | None = None,
+ ):
self.from_fd = from_fd
self.to_fd = to_fd
self.copied = 0
@@ -662,7 +527,12 @@ class _AsyncFilesender:
class _AsyncSplicer:
bs = 65536
- def __init__(self, from_fd: int, to_fd: int, count: int | None = None):
+ def __init__(
+ self,
+ from_fd: FileDescriptor,
+ to_fd: FileDescriptor,
+ count: int | None = None,
+ ):
self.from_fd = from_fd
self.to_fd = to_fd
self.copied = 0
@@ -706,7 +576,12 @@ class _AsyncSplicer:
class _AsyncCopier:
bs = 65536
- def __init__(self, from_fd: int, to_fd: int, count: int | None = None):
+ def __init__(
+ self,
+ from_fd: FileDescriptor,
+ to_fd: FileDescriptor,
+ count: int | None = None,
+ ):
self.from_fd = from_fd
self.to_fd = to_fd
self.buffer = b""
@@ -770,13 +645,17 @@ class _AsyncCopier:
def async_copyfd(
- from_fd: int, to_fd: int, count: int | None = None
+ from_fd: FileDescriptorLike,
+ to_fd: FileDescriptorLike,
+ count: int | None = None,
) -> asyncio.Future[int]:
"""Copy the given number of bytes from the first file descriptor to the
second file descriptor in an asyncio context. Both copies are performed
binary. An efficient implementation is chosen depending on the file type
of file descriptors.
"""
+ from_fd = FileDescriptor(from_fd)
+ to_fd = FileDescriptor(to_fd)
from_mode = os.fstat(from_fd).st_mode
if stat.S_ISREG(from_mode):
return _AsyncFilesender(from_fd, to_fd, count).fut
@@ -786,7 +665,7 @@ def async_copyfd(
class _AsyncPidfdWaiter:
- def __init__(self, pidfd: int, flags: int):
+ def __init__(self, pidfd: FileDescriptor, flags: int):
self.pidfd = pidfd
self.flags = flags
self.loop = asyncio.get_running_loop()
@@ -811,12 +690,12 @@ class _AsyncPidfdWaiter:
def async_waitpidfd(
- pidfd: int, flags: int
+ pidfd: FileDescriptorLike, flags: int
) -> asyncio.Future[os.waitid_result | None]:
"""Asynchronously wait for a process represented as a pidfd. This is an
async variant of waitid(P_PIDFD, pidfd, flags).
"""
- return _AsyncPidfdWaiter(pidfd, flags).fut
+ return _AsyncPidfdWaiter(FileDescriptor(pidfd), flags).fut
def enable_loopback_if() -> None:
diff --git a/linuxnamespaces/atlocation.py b/linuxnamespaces/atlocation.py
index 20d402a..46ac541 100644
--- a/linuxnamespaces/atlocation.py
+++ b/linuxnamespaces/atlocation.py
@@ -9,13 +9,14 @@ code for doing so.
import enum
import errno
+import locale
import os
import os.path
import pathlib
import stat
import typing
-from .filedescriptor import FileDescriptor
+from .filedescriptor import FileDescriptor, FileDescriptorLike, HasFileno
AT_FDCWD = FileDescriptor(-100)
@@ -58,7 +59,7 @@ class AtLocation:
def __new__(
cls,
- thing: typing.Union["AtLocation", int, PathConvertible],
+ thing: typing.Union["AtLocation", FileDescriptorLike, PathConvertible],
location: PathConvertible | None = None,
flags: AtFlags = AtFlags.NONE,
) -> "AtLocation":
@@ -76,13 +77,14 @@ class AtLocation:
)
return thing # Don't copy.
obj = super(AtLocation, cls).__new__(cls)
- if isinstance(thing, int):
+ if not isinstance(thing, FileDescriptor):
+ if isinstance(thing, (int, HasFileno)):
+ thing = FileDescriptor(thing)
+ if isinstance(thing, FileDescriptor):
if thing < 0 and thing != AT_FDCWD:
raise ValueError("fd cannot be negative")
if isinstance(thing, FileDescriptor):
obj.fd = thing
- else:
- obj.fd = FileDescriptor(thing)
if location is None:
obj.location = ""
obj.flags = flags | AtFlags.AT_EMPTY_PATH
@@ -148,7 +150,7 @@ class AtLocation:
them with a slash as separator. The returned AtLocation borrows its fd
if any.
"""
- if isinstance(other, int):
+ if isinstance(other, (int, HasFileno)):
             # An fd is considered an absolute AT_EMPTY_PATH path.
return AtLocation(other)
non_empty_flags = self.flags & ~AtFlags.AT_EMPTY_PATH
@@ -218,7 +220,12 @@ class AtLocation:
"chdir on AtLocation only supports flag AT_EMPTY_PATH"
)
assert self.location
- return os.chdir(self.location)
+ if self.fd == AT_FDCWD:
+ return os.chdir(self.location)
+ with FileDescriptor(
+ self.open(flags=os.O_PATH | os.O_CLOEXEC)
+ ) as dirfd:
+ return os.fchdir(dirfd)
def chmod(self, mode: int) -> None:
"""Wrapper for os.chmod or os.fchmod."""
@@ -417,7 +424,7 @@ class AtLocation:
assert self.location
os.mknod(self.location, mode, device, dir_fd=self.fd_or_none)
- def open(self, flags: int, mode: int = 0o777) -> int:
+ def open(self, flags: int, mode: int = 0o777) -> FileDescriptor:
"""Wrapper for os.open supplying path and dir_fd."""
if self.flags == AtFlags.AT_SYMLINK_NOFOLLOW:
flags |= os.O_NOFOLLOW
@@ -426,7 +433,9 @@ class AtLocation:
"opening an AtLocation only supports flag AT_SYMLINK_NOFOLLOW"
)
assert self.location
- return os.open(self.location, flags, mode, dir_fd=self.fd_or_none)
+ return FileDescriptor(
+ os.open(self.location, flags, mode, dir_fd=self.fd_or_none)
+ )
def readlink(self) -> str:
"""Wrapper for os.readlink supplying path and dir_fd."""
@@ -543,6 +552,26 @@ class AtLocation:
AtLocation(dirfd),
)
+ def write_bytes(self, data: bytes) -> None:
+ """Overwrite the file with the given data bytes."""
+ dataview = memoryview(data)
+ with self.open(os.O_CREAT | os.O_WRONLY) as fd:
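+            # os.write may perform a short write, so keep writing until the
+            # whole buffer has been consumed.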
+ while dataview:
+ written = os.write(fd, dataview)
+ dataview = dataview[written:]
+
+ def write_text(
+ self, data: str, encoding: str | None = None, errors: str | None = None
+ ) -> None:
+ """Overwrite the file with the given data string."""
+ if encoding is None:
+ encoding = locale.getencoding()
+ if errors is None:
+ databytes = data.encode(encoding=encoding)
+ else:
+ databytes = data.encode(encoding=encoding, errors=errors)
+ self.write_bytes(databytes)
+
def __enter__(self) -> "AtLocation":
"""When used as a context manager, the associated fd will be closed on
scope exit.
@@ -590,4 +619,4 @@ class AtLocation:
return f"{cn}({self.fd}, flags={self.flags!r})"
-AtLocationLike = typing.Union[AtLocation, int, PathConvertible]
+AtLocationLike = typing.Union[AtLocation, FileDescriptorLike, PathConvertible]
diff --git a/linuxnamespaces/filedescriptor.py b/linuxnamespaces/filedescriptor.py
index e4eff9b..ee96a94 100644
--- a/linuxnamespaces/filedescriptor.py
+++ b/linuxnamespaces/filedescriptor.py
@@ -8,11 +8,33 @@ import os
import typing
+# pylint: disable=too-few-public-methods # It's that one method we describe.
+@typing.runtime_checkable
+class HasFileno(typing.Protocol):
+ """A typing protocol representing a file-like object and looking up the
+ underlying file descriptor.
+ """
+
+ def fileno(self) -> int:
+ """Return the underlying file descriptor."""
+
+
+FileDescriptorLike = int | HasFileno
+
+
class FileDescriptor(int):
"""Type tag for integers that represent file descriptors. It also provides
a few very generic file descriptor methods.
"""
+ def __new__(cls, value: FileDescriptorLike) -> typing.Self:
+ """Construct a FileDescriptor from an int or HasFileno."""
+ if isinstance(value, cls):
+ return value # No need to copy, it's immutable.
+ if not isinstance(value, int):
+ value = value.fileno()
+ return super(FileDescriptor, cls).__new__(cls, value)
+
def __enter__(self) -> "FileDescriptor":
"""When used as a context manager, close the file descriptor on scope
exit.
@@ -37,11 +59,18 @@ class FileDescriptor(int):
return FileDescriptor(os.dup(self))
return FileDescriptor(fcntl.fcntl(self, fcntl.F_DUPFD_CLOEXEC, 0))
- def dup2(self, fd2: int, inheritable: bool = True) -> "FileDescriptor":
+ def dup2(
+ self, fd2: FileDescriptorLike, inheritable: bool = True
+ ) -> "FileDescriptor":
"""Duplicate the file to the given file descriptor number."""
- return FileDescriptor(os.dup2(self, fd2, inheritable))
+ return FileDescriptor(os.dup2(self, FileDescriptor(fd2), inheritable))
- def fileno(self) -> int:
+ @classmethod
+ def pidfd_open(cls, pid: int, flags: int = 0) -> typing.Self:
+ """Convenience wrapper for os.pidfd_open."""
+ return cls(os.pidfd_open(pid, flags))
+
+ def fileno(self) -> "FileDescriptor":
"""Return self such that it satisfies the HasFileno protocol."""
return self
diff --git a/linuxnamespaces/idmap.py b/linuxnamespaces/idmap.py
new file mode 100644
index 0000000..a10ec12
--- /dev/null
+++ b/linuxnamespaces/idmap.py
@@ -0,0 +1,250 @@
+# Copyright 2024-2025 Helmut Grohne <helmut@subdivi.de>
+# SPDX-License-Identifier: GPL-3
+
+"""Provide functionalit related to mapping user and group ids in a user
+namespace.
+"""
+
+import bisect
+import dataclasses
+import os
+import subprocess
+import typing
+
+from .atlocation import AtLocation, AtLocationLike
+
+
+def subidranges(
+ kind: typing.Literal["uid", "gid"], login: str | None = None
+) -> typing.Iterator[tuple[int, int]]:
+ """Parse a `/etc/sub?id` file for ranges allocated to the given or current
+ user. Return all ranges as (start, count) pairs.
+ """
+ if login is None:
+ login = os.getlogin()
+ with open(f"/etc/sub{kind}") as filelike:
+ for line in filelike:
+ parts = line.strip().split(":")
+ if parts[0] == login:
+ yield (int(parts[1]), int(parts[2]))
+
+
+@dataclasses.dataclass(frozen=True)
+class IDMapping:
+ """Represent one range in a user or group id mapping."""
+
+ innerstart: int
+ outerstart: int
+ count: int
+
+ def __post_init__(self) -> None:
+ if self.outerstart < 0:
+ raise ValueError("outerstart must not be negative")
+ if self.innerstart < 0:
+ raise ValueError("innerstart must not be negative")
+ if self.count <= 0:
+ raise ValueError("count must be positive")
+ if self.outerstart + self.count >= 1 << 64:
+ raise ValueError("outerstart + count exceed 64bits")
+ if self.innerstart + self.count >= 1 << 64:
+ raise ValueError("innerstart + count exceed 64bits")
+
+ @classmethod
+ def identity(cls, idn: int, count: int = 1) -> typing.Self:
+ """Construct an identity mapping for the given identifier."""
+ return cls(idn, idn, count)
+
+
+class IDAllocation:
+ """This represents a subset of IDs (user or group). It can be used to
+ allocate a contiguous range for use with a user namespace.
+ """
+
+ def __init__(self) -> None:
+ self.ranges: list[tuple[int, int]] = []
+
+ def add_range(self, start: int, count: int) -> None:
+ """Add count ids starting from start to this allocation."""
+ if start < 0 or count <= 0:
+ raise ValueError("invalid range")
+ index = bisect.bisect_right(self.ranges, (start, 0))
+ prevrange = None
+ if index > 0:
+ prevrange = self.ranges[index - 1]
+ if prevrange[0] + prevrange[1] > start:
+ raise ValueError("attempt to add overlapping range")
+ nextrange = None
+ if index < len(self.ranges):
+ nextrange = self.ranges[index]
+ if nextrange[0] < start + count:
+ raise ValueError("attempt to add overlapping range")
+ if prevrange and prevrange[0] + prevrange[1] == start:
+ if nextrange and nextrange[0] == start + count:
+ self.ranges[index - 1] = (
+ prevrange[0],
+ prevrange[1] + count + nextrange[1],
+ )
+ del self.ranges[index]
+ else:
+ self.ranges[index - 1] = (prevrange[0], prevrange[1] + count)
+ elif nextrange and nextrange[0] == start + count:
+ self.ranges[index] = (start, count + nextrange[1])
+ else:
+ self.ranges.insert(index, (start, count))
+
+ @classmethod
+ def loadsubid(
+ cls, kind: typing.Literal["uid", "gid"], login: str | None = None,
+ ) -> "IDAllocation":
+ """Load a `/etc/sub?id` file and return ids allocated to the given
+ login or current user.
+ """
+ self = cls()
+ for start, count in subidranges(kind, login):
+ self.add_range(start, count)
+ return self
+
+ def find(self, count: int) -> int:
+ """Locate count contiguous ids from this allocation. The start of
+ the allocation is returned. The allocation object is left unchanged.
+ """
+ for start, available in self.ranges:
+ if available >= count:
+ return start
+ raise ValueError("could not satisfy allocation request")
+
+ def allocate(self, count: int) -> int:
+ """Allocate count contiguous ids from this allocation. The start of
+ the allocation is returned and the ids are removed from this
+ IDAllocation object.
+ """
+ for index, (start, available) in enumerate(self.ranges):
+ if available > count:
+ self.ranges[index] = (start + count, available - count)
+ return start
+ if available == count:
+ del self.ranges[index]
+ return start
+ raise ValueError("could not satisfy allocation request")
+
+ def allocatemap(self, count: int, target: int = 0) -> IDMapping:
+ """Allocate count contiguous ids from this allocation. An IDMapping
+ with its innerstart set to target is returned. The allocation is
+ removed from this IDAllocation object.
+ """
+ return IDMapping(target, self.allocate(count), count)
+
+ def reserve(self, start: int, count: int) -> None:
+ """Reserve (and remove) the given range from this allocation. If the
+ range is not fully contained in this allocation, a ValueError is
+ raised.
+ """
+ if count < 0:
+ raise ValueError("negative count")
+ index = bisect.bisect_right(self.ranges, (start, float("inf"))) - 1
+ if index < 0:
+ raise ValueError("range to reserve not found")
+ cur_start, cur_count = self.ranges[index]
+ assert cur_start <= start
+ if cur_start == start:
+ # Requested range starts at range boundary
+ if cur_count < count:
+ raise ValueError("range to reserve not found")
+ if cur_count == count:
+ # Requested range matches a range exactly
+ del self.ranges[index]
+ else:
+ # Requested range is a head of the matched range
+ self.ranges[index] = (start + count, cur_count - count)
+ elif cur_start + cur_count >= start + count:
+ # Requested range fits into a matched range
+ self.ranges[index] = (cur_start, start - cur_start)
+ if cur_start + cur_count > start + count:
+ # Requested range punches a hole into a matched range
+ self.ranges.insert(
+ index + 1,
+ (start + count, cur_start + cur_count - (start + count)),
+ )
+ # else: Requested range is a tail of a matched range
+ else:
+ raise ValueError("range to reserve not found")
+
+
+def newidmap(
+ kind: typing.Literal["uid", "gid"],
+ pid: int,
+ mapping: list[IDMapping],
+ helper: bool | None = None,
+ *,
+ proc: AtLocationLike | None = None,
+) -> None:
+ """Apply the given uid or gid mapping to the given process. A positive pid
+ identifies a process, other values identify the currently running process.
+ Whether setuid binaries newuidmap and newgidmap are used is determined via
+    the helper argument. A None value indicates automatic detection of
+    whether a helper is required for setting up the given mapping.
+ """
+
+ assert kind in ("uid", "gid")
+ if pid <= 0:
+ pid = os.getpid()
+ if helper is None:
+ # We cannot reliably test whether we have the right EUID and we don't
+ # implement checking whether setgroups has been denied either. Please
+ # be explicit about the helper choice in such cases.
+ helper = len(mapping) > 1 or mapping[0].count > 1
+ if helper:
+ argv = [f"new{kind}map", str(pid)]
+ for idblock in mapping:
+ argv.extend(map(str, dataclasses.astuple(idblock)))
+ subprocess.check_call(argv)
+ else:
+ proc = AtLocation("/proc" if proc is None else proc)
+ (proc / f"{pid}/{kind}_map").write_text(
+ "".join(
+ "%d %d %d\n" % dataclasses.astuple(idblock)
+ for idblock in mapping
+ ),
+ encoding="ascii",
+ )
+
+
+def newuidmap(
+ pid: int,
+ mapping: list[IDMapping],
+ helper: bool = True,
+ *,
+ proc: AtLocationLike | None = None,
+) -> None:
+ """Apply a given uid mapping to the given process. Refer to newidmap for
+ details.
+ """
+ newidmap("uid", pid, mapping, helper, proc=proc)
+
+
+def newgidmap(
+ pid: int,
+ mapping: list[IDMapping],
+ helper: bool = True,
+ *,
+ proc: AtLocationLike | None = None,
+) -> None:
+ """Apply a given gid mapping to the given process. Refer to newidmap for
+ details.
+ """
+ newidmap("gid", pid, mapping, helper, proc=proc)
+
+
+def newidmaps(
+ pid: int,
+ uidmapping: list[IDMapping],
+ gidmapping: list[IDMapping],
+ helper: bool = True,
+ *,
+ proc: AtLocationLike | None = None,
+) -> None:
+ """Apply a given uid and gid mapping to the given process. Refer to
+ newidmap for details.
+ """
+ newgidmap(pid, gidmapping, helper, proc=proc)
+ newuidmap(pid, uidmapping, helper, proc=proc)
diff --git a/linuxnamespaces/syscalls.py b/linuxnamespaces/syscalls.py
index dd4a332..e9b0e44 100644
--- a/linuxnamespaces/syscalls.py
+++ b/linuxnamespaces/syscalls.py
@@ -12,17 +12,49 @@ import enum
import errno
import logging
import os
+import signal
import typing
+from .filedescriptor import FileDescriptor, FileDescriptorLike
from .atlocation import AtFlags, AtLocation, AtLocationLike, PathConvertible
-logger = logging.getLogger(__name__)
+_logger = logging.getLogger(__name__)
LIBC_SO = ctypes.CDLL(None, use_errno=True)
+if typing.TYPE_CHECKING:
+ CDataType = ctypes._CDataType # pylint: disable=protected-access
+else:
+ CDataType = typing.Any
+
+
+def _pad_fields(
+ fields: list[tuple[str, type[CDataType]]],
+ totalsize: int,
+ name: str,
+ padtype: type[CDataType] = ctypes.c_uint8,
+) -> list[tuple[str, type[CDataType]]]:
+ """Append a padding element to a ctypes.Structure _fields_ sequence such
+ that its total size matches a given value.
+ """
+ fieldssize = sum(ctypes.sizeof(ft) for _, ft in fields)
+ padsize = totalsize - fieldssize
+ if padsize < 0:
+ raise TypeError(
+ f"requested padding to {totalsize}, but fields consume {fieldssize}"
+ )
+ eltsize = ctypes.sizeof(padtype)
+ elements, remainder = divmod(padsize, eltsize)
+ if remainder:
+ raise TypeError(
+ f"padding {padsize} is not a multiple of the element size {eltsize}"
+ )
+ return fields + [(name, padtype * elements)]
+
+
class CloneFlags(enum.IntFlag):
"""This value may be supplied to
* unshare(2) flags
@@ -121,39 +153,38 @@ class MountFlags(enum.IntFlag):
# Map each flag to:
# * The flag value
# * Whether the flag value is negated
- # * Whether the flag must be negated
# * Whether the flag can be negated
__flagstrmap = {
- "acl": (POSIXACL, False, False, False),
- "async": (SYNCHRONOUS, True, False, False),
- "atime": (NOATIME, True, False, True),
- "bind": (BIND, False, False, False),
- "dev": (NODEV, True, False, True),
- "diratime": (NODIRATIME, True, False, True),
- "dirsync": (DIRSYNC, False, False, False),
- "exec": (NOEXEC, True, False, True),
- "iversion": (I_VERSION, False, False, True),
- "lazytime": (LAZYTIME, False, False, True),
- "loud": (SILENT, True, False, False),
- "mand": (MANDLOCK, False, False, True),
- "private": (PRIVATE, False, False, False),
- "rbind": (BIND | REC, False, False, False),
- "relatime": (RELATIME, False, False, True),
- "remount": (REMOUNT, False, False, True),
- "ro": (RDONLY, False, False, False),
- "rprivate": (PRIVATE | REC, False, False, False),
- "rshared": (SHARED | REC, False, False, False),
- "rslave": (SLAVE | REC, False, False, False),
- "runbindable": (UNBINDABLE | REC, False, False, False),
- "rw": (RDONLY, True, False, False),
- "shared": (SHARED, False, False, False),
- "silent": (SILENT, False, False, False),
- "slave": (SLAVE, False, False, False),
- "strictatime": (STRICTATIME, False, False, True),
- "suid": (NOSUID, True, False, True),
- "symfollow": (NOSYMFOLLOW, True, False, True),
- "sync": (SYNCHRONOUS, False, False, False),
- "unbindable": (UNBINDABLE, False, False, False),
+ "acl": (POSIXACL, False, False),
+ "async": (SYNCHRONOUS, True, False),
+ "atime": (NOATIME, True, True),
+ "bind": (BIND, False, False),
+ "dev": (NODEV, True, True),
+ "diratime": (NODIRATIME, True, True),
+ "dirsync": (DIRSYNC, False, False),
+ "exec": (NOEXEC, True, True),
+ "iversion": (I_VERSION, False, True),
+ "lazytime": (LAZYTIME, False, True),
+ "loud": (SILENT, True, False),
+ "mand": (MANDLOCK, False, True),
+ "private": (PRIVATE, False, False),
+ "rbind": (BIND | REC, False, False),
+ "relatime": (RELATIME, False, True),
+ "remount": (REMOUNT, False, True),
+ "ro": (RDONLY, False, False),
+ "rprivate": (PRIVATE | REC, False, False),
+ "rshared": (SHARED | REC, False, False),
+ "rslave": (SLAVE | REC, False, False),
+ "runbindable": (UNBINDABLE | REC, False, False),
+ "rw": (RDONLY, True, False),
+ "shared": (SHARED, False, False),
+ "silent": (SILENT, False, False),
+ "slave": (SLAVE, False, False),
+ "strictatime": (STRICTATIME, False, True),
+ "suid": (NOSUID, True, True),
+ "symfollow": (NOSYMFOLLOW, True, True),
+ "sync": (SYNCHRONOUS, False, False),
+ "unbindable": (UNBINDABLE, False, False),
}
def change(self, flagsstr: str) -> "MountFlags":
@@ -165,19 +196,23 @@ class MountFlags(enum.IntFlag):
for flagstr in flagsstr.split(","):
if not flagstr:
continue
- flag, negated, mustnegate, cannegate = self.__flagstrmap.get(
- flagstr.removeprefix("no"),
- (MountFlags.NONE, False, True, False),
- )
- if mustnegate <= flagstr.startswith("no") <= cannegate:
+ try:
+ flag, negated, cannegate = self.__flagstrmap[
+ flagstr.removeprefix("no")
+ ]
+ except KeyError:
+ raise ValueError(
+ f"not a valid mount flag: {flagstr!r}"
+ ) from None
+ else:
+ if flagstr.startswith("no") > cannegate:
+ raise ValueError(f"not a valid mount flag: {flagstr!r}")
if negated ^ flagstr.startswith("no"):
ret &= ~flag
else:
if flag & MountFlags.PROPAGATION_FLAGS:
ret &= ~MountFlags.PROPAGATION_FLAGS
ret |= flag
- else:
- raise ValueError(f"not a valid mount flag: {flagstr!r}")
return ret
@staticmethod
@@ -221,22 +256,25 @@ class MountFlags(enum.IntFlag):
reverse=True,
)
- def tostr(self) -> str:
- """Attempt to represent the flags in a comma-separated, textual way."""
+ def tonames(self) -> list[str]:
+ """Represent the flags as a sequence of list of flag names."""
if (self & MountFlags.PROPAGATION_FLAGS).bit_count() > 1:
raise ValueError("cannot represent conflicting propagation flags")
parts: list[str] = []
remain = self
for val, text in MountFlags.__flagvals:
- # Older mypy think MountFlags.__flagvals and thus text was of type
- # MountFlags.
+ # Older mypy wrongly deduces the type of MountFlags.__flagvals.
assert isinstance(text, str)
if remain & val == val:
parts.insert(0, text)
remain &= ~val
if remain:
raise ValueError("cannot represent flags {remain}")
- return ",".join(parts)
+ return parts
+
+ def tostr(self) -> str:
+ """Represent the flags in a comma-separated, textual way."""
+ return ",".join(self.tonames())
class MountSetattrFlags(enum.IntFlag):
@@ -328,15 +366,15 @@ class OpenTreeFlags(enum.IntFlag):
"""This value may be supplied to open_tree(2) as flags."""
NONE = 0
- OPEN_TREE_CLONE = 0x1
- OPEN_TREE_CLOEXEC = os.O_CLOEXEC
+ CLONE = 0x1
+ CLOEXEC = os.O_CLOEXEC
AT_SYMLINK_NOFOLLOW = 0x100
AT_NO_AUTOMOUNT = 0x800
AT_EMPTY_PATH = 0x1000
AT_RECURSIVE = 0x8000
ALL_FLAGS = (
- OPEN_TREE_CLONE
- | OPEN_TREE_CLOEXEC
+ CLONE
+ | CLOEXEC
| AT_SYMLINK_NOFOLLOW
| AT_NO_AUTOMOUNT
| AT_EMPTY_PATH
@@ -348,10 +386,47 @@ class PrctlOption(enum.IntEnum):
"""This value may be supplied to prctl(2) as option."""
PR_SET_PDEATHSIG = 1
+ PR_SET_DUMPABLE = 4
PR_SET_CHILD_SUBREAPER = 36
PR_CAP_AMBIENT = 47
+class SignalFDSigInfo(ctypes.Structure):
+ """Information about a received signal by reading from a signalfd(2)."""
+
+ _fields_ = _pad_fields(
+ [
+ ("ssi_signo", ctypes.c_uint32),
+ ("ssi_errno", ctypes.c_int32),
+ ("ssi_code", ctypes.c_int32),
+ ("ssi_pid", ctypes.c_uint32),
+ ("ssi_uid", ctypes.c_uint32),
+ ("ssi_fd", ctypes.c_int32),
+ ("ssi_tid", ctypes.c_uint32),
+ ("ssi_band", ctypes.c_uint32),
+ ("ssi_overrun", ctypes.c_uint32),
+ ("ssi_trapno", ctypes.c_uint32),
+ ("ssi_status", ctypes.c_int32),
+ ("ssi_int", ctypes.c_int32),
+ ("ssi_ptr", ctypes.c_uint64),
+ ("ssi_utime", ctypes.c_uint64),
+ ("ssi_stime", ctypes.c_uint64),
+ ("ssi_addr", ctypes.c_uint64),
+ ("ssi_addr_lsb", ctypes.c_uint16),
+ ],
+ 128,
+ "padding",
+ )
+
+
+class SignalFDFlags(enum.IntFlag):
+ """This value may be supplied as flags to signalfd(2)."""
+
+ NONE = 0
+ CLOEXEC = os.O_CLOEXEC
+ NONBLOCK = os.O_NONBLOCK
+
+
class UmountFlags(enum.IntFlag):
"""This value may be supplied to umount2(2) as flags."""
@@ -368,9 +443,9 @@ def call_libc(funcname: str, *args: typing.Any) -> int:
the function returns an integer that is non-negative on success. On
failure, an OSError with errno is raised.
"""
- logger.debug("calling libc function %s%r", funcname, args)
+ _logger.debug("calling libc function %s%r", funcname, args)
ret: int = LIBC_SO[funcname](*args)
- logger.debug("%s returned %d", funcname, ret)
+ _logger.debug("%s returned %d", funcname, ret)
if ret < 0:
err = ctypes.get_errno()
raise OSError(
@@ -428,7 +503,7 @@ class EventFD:
) -> None:
if flags & ~EventFDFlags.ALL_FLAGS:
raise ValueError("invalid flags for eventfd")
- self.fd = os.eventfd(initval, int(flags))
+ self.fd = FileDescriptor(os.eventfd(initval, int(flags)))
def read(self) -> int:
"""Decrease the value of the eventfd using eventfd_read."""
@@ -471,7 +546,7 @@ class EventFD:
raise ValueError("attempt to read from closed eventfd")
os.eventfd_write(self.fd, value)
- def fileno(self) -> int:
+ def fileno(self) -> FileDescriptor:
"""Return the underlying file descriptor."""
return self.fd
@@ -481,7 +556,7 @@ class EventFD:
try:
os.close(self.fd)
finally:
- self.fd = -1
+ self.fd = FileDescriptor(-1)
__del__ = close
@@ -489,7 +564,7 @@ class EventFD:
"""Return True unless the eventfd is closed."""
return self.fd >= 0
- def __enter__(self) -> "EventFD":
+ def __enter__(self) -> typing.Self:
"""When used as a context manager, the EventFD is closed on scope exit.
"""
return self
@@ -508,7 +583,7 @@ def mount(
target: PathConvertible,
filesystemtype: str | None,
flags: MountFlags = MountFlags.NONE,
- data: str | list[str] | None = None,
+ data: str | list[str] | dict[str, str | int | None] | None = None,
) -> None:
"""Python wrapper for mount(2)."""
if (flags & MountFlags.PROPAGATION_FLAGS).bit_count() > 1:
@@ -520,6 +595,11 @@ def mount(
)
):
raise ValueError("invalid flags for mount")
+ if isinstance(data, dict):
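+        # In dict form, a None value denotes a bare option while any other
+        # value is rendered as key=value.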
+ data = [
+ key if value is None else f"{key}={value}"
+ for key, value in data.items()
+ ]
if isinstance(data, list):
if any("," in s for s in data):
raise ValueError("data elements must not contain a comma")
@@ -540,7 +620,7 @@ def mount_setattr(
attr_set: MountAttrFlags = MountAttrFlags.NONE,
attr_clr: MountAttrFlags = MountAttrFlags.NONE,
propagation: int = 0,
- userns_fd: int = -1,
+ userns_fd: FileDescriptorLike = -1,
) -> None:
"""Python wrapper for mount_setattr(2)."""
filesystem = AtLocation(filesystem)
@@ -549,6 +629,8 @@ def mount_setattr(
flags |= MountSetattrFlags.AT_RECURSIVE
if attr_clr & MountAttrFlags.IDMAP:
raise ValueError("cannot clear the MOUNT_ATTR_IDMAP flag")
+ if not isinstance(userns_fd, int):
+ userns_fd = userns_fd.fileno()
attr = MountAttr(attr_set, attr_clr, propagation, userns_fd)
call_libc(
"mount_setattr",
@@ -613,7 +695,7 @@ def open_tree(
raise ValueError("invalid flags for open_tree")
if (
flags & OpenTreeFlags.AT_RECURSIVE
- and not flags & OpenTreeFlags.OPEN_TREE_CLONE
+ and not flags & OpenTreeFlags.CLONE
):
raise ValueError("invalid flags for open_tree")
if source.flags & AtFlags.AT_SYMLINK_NOFOLLOW:
@@ -670,6 +752,11 @@ def prctl_set_child_subreaper(enabled: bool = True) -> None:
prctl(PrctlOption.PR_SET_CHILD_SUBREAPER, int(enabled))
+def prctl_set_dumpable(enabled: bool) -> None:
+ """Set or clear the dumpable flag."""
+ prctl(PrctlOption.PR_SET_DUMPABLE, int(enabled))
+
+
def prctl_set_pdeathsig(signum: int) -> None:
"""Set the parent-death signal of the calling process."""
if signum < 0:
@@ -686,6 +773,163 @@ def setns(fd: int, nstype: CloneFlags = CloneFlags.NONE) -> None:
call_libc("setns", fd, int(nstype))
+class SignalFD:
+ """Represent a file descriptor returned from signalfd(2)."""
+
+ _ReadIterFut = asyncio.Future[tuple[list[SignalFDSigInfo], "_ReadIterFut"]]
+
+ def __init__(
+ self,
+ sigmask: typing.Iterable[signal.Signals],
+ flags: SignalFDFlags = SignalFDFlags.NONE,
+ ):
+ self.fd = SignalFD.__signalfd(FileDescriptor(-1), sigmask, flags)
+
+ @staticmethod
+ def __signalfd(
+ fd: FileDescriptor,
+ sigmask: typing.Iterable[signal.Signals],
+ flags: SignalFDFlags,
+ ) -> FileDescriptor:
+ """Python wrapper for signalfd(2)."""
+ bitsperlong = 8 * ctypes.sizeof(ctypes.c_ulong)
+ nval = 64 // bitsperlong
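+        # The kernel sigset_t covers 64 signal bits; represent it as an
+        # array of unsigned longs with one bit set per requested signal.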
+ mask = [0] * nval
+ for sig in sigmask:
+ sigval = int(sig) - 1
+ mask[sigval // bitsperlong] |= 1 << (sigval % bitsperlong)
+ csigmask = (ctypes.c_ulong * nval)(*mask)
+ return FileDescriptor(call_libc("signalfd", fd, csigmask, int(flags)))
+
+ def readv(self, count: int) -> list[SignalFDSigInfo]:
+ """Read up to count signals from the signalfd."""
+ if count < 0:
+ raise ValueError("read count must be positive")
+ if self.fd < 0:
+ raise ValueError("attempt to read from closed signalfd")
+ res = [SignalFDSigInfo() for _ in range(count)]
+ cnt = os.readv(self.fd, res)
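+        # os.readv returns the number of bytes read and each complete entry
+        # occupies sizeof(struct signalfd_siginfo) == 128 bytes.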
+ cnt //= ctypes.sizeof(SignalFDSigInfo)
+ return res[:cnt]
+
+ def read(self) -> SignalFDSigInfo:
+ """Read one signal from the signalfd."""
+ res = self.readv(1)
+ return res[0]
+
+ def __handle_read(
+ self, fd: int, fut: asyncio.Future[SignalFDSigInfo]
+ ) -> None:
+ try:
+ if fd != self.fd:
+ raise RuntimeError("SignalFD file descriptor changed")
+ try:
+ result = self.read()
+ except OSError as err:
+ if err.errno == errno.EAGAIN:
+ return
+ raise
+ except Exception as exc:
+ fut.get_loop().remove_reader(fd)
+ fut.set_exception(exc)
+ else:
+ fut.get_loop().remove_reader(fd)
+ fut.set_result(result)
+
+ def aread(self) -> typing.Awaitable[SignalFDSigInfo]:
+ """Asynchronously read one signal from the signalfd."""
+ if self.fd < 0:
+ raise ValueError("attempt to read from closed signalfd")
+ loop = asyncio.get_running_loop()
+ fut: asyncio.Future[SignalFDSigInfo] = loop.create_future()
+ loop.add_reader(self.fd, self.__handle_read, self.fd, fut)
+ return fut
+
+ def __handle_readiter(self, fd: int, fut: _ReadIterFut) -> None:
+ loop = fut.get_loop()
+ try:
+ if fd != self.fd:
+ raise RuntimeError("SignalFD file descriptor changed")
+ try:
+ # Attempt to read a full page worth of queued signals.
+ results = self.readv(32)
+ except OSError as err:
+ if err.errno == errno.EAGAIN:
+ return
+ raise
+ except Exception as exc:
+ loop.remove_reader(fd)
+ fut.set_exception(exc)
+ else:
+ nextfut: SignalFD._ReadIterFut = loop.create_future()
+ loop.add_reader(fd, self.__handle_readiter, self.fd, nextfut)
+ fut.set_result((results, nextfut))
+
+ async def areaditer(self) -> typing.AsyncIterator[SignalFDSigInfo]:
+ """Asynchronously read signals from the signalfd forever."""
+ if self.fd < 0:
+ raise ValueError("attempt to read from closed signalfd")
+ loop = asyncio.get_running_loop()
+ fut: SignalFD._ReadIterFut = loop.create_future()
+ loop.add_reader(self.fd, self.__handle_readiter, self.fd, fut)
+ while True:
+ results, fut = await fut
+ for result in results:
+ yield result
+
+ def fileno(self) -> FileDescriptor:
+ """Return the underlying file descriptor."""
+ return self.fd
+
+ def close(self) -> None:
+ """Close the underlying file descriptor."""
+ if self.fd >= 0:
+ try:
+ os.close(self.fd)
+ finally:
+ self.fd = FileDescriptor(-1)
+
+ __del__ = close
+
+ def __bool__(self) -> bool:
+ """Return True unless the signalfd is closed."""
+ return self.fd >= 0
+
+ def __enter__(self) -> typing.Self:
+ """When used as a context manager, the SignalFD is closed on scope
+ exit.
+ """
+ return self
+
+ def __exit__(
+ self,
+ exc_type: typing.Any,
+ exc_value: typing.Any,
+ traceback: typing.Any,
+ ) -> None:
+ self.close()
+
+
+class _SigqueueSigval(ctypes.Union):
+ _fields_ = [
+ ("sival_int", ctypes.c_int),
+ ("sival_ptr", ctypes.c_void_p),
+ ]
+
+
+def sigqueue(
+ pid: int, sig: signal.Signals, value: int | ctypes.c_void_p | None = None
+) -> None:
+ """Python wrapper for sigqueue(2)."""
+ if value is None:
+ sigval = _SigqueueSigval()
+ elif isinstance(value, int):
+ sigval = _SigqueueSigval(sival_int=value)
+ else:
+ sigval = _SigqueueSigval(sival_ptr=value)
+ call_libc("sigqueue", pid, int(sig), sigval)
+
+
def umount(
path: PathConvertible, flags: UmountFlags = UmountFlags.NONE
) -> None:
diff --git a/linuxnamespaces/systemd/__init__.py b/linuxnamespaces/systemd/__init__.py
index d8e7f86..84cb135 100644
--- a/linuxnamespaces/systemd/__init__.py
+++ b/linuxnamespaces/systemd/__init__.py
@@ -8,6 +8,48 @@ import sys
import typing
+_DBUS_INTEGER_BOUNDS = (
+ ("q", 0, 1 << 16),
+ ("n", -(1 << 15), 1 << 15),
+ ("u", 0, 1 << 32),
+ ("i", -(1 << 31), 1 << 31),
+ ("t", 0, 1 << 64),
+ ("x", -(1 << 63), 1 << 63),
+)
+
+
+def _guess_dbus_type(value: typing.Any) -> typing.Iterator[str]:
+ """Guess the type of a Python value in dbus. May yield multiple candidates.
+ """
+ if isinstance(value, bool):
+ yield "b"
+ elif isinstance(value, str):
+ yield "s"
+ elif isinstance(value, int):
+ found = False
+ for guess, low, high in _DBUS_INTEGER_BOUNDS:
+ if low <= value < high:
+ found = True
+ yield guess
+ if not found:
+ raise ValueError("integer out of bounds for dbus")
+ elif isinstance(value, float):
+ yield "d"
+ elif isinstance(value, list):
+ if not value:
+ raise ValueError("cannot guess dbus type for empty list")
+ types = [list(_guess_dbus_type(v)) for v in value]
+ found = False
+ for guess in types[0]:
+ if all(guess in guesses for guesses in types):
+ found = True
+ yield "a" + guess
+ if not found:
+ raise ValueError("could not determine homogeneous type of list")
+ else:
+ raise ValueError("failed to guess dbus type")
+
+
async def start_transient_unit(
unitname: str,
pids: list[int] | None = None,
@@ -20,14 +62,13 @@ async def start_transient_unit(
pids = [os.getpid()]
dbus_properties.append(("PIDs", ("au", pids)))
for key, value in ({} if properties is None else properties).items():
- if isinstance(value, bool):
- dbus_properties.append((key, ("b", value)))
- elif isinstance(value, str):
- dbus_properties.append((key, ("s", value)))
- else:
+ try:
+ guess = next(_guess_dbus_type(value))
+ except ValueError as err:
raise ValueError(
f"cannot infer dbus type for property {key} value"
- )
+ ) from err
+ dbus_properties.append((key, (guess, value)))
if dbusdriver in ("auto", "jeepney"):
try:
from .jeepney import start_transient_unit as jeepney_impl
diff --git a/linuxnamespaces/systemd/dbussy.py b/linuxnamespaces/systemd/dbussy.py
index 77410df..60b74fc 100644
--- a/linuxnamespaces/systemd/dbussy.py
+++ b/linuxnamespaces/systemd/dbussy.py
@@ -52,6 +52,7 @@ class SystemdJobWaiter:
try:
return self.jobs_removed[job]
except KeyError:
+ self.jobs_removed.clear()
return await asyncio.wait_for(self.job_done, timeout)
def __exit__(self, *exc_info: typing.Any) -> None:
@@ -72,10 +73,15 @@ async def start_transient_unit(
"""
bus = await ravel.session_bus_async()
with SystemdJobWaiter(bus) as wait:
+ systemd1 = bus["org.freedesktop.systemd1"]["/org/freedesktop/systemd1"]
result = await wait(
- bus["org.freedesktop.systemd1"]["/org/freedesktop/systemd1"]
- .get_interface("org.freedesktop.systemd1.Manager")
- .StartTransientUnit(unitname, "fail", properties, [])[0],
+ (
+ await (
+ await systemd1.get_async_interface(
+ "org.freedesktop.systemd1.Manager"
+ )
+ ).StartTransientUnit(unitname, "fail", properties, [])
+ )[0],
)
if result != "done":
raise OSError("StartTransientUnit failed: " + result)
diff --git a/linuxnamespaces/tarutils.py b/linuxnamespaces/tarutils.py
index 6285d5a..5ad60cd 100644
--- a/linuxnamespaces/tarutils.py
+++ b/linuxnamespaces/tarutils.py
@@ -31,8 +31,16 @@ class ZstdTarFile(tarfile.TarFile):
name: str,
mode: typing.Literal["r", "w", "x"] = "r",
fileobj: typing.BinaryIO | None = None,
+ *,
+ compresslevel: int | None = None,
+ threads: int | None = None,
**kwargs: typing.Any,
) -> tarfile.TarFile:
+ """Open a zstd compressed tar archive with the given name for readin or
+ writing. Appending is not supported. The class allows customizing the
+ compression level and the compression concurrency (default parallel)
+ while decompression ignores those arguments.
+ """
if mode not in ("r", "w", "x"):
raise ValueError("mode must be 'r', 'w' or 'x'")
openobj: str | typing.BinaryIO = name if fileobj is None else fileobj
@@ -45,11 +53,21 @@ class ZstdTarFile(tarfile.TarFile):
if mode == "r":
zfobj = zstandard.open(openobj, "rb")
else:
- zfobj = zstandard.open(
- openobj,
- mode + "b",
- cctx=zstandard.ZstdCompressor(write_checksum=True, threads=-1),
- )
+ if threads is None:
+ threads = -1
+ if compresslevel is not None:
+ if compresslevel > 22:
+ raise ValueError(
+                    f"invalid compression level {compresslevel}"
+ )
+ cctx = zstandard.ZstdCompressor(
+ write_checksum=True, threads=threads, level=compresslevel
+ )
+ else:
+ cctx = zstandard.ZstdCompressor(
+ write_checksum=True, threads=threads
+ )
+ zfobj = zstandard.open(openobj, mode + "b", cctx=cctx)
try:
tarobj = cls.taropen(name, mode, zfobj, **kwargs)
except (OSError, EOFError, zstandard.ZstdError) as exc:
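
A hypothetical use of the new keyword-only arguments (archive and directory
names are made up; when reading, compresslevel and threads are simply
ignored):

    from linuxnamespaces.tarutils import ZstdTarFile

    with ZstdTarFile.open(
        "chroot.tar.zst", "w", compresslevel=19, threads=4
    ) as tar:
        tar.add("chroot")
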
diff --git a/pyproject.toml b/pyproject.toml
index 9855e72..9e90387 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -19,11 +19,22 @@ dbussy = ["dbussy"]
zstandard = ["zstandard"]
test = ["pytest", "pytest-forked", "pytest-subtests"]
+[build-system]
+requires = ["flit_core >=3.8"]
+build-backend = "flit_core.buildapi"
+
[tool.black]
line-length = 79
+[tool.flit.module]
+name = "linuxnamespaces"
+
+[tool.flit.sdist]
+include = ["conftest.py", "examples/", "README.md", "tests/"]
+
[tool.mypy]
strict = true
[tool.pylint]
max-line-length=79
+good-names = ["fd"]
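
With the build-system table declared, any PEP 517 frontend can build the
package. A minimal sketch using the flit_core hooks directly, run from the
source tree root ("dist" is an assumed output directory, not part of the
patch):

    import os
    import flit_core.buildapi as backend

    os.makedirs("dist", exist_ok=True)
    backend.build_sdist("dist")  # writes the sdist into dist/
    backend.build_wheel("dist")  # writes the wheel into dist/
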
diff --git a/tests/test_atlocation.py b/tests/test_atlocation.py
index 5d7286a..b107975 100644
--- a/tests/test_atlocation.py
+++ b/tests/test_atlocation.py
@@ -95,13 +95,8 @@ class AtLocationTest(unittest.TestCase):
if filetype == "symlink" and loctype != "emptypath":
follow_symlinks_values.append(False)
for follow_symlinks in follow_symlinks_values:
- # Mypy fails to see that loctype and filetype really are
- # literals rather than arbitrary strings.
atlocctx = self.create(
- loctype, # type: ignore[arg-type]
- filetype, # type: ignore[arg-type]
- "X",
- follow_symlinks,
+ loctype, filetype, "X", follow_symlinks
)
yield (filetype, atlocctx)
diff --git a/tests/test_simple.py b/tests/test_simple.py
index eb03384..114b922 100644
--- a/tests/test_simple.py
+++ b/tests/test_simple.py
@@ -5,6 +5,7 @@ import asyncio
import errno
import os
import pathlib
+import signal
import socket
import unittest
@@ -92,6 +93,26 @@ class AsnycioTest(unittest.IsolatedAsyncioTestCase):
efd.write()
self.assertEqual(await fut, 1)
+ async def test_signalfd(self) -> None:
+ testsig = signal.SIGUSR1
+ sfd = linuxnamespaces.SignalFD(
+ [testsig], linuxnamespaces.SignalFDFlags.NONBLOCK
+ )
+ self.addCleanup(sfd.close)
+ oldmask = signal.pthread_sigmask(signal.SIG_SETMASK, [testsig])
+ self.addCleanup(signal.pthread_sigmask, signal.SIG_SETMASK, oldmask)
+ fut = asyncio.ensure_future(sfd.aread())
+ await asyncio.sleep(0.000001) # Let the loop run
+ self.assertFalse(fut.done())
+ sigval = 123456789
+ mypid = os.getpid()
+ linuxnamespaces.sigqueue(mypid, testsig, sigval)
+ siginfo = await fut
+ self.assertEqual(siginfo.ssi_signo, testsig)
+ self.assertEqual(siginfo.ssi_pid, mypid)
+ self.assertEqual(siginfo.ssi_uid, os.getuid())
+ self.assertEqual(siginfo.ssi_int, sigval)
+
async def test_run_in_fork(self) -> None:
with linuxnamespaces.EventFD(
0, linuxnamespaces.EventFDFlags.NONBLOCK
@@ -105,9 +126,13 @@ class AsnycioTest(unittest.IsolatedAsyncioTestCase):
async def test_copyfd_file_sock(self) -> None:
sock1, sock2 = socket.socketpair()
- with sock1, sock2, linuxnamespaces.FileDescriptor(
- os.open("/etc/passwd", os.O_RDONLY)
- ) as rfd:
+ with (
+ sock1,
+ sock2,
+ linuxnamespaces.FileDescriptor(
+ os.open("/etc/passwd", os.O_RDONLY)
+ ) as rfd,
+ ):
fut = asyncio.ensure_future(
linuxnamespaces.async_copyfd(rfd, sock1.fileno(), 999)
)
@@ -118,9 +143,13 @@ class AsnycioTest(unittest.IsolatedAsyncioTestCase):
async def test_copyfd_file_pipe(self) -> None:
rfdp, wfdp = linuxnamespaces.FileDescriptor.pipe(blocking=False)
- with rfdp, wfdp, linuxnamespaces.FileDescriptor(
- os.open("/etc/passwd", os.O_RDONLY)
- ) as rfd:
+ with (
+ rfdp,
+ wfdp,
+ linuxnamespaces.FileDescriptor(
+ os.open("/etc/passwd", os.O_RDONLY)
+ ) as rfd,
+ ):
fut = asyncio.ensure_future(
linuxnamespaces.async_copyfd(rfd, wfdp, 999)
)
@@ -185,7 +214,7 @@ class UnshareTest(unittest.TestCase):
| linuxnamespaces.CloneFlags.NEWPID
)
linuxnamespaces.newuidmap(-1, [idmap], False)
- @linuxnamespaces.run_in_fork
+ @linuxnamespaces.run_in_fork.now
def setup() -> None:
self.assertEqual(os.getpid(), 1)
linuxnamespaces.mount("proc", "/proc", "proc")
@@ -210,7 +239,7 @@ class UnshareTest(unittest.TestCase):
)
linuxnamespaces.mount("tmpfs", "/mnt", "tmpfs", data="mode=0755")
os.mkdir("/mnt/dev")
- linuxnamespaces.populate_dev("/", "/mnt", pidns=False)
+ linuxnamespaces.populate_dev("/", "/mnt", pts="host")
self.assertTrue(os.access("/mnt/dev/null", os.W_OK))
pathlib.Path("/mnt/dev/null").write_text("")
@@ -278,7 +307,7 @@ class UnshareIdmapTest(unittest.TestCase):
os.setregid(0, 0)
linuxnamespaces.mount("tmpfs", "/mnt", "tmpfs")
os.mkdir("/mnt/dev")
- @linuxnamespaces.run_in_fork
+ @linuxnamespaces.run_in_fork.now
def test() -> None:
linuxnamespaces.populate_dev("/", "/mnt")
test()