summaryrefslogtreecommitdiff
path: root/examples/unschroot.py
diff options
context:
space:
mode:
authorHelmut Grohne <helmut@subdivi.de>2024-06-08 17:26:32 +0200
committerHelmut Grohne <helmut@subdivi.de>2024-06-08 17:26:32 +0200
commita1cc59818088bae661ecead3a3d769c7a9970d2d (patch)
treeb49618fba48130fe9522d24ba567063ff48b7f3f /examples/unschroot.py
parent50653db0e915eeae9a26e23cf759e5a1a03ab554 (diff)
downloadpython-linuxnamespaces-a1cc59818088bae661ecead3a3d769c7a9970d2d.tar.gz
add example "unschroot.py"
While this mostly provides the schroot API and adds its own semantics around ~/.cache/unschroot, please do not consider examples a stable interface but a room for experimentation and incompatible changes.
Diffstat (limited to 'examples/unschroot.py')
-rwxr-xr-xexamples/unschroot.py288
1 files changed, 288 insertions, 0 deletions
diff --git a/examples/unschroot.py b/examples/unschroot.py
new file mode 100755
index 0000000..5d847f4
--- /dev/null
+++ b/examples/unschroot.py
@@ -0,0 +1,288 @@
+#!/usr/bin/python3
+# Copyright 2024 Helmut Grohne <helmut@subdivi.de>
+# SPDX-License-Identifier: GPL-3
+
+"""Emulate schroot using namespaces sufficiently well that sbuild can deal with
+it but not any better. It assumes that ~/.cache/sbuild contains tars suitable
+for sbuild --chroot-mode=unshare. Additionally, those tars are expected to
+contain the non-essential passwd package. The actual sessions are stored in
+~/.cache/unschroot. For using it with sbuild, your sbuildrc should contain:
+
+ $chroot_mode = "schroot";
+ $schroot = "/path/to/unschroot";
+"""
+
+
+import argparse
+import functools
+import grp
+import itertools
+import os
+import pathlib
+import pwd
+import shutil
+import signal
+import socket
+import sys
+import tempfile
+import typing
+
+if __file__.split("/")[-2:-1] == ["examples"]:
+ sys.path.insert(0, "/".join(__file__.split("/")[:-2]))
+
+import linuxnamespaces
+import linuxnamespaces.tarutils
+
+
+class TarFile(
+ linuxnamespaces.tarutils.ZstdTarFile, linuxnamespaces.tarutils.XAttrTarFile
+):
+ pass
+
+
+class Chroot:
+ # Ignore $HOME as sbuild sets to something invalid
+ home = pathlib.Path(pwd.getpwuid(os.getuid()).pw_dir)
+ cache_sbuild = home / ".cache/sbuild"
+ cache_unschroot = home / ".cache/unschroot"
+
+ def __init__(self, path: pathlib.Path, aliases: set[str] | None = None):
+ self.path = path
+ self.aliases = set() if aliases is None else aliases
+
+ @functools.cached_property
+ def namespace(self) -> str:
+ if self.path.is_file():
+ return "Chroot"
+ if self.path.is_dir():
+ return "Session"
+ raise ValueError("invalid chroot object")
+
+ @functools.cached_property
+ def name(self) -> str:
+ suffix = "-sbuild" if self.namespace == "Chroot" else ""
+ return self.path.name.split(".", 1)[0] + suffix
+
+ def infostr(self) -> str:
+ lines = [
+ f"--- {self.namespace} ---",
+ f"Name {self.name}",
+ ]
+ if self.namespace == "Chroot":
+ lines.extend(["Type file", f"File {self.path}"])
+ if self.namespace == "Session":
+ lines.append(f"Location {self.path}")
+ if self.aliases:
+ lines.append("Aliases " + " ".join(sorted(self.aliases)))
+ return "".join(map("%s\n".__mod__, lines))
+
+ @classmethod
+ def searchchroot(cls, name: str) -> "Chroot":
+ name = name.removeprefix("chroot:")
+ name = name.removesuffix("-sbuild")
+ for path in cls.cache_sbuild.iterdir():
+ if path.name.startswith(name + ".t"):
+ return cls(path)
+ raise KeyError(name)
+
+ @classmethod
+ def searchsession(cls, name: str) -> "Chroot":
+ name = name.removeprefix("session:")
+ path = cls.cache_unschroot / name
+ if not path.is_dir():
+ raise KeyError(name)
+ return cls(path)
+
+ @classmethod
+ def newsession(cls) -> "Chroot":
+ cls.cache_unschroot.mkdir(parents=True, exist_ok=True)
+ return Chroot(
+ pathlib.Path(
+ tempfile.mkdtemp(prefix="chroot", dir=cls.cache_unschroot)
+ ),
+ )
+
+ @classmethod
+ def scan_sbuild(cls) -> typing.Iterator["Chroot"]:
+ if cls.cache_sbuild.is_dir():
+ chroots = []
+ aliases: dict[str, set[str]] = {}
+ for path in cls.cache_sbuild.iterdir():
+ if path.is_symlink():
+ alias = path.name.split(".", 1)[0] + "-sbuild"
+ aliases.setdefault(str(path.readlink()), set()).add(alias)
+ elif path.is_file():
+ chroots.append(path)
+ for path in chroots:
+ yield cls(path, aliases.get(path.name, set()))
+
+ @classmethod
+ def scan_unschroot(cls) -> typing.Iterator["Chroot"]:
+ if cls.cache_unschroot.is_dir():
+ yield from map(cls, cls.cache_unschroot.iterdir())
+
+
+def do_info(args: argparse.Namespace) -> None:
+ """Show information about selected chroots"""
+ chroots: typing.Iterable[Chroot]
+ if args.chroot:
+ try:
+ chroots = [Chroot.searchchroot(args.chroot)]
+ except KeyError:
+ chroots = [Chroot.searchsession(args.chroot)]
+ else:
+ chroots = itertools.chain(
+ Chroot.scan_sbuild(), Chroot.scan_unschroot()
+ )
+ sys.stdout.write("\n".join(chroot.infostr() for chroot in chroots))
+
+
+def do_begin_session(args: argparse.Namespace) -> None:
+ """Begin a session; returns the session ID"""
+ source = Chroot.searchchroot(args.chroot)
+ session = Chroot.newsession()
+ uidmap = linuxnamespaces.IDAllocation.loadsubid("uid").allocatemap(65536)
+ gidmap = linuxnamespaces.IDAllocation.loadsubid("gid").allocatemap(65536)
+ mainsock, childsock = socket.socketpair()
+ with TarFile.open(source.path, "r:*") as tarf:
+ pid = os.fork()
+ if pid == 0:
+ mainsock.close()
+ os.chdir(session.path)
+ linuxnamespaces.unshare(
+ linuxnamespaces.CloneFlags.NEWUSER
+ | linuxnamespaces.CloneFlags.NEWNS,
+ )
+ childsock.send(b"\0")
+ childsock.recv(1)
+ childsock.close()
+ os.setgid(0)
+ os.setuid(0)
+ for tmem in tarf:
+ if not tmem.name.startswith(("dev/", "./dev/")):
+ tarf.extract(tmem, numeric_owner=True)
+ sys.exit(0)
+ childsock.close()
+ mainsock.recv(1)
+ linuxnamespaces.newidmaps(pid, [uidmap], [gidmap])
+ linuxnamespaces.unshare_user_idmap(
+ [uidmap, linuxnamespaces.IDMapping(65536, os.getuid(), 1)],
+ [gidmap, linuxnamespaces.IDMapping(65536, os.getgid(), 1)],
+ )
+ os.chown(session.path, 0, 0)
+ session.path.chmod(0o755)
+ mainsock.send(b"\0")
+ mainsock.close()
+ _, ret = os.waitpid(pid, 0)
+ print(session.name)
+ sys.exit(ret)
+
+
+def do_run_session(args: argparse.Namespace) -> None:
+ """Run an existing session"""
+ session = Chroot.searchsession(args.chroot)
+ uidmap = linuxnamespaces.IDAllocation.loadsubid("uid").allocatemap(65536)
+ gidmap = linuxnamespaces.IDAllocation.loadsubid("gid").allocatemap(65536)
+ mainsock, childsock = socket.socketpair()
+ pid = os.fork()
+ if pid == 0:
+ mainsock.close()
+ os.chdir(session.path)
+ ns = (
+ linuxnamespaces.CloneFlags.NEWUSER
+ | linuxnamespaces.CloneFlags.NEWNS
+ | linuxnamespaces.CloneFlags.NEWPID
+ )
+ linuxnamespaces.unshare(ns)
+ childsock.send(b"\0")
+ childsock.recv(1)
+ if os.fork() != 0:
+ sys.exit(0)
+ assert os.getpid() == 1
+ with linuxnamespaces.FileDescriptor(os.pidfd_open(1, 0)) as pidfd:
+ socket.send_fds(childsock, [b"\0"], [pidfd])
+ os.setgid(0)
+ os.setuid(0)
+ linuxnamespaces.bind_mount(".", "/mnt", recursive=True)
+ os.chdir("/mnt")
+ linuxnamespaces.populate_sys("/", ".")
+ linuxnamespaces.populate_proc("/", ".", ns)
+ linuxnamespaces.populate_dev("/", ".")
+ linuxnamespaces.pivot_root(".", ".")
+ linuxnamespaces.umount(".", linuxnamespaces.UmountFlags.DETACH)
+ os.chdir(args.directory or "/")
+ if args.user.isdigit():
+ spw = pwd.getpwuid(int(args.user))
+ else:
+ spw = pwd.getpwnam(args.user)
+ supplementary = [
+ sgr.gr_gid for sgr in grp.getgrall() if spw.pw_name in sgr.gr_mem
+ ]
+ os.setgroups(supplementary)
+ os.setgid(spw.pw_gid)
+ os.setuid(spw.pw_uid)
+ if not args.command:
+ args.command.append("bash")
+ linuxnamespaces.prctl_set_pdeathsig(signal.SIGTERM)
+ if "PATH" not in os.environ:
+ if spw.pw_uid == 0:
+ os.environ["PATH"] = "/usr/sbin:/sbin:/usr/bin:/bin"
+ else:
+ os.environ["PATH"] = "/usr/bin:/bin"
+ os.execvp(args.command[0], args.command)
+ childsock.close()
+ mainsock.recv(1)
+ linuxnamespaces.newidmaps(pid, [uidmap], [gidmap])
+ linuxnamespaces.prctl_set_child_subreaper(True)
+ mainsock.send(b"\0")
+ _data, fds, _flags, _address = socket.recv_fds(mainsock, 1, 1)
+ pidfd = fds[0]
+ os.waitpid(pid, 0)
+ linuxnamespaces.prctl_set_child_subreaper(False)
+ sys.exit(os.waitid(os.P_PIDFD, pidfd, os.WEXITED).si_status)
+
+
+def do_end_session(args: argparse.Namespace) -> None:
+ """End an existing session"""
+ session = Chroot.searchsession(args.chroot)
+ uidmap = linuxnamespaces.IDAllocation.loadsubid("uid").allocatemap(65536)
+ gidmap = linuxnamespaces.IDAllocation.loadsubid("gid").allocatemap(65536)
+ linuxnamespaces.unshare_user_idmap(
+ [uidmap, linuxnamespaces.IDMapping(65536, os.getuid(), 1)],
+ [gidmap, linuxnamespaces.IDMapping(65536, os.getgid(), 1)],
+ )
+ shutil.rmtree(session.path)
+
+
+def main() -> None:
+ parser = argparse.ArgumentParser()
+ group = parser.add_mutually_exclusive_group(required=True)
+ for comm in ("info", "begin-session", "run-session", "end-session"):
+ func = globals()["do_" + comm.replace("-", "_")]
+ group.add_argument(
+ f"-{comm[0]}",
+ f"--{comm}",
+ dest="command",
+ action="store_const",
+ const=func,
+ help=func.__doc__,
+ )
+ parser.add_argument(
+ "-c",
+ "--chroot",
+ dest="chroot",
+ action="store",
+ help="Use specified chroot",
+ )
+ parser.add_argument("-d", "--directory", action="store")
+ parser.add_argument("-p", "--preserve-environment", action="store_true")
+ parser.add_argument("-q", "--quiet", action="store_true")
+ parser.add_argument("-u", "--user", action="store", default=os.getlogin())
+ parser.add_argument("command", nargs="*")
+ args = parser.parse_args()
+ assert args.command is not None
+ args.command(args)
+
+
+if __name__ == "__main__":
+ main()