From a1cc59818088bae661ecead3a3d769c7a9970d2d Mon Sep 17 00:00:00 2001 From: Helmut Grohne Date: Sat, 8 Jun 2024 17:26:32 +0200 Subject: add example "unschroot.py" While this mostly provides the schroot API and adds its own semantics around ~/.cache/unschroot, please do not consider examples a stable interface but a room for experimentation and incompatible changes. --- examples/unschroot.py | 288 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 288 insertions(+) create mode 100755 examples/unschroot.py diff --git a/examples/unschroot.py b/examples/unschroot.py new file mode 100755 index 0000000..5d847f4 --- /dev/null +++ b/examples/unschroot.py @@ -0,0 +1,288 @@ +#!/usr/bin/python3 +# Copyright 2024 Helmut Grohne +# SPDX-License-Identifier: GPL-3 + +"""Emulate schroot using namespaces sufficiently well that sbuild can deal with +it but not any better. It assumes that ~/.cache/sbuild contains tars suitable +for sbuild --chroot-mode=unshare. Additionally, those tars are expected to +contain the non-essential passwd package. The actual sessions are stored in +~/.cache/unschroot. For using it with sbuild, your sbuildrc should contain: + + $chroot_mode = "schroot"; + $schroot = "/path/to/unschroot"; +""" + + +import argparse +import functools +import grp +import itertools +import os +import pathlib +import pwd +import shutil +import signal +import socket +import sys +import tempfile +import typing + +if __file__.split("/")[-2:-1] == ["examples"]: + sys.path.insert(0, "/".join(__file__.split("/")[:-2])) + +import linuxnamespaces +import linuxnamespaces.tarutils + + +class TarFile( + linuxnamespaces.tarutils.ZstdTarFile, linuxnamespaces.tarutils.XAttrTarFile +): + pass + + +class Chroot: + # Ignore $HOME as sbuild sets to something invalid + home = pathlib.Path(pwd.getpwuid(os.getuid()).pw_dir) + cache_sbuild = home / ".cache/sbuild" + cache_unschroot = home / ".cache/unschroot" + + def __init__(self, path: pathlib.Path, aliases: set[str] | None = None): + self.path = path + self.aliases = set() if aliases is None else aliases + + @functools.cached_property + def namespace(self) -> str: + if self.path.is_file(): + return "Chroot" + if self.path.is_dir(): + return "Session" + raise ValueError("invalid chroot object") + + @functools.cached_property + def name(self) -> str: + suffix = "-sbuild" if self.namespace == "Chroot" else "" + return self.path.name.split(".", 1)[0] + suffix + + def infostr(self) -> str: + lines = [ + f"--- {self.namespace} ---", + f"Name {self.name}", + ] + if self.namespace == "Chroot": + lines.extend(["Type file", f"File {self.path}"]) + if self.namespace == "Session": + lines.append(f"Location {self.path}") + if self.aliases: + lines.append("Aliases " + " ".join(sorted(self.aliases))) + return "".join(map("%s\n".__mod__, lines)) + + @classmethod + def searchchroot(cls, name: str) -> "Chroot": + name = name.removeprefix("chroot:") + name = name.removesuffix("-sbuild") + for path in cls.cache_sbuild.iterdir(): + if path.name.startswith(name + ".t"): + return cls(path) + raise KeyError(name) + + @classmethod + def searchsession(cls, name: str) -> "Chroot": + name = name.removeprefix("session:") + path = cls.cache_unschroot / name + if not path.is_dir(): + raise KeyError(name) + return cls(path) + + @classmethod + def newsession(cls) -> "Chroot": + cls.cache_unschroot.mkdir(parents=True, exist_ok=True) + return Chroot( + pathlib.Path( + tempfile.mkdtemp(prefix="chroot", dir=cls.cache_unschroot) + ), + ) + + @classmethod + def scan_sbuild(cls) -> typing.Iterator["Chroot"]: + if cls.cache_sbuild.is_dir(): + chroots = [] + aliases: dict[str, set[str]] = {} + for path in cls.cache_sbuild.iterdir(): + if path.is_symlink(): + alias = path.name.split(".", 1)[0] + "-sbuild" + aliases.setdefault(str(path.readlink()), set()).add(alias) + elif path.is_file(): + chroots.append(path) + for path in chroots: + yield cls(path, aliases.get(path.name, set())) + + @classmethod + def scan_unschroot(cls) -> typing.Iterator["Chroot"]: + if cls.cache_unschroot.is_dir(): + yield from map(cls, cls.cache_unschroot.iterdir()) + + +def do_info(args: argparse.Namespace) -> None: + """Show information about selected chroots""" + chroots: typing.Iterable[Chroot] + if args.chroot: + try: + chroots = [Chroot.searchchroot(args.chroot)] + except KeyError: + chroots = [Chroot.searchsession(args.chroot)] + else: + chroots = itertools.chain( + Chroot.scan_sbuild(), Chroot.scan_unschroot() + ) + sys.stdout.write("\n".join(chroot.infostr() for chroot in chroots)) + + +def do_begin_session(args: argparse.Namespace) -> None: + """Begin a session; returns the session ID""" + source = Chroot.searchchroot(args.chroot) + session = Chroot.newsession() + uidmap = linuxnamespaces.IDAllocation.loadsubid("uid").allocatemap(65536) + gidmap = linuxnamespaces.IDAllocation.loadsubid("gid").allocatemap(65536) + mainsock, childsock = socket.socketpair() + with TarFile.open(source.path, "r:*") as tarf: + pid = os.fork() + if pid == 0: + mainsock.close() + os.chdir(session.path) + linuxnamespaces.unshare( + linuxnamespaces.CloneFlags.NEWUSER + | linuxnamespaces.CloneFlags.NEWNS, + ) + childsock.send(b"\0") + childsock.recv(1) + childsock.close() + os.setgid(0) + os.setuid(0) + for tmem in tarf: + if not tmem.name.startswith(("dev/", "./dev/")): + tarf.extract(tmem, numeric_owner=True) + sys.exit(0) + childsock.close() + mainsock.recv(1) + linuxnamespaces.newidmaps(pid, [uidmap], [gidmap]) + linuxnamespaces.unshare_user_idmap( + [uidmap, linuxnamespaces.IDMapping(65536, os.getuid(), 1)], + [gidmap, linuxnamespaces.IDMapping(65536, os.getgid(), 1)], + ) + os.chown(session.path, 0, 0) + session.path.chmod(0o755) + mainsock.send(b"\0") + mainsock.close() + _, ret = os.waitpid(pid, 0) + print(session.name) + sys.exit(ret) + + +def do_run_session(args: argparse.Namespace) -> None: + """Run an existing session""" + session = Chroot.searchsession(args.chroot) + uidmap = linuxnamespaces.IDAllocation.loadsubid("uid").allocatemap(65536) + gidmap = linuxnamespaces.IDAllocation.loadsubid("gid").allocatemap(65536) + mainsock, childsock = socket.socketpair() + pid = os.fork() + if pid == 0: + mainsock.close() + os.chdir(session.path) + ns = ( + linuxnamespaces.CloneFlags.NEWUSER + | linuxnamespaces.CloneFlags.NEWNS + | linuxnamespaces.CloneFlags.NEWPID + ) + linuxnamespaces.unshare(ns) + childsock.send(b"\0") + childsock.recv(1) + if os.fork() != 0: + sys.exit(0) + assert os.getpid() == 1 + with linuxnamespaces.FileDescriptor(os.pidfd_open(1, 0)) as pidfd: + socket.send_fds(childsock, [b"\0"], [pidfd]) + os.setgid(0) + os.setuid(0) + linuxnamespaces.bind_mount(".", "/mnt", recursive=True) + os.chdir("/mnt") + linuxnamespaces.populate_sys("/", ".") + linuxnamespaces.populate_proc("/", ".", ns) + linuxnamespaces.populate_dev("/", ".") + linuxnamespaces.pivot_root(".", ".") + linuxnamespaces.umount(".", linuxnamespaces.UmountFlags.DETACH) + os.chdir(args.directory or "/") + if args.user.isdigit(): + spw = pwd.getpwuid(int(args.user)) + else: + spw = pwd.getpwnam(args.user) + supplementary = [ + sgr.gr_gid for sgr in grp.getgrall() if spw.pw_name in sgr.gr_mem + ] + os.setgroups(supplementary) + os.setgid(spw.pw_gid) + os.setuid(spw.pw_uid) + if not args.command: + args.command.append("bash") + linuxnamespaces.prctl_set_pdeathsig(signal.SIGTERM) + if "PATH" not in os.environ: + if spw.pw_uid == 0: + os.environ["PATH"] = "/usr/sbin:/sbin:/usr/bin:/bin" + else: + os.environ["PATH"] = "/usr/bin:/bin" + os.execvp(args.command[0], args.command) + childsock.close() + mainsock.recv(1) + linuxnamespaces.newidmaps(pid, [uidmap], [gidmap]) + linuxnamespaces.prctl_set_child_subreaper(True) + mainsock.send(b"\0") + _data, fds, _flags, _address = socket.recv_fds(mainsock, 1, 1) + pidfd = fds[0] + os.waitpid(pid, 0) + linuxnamespaces.prctl_set_child_subreaper(False) + sys.exit(os.waitid(os.P_PIDFD, pidfd, os.WEXITED).si_status) + + +def do_end_session(args: argparse.Namespace) -> None: + """End an existing session""" + session = Chroot.searchsession(args.chroot) + uidmap = linuxnamespaces.IDAllocation.loadsubid("uid").allocatemap(65536) + gidmap = linuxnamespaces.IDAllocation.loadsubid("gid").allocatemap(65536) + linuxnamespaces.unshare_user_idmap( + [uidmap, linuxnamespaces.IDMapping(65536, os.getuid(), 1)], + [gidmap, linuxnamespaces.IDMapping(65536, os.getgid(), 1)], + ) + shutil.rmtree(session.path) + + +def main() -> None: + parser = argparse.ArgumentParser() + group = parser.add_mutually_exclusive_group(required=True) + for comm in ("info", "begin-session", "run-session", "end-session"): + func = globals()["do_" + comm.replace("-", "_")] + group.add_argument( + f"-{comm[0]}", + f"--{comm}", + dest="command", + action="store_const", + const=func, + help=func.__doc__, + ) + parser.add_argument( + "-c", + "--chroot", + dest="chroot", + action="store", + help="Use specified chroot", + ) + parser.add_argument("-d", "--directory", action="store") + parser.add_argument("-p", "--preserve-environment", action="store_true") + parser.add_argument("-q", "--quiet", action="store_true") + parser.add_argument("-u", "--user", action="store", default=os.getlogin()) + parser.add_argument("command", nargs="*") + args = parser.parse_args() + assert args.command is not None + args.command(args) + + +if __name__ == "__main__": + main() -- cgit v1.2.3