#!/usr/bin/python3
# Copyright 2024 Helmut Grohne
# SPDX-License-Identifier: GPL-3

"""Emulate schroot using namespaces sufficiently well that sbuild can deal with
it but not any better. It assumes that ~/.cache/sbuild contains tars suitable
for sbuild --chroot-mode=unshare. Additionally, those tars are expected to
contain the non-essential passwd package. The actual sessions are stored in
~/.cache/unschroot. For using it with sbuild, your sbuildrc should contain:

    $chroot_mode = "schroot";
    $schroot = "/path/to/unschroot";
"""

import argparse
import functools
import grp
import itertools
import os
import pathlib
import pwd
import shutil
import signal
import socket
import stat
import sys
import tempfile
import typing

# When this script lives in a directory called "examples", put the parent of
# that directory on the module search path before importing linuxnamespaces
# (presumably so that an in-tree checkout is used without installation).
if __file__.split("/")[-2:-1] == ["examples"]:
    sys.path.insert(0, "/".join(__file__.split("/")[:-2]))

import linuxnamespaces
import linuxnamespaces.tarutils


class TarFile(
    linuxnamespaces.tarutils.ZstdTarFile, linuxnamespaces.tarutils.XAttrTarFile
):
    """Tar reader combining the two linuxnamespaces.tarutils mixins.

    Judging by the base class names this adds zstd and extended attribute
    support; the details live in linuxnamespaces.tarutils.
    """


class Chroot:
    """A source chroot tarball or an unpacked session directory.

    An instance whose path is a regular file is a source chroot
    (namespace "Chroot"); one whose path is a directory is a session
    (namespace "Session").
    """

    # Ignore $HOME as sbuild sets to something invalid
    home = pathlib.Path(pwd.getpwuid(os.getuid()).pw_dir)
    # Directory scanned for source tarballs and their alias symlinks.
    cache_sbuild = home / ".cache/sbuild"
    # Directory holding one subdirectory per session.
    cache_unschroot = home / ".cache/unschroot"

    def __init__(self, path: pathlib.Path, aliases: set[str] | None = None):
        """Wrap path; aliases defaults to a fresh empty set."""
        self.path = path
        self.aliases = set() if aliases is None else aliases

    @functools.cached_property
    def namespace(self) -> str:
        """Return "Chroot" for a regular file and "Session" for a directory.

        Raises ValueError when the path is neither.
        """
        if self.path.is_file():
            return "Chroot"
        if self.path.is_dir():
            return "Session"
        raise ValueError("invalid chroot object")

    @functools.cached_property
    def name(self) -> str:
        """Return the file name up to its first dot.

        Source chroots additionally get a "-sbuild" suffix.
        """
        suffix = "-sbuild" if self.namespace == "Chroot" else ""
        return self.path.name.split(".", 1)[0] + suffix

    def infostr(self) -> str:
        """Render the newline-terminated description printed by --info."""
        lines = [
            f"--- {self.namespace} ---",
            f"Name {self.name}",
        ]
        if self.namespace == "Chroot":
            lines.extend(["Type file", f"File {self.path}"])
        if self.namespace == "Session":
            lines.extend(
                [
                    f"Location {self.path}",
                    "Session Purged true",
                    "Type unshare",
                ]
            )
        lines.append("Aliases " + " ".join(sorted(self.aliases)))
        return "".join(f"{line}\n" for line in lines)

    @classmethod
    def searchchroot(cls, name: str) -> "Chroot":
        """Find the source tarball for name.

        A leading "chroot:" and a trailing "-sbuild" are stripped before
        looking for an entry in cache_sbuild whose file name starts with
        "<name>.t". Raises KeyError when there is no such entry.
        """
        name = name.removeprefix("chroot:")
        name = name.removesuffix("-sbuild")
        for path in cls.cache_sbuild.iterdir():
            if path.name.startswith(name + ".t"):
                return cls(path)
        raise KeyError(name)

    @classmethod
    def searchsession(cls, name: str) -> "Chroot":
        """Find the session directory for name.

        A leading "session:" is stripped. Raises KeyError when
        cache_unschroot has no such directory.
        """
        name = name.removeprefix("session:")
        path = cls.cache_unschroot / name
        if not path.is_dir():
            raise KeyError(name)
        return cls(path)

    @classmethod
    def newsession(cls) -> "Chroot":
        """Create a fresh, uniquely named session directory and wrap it."""
        cls.cache_unschroot.mkdir(parents=True, exist_ok=True)
        return cls(
            pathlib.Path(
                tempfile.mkdtemp(prefix="chroot", dir=cls.cache_unschroot)
            ),
        )

    @classmethod
    def scan_sbuild(cls) -> typing.Iterator["Chroot"]:
        """Yield every source chroot in cache_sbuild with its aliases.

        Symbolic links are not yielded themselves; each contributes an alias
        (its name up to the first dot plus "-sbuild") to the regular file
        whose name equals the link target.
        """
        if cls.cache_sbuild.is_dir():
            chroots = []
            aliases: dict[str, set[str]] = {}
            for path in cls.cache_sbuild.iterdir():
                if path.is_symlink():
                    alias = path.name.split(".", 1)[0] + "-sbuild"
                    aliases.setdefault(str(path.readlink()), set()).add(alias)
                elif path.is_file():
                    chroots.append(path)
            for path in chroots:
                yield cls(path, aliases.get(path.name, set()))

    @classmethod
    def scan_unschroot(cls) -> typing.Iterator["Chroot"]:
        """Yield one object per entry of cache_unschroot, if it exists."""
        if cls.cache_unschroot.is_dir():
            yield from map(cls, cls.cache_unschroot.iterdir())


def _wait_status_to_exit_code(status: int) -> int:
    """Convert an os.waitpid status into a value suitable for sys.exit.

    A normal exit yields the exit code. Death by signal yields 128 plus the
    signal number, the same convention exec_perl_dumb_init uses.
    """
    code = os.waitstatus_to_exitcode(status)
    return code if code >= 0 else 128 - code


def _default_user() -> str:
    """Return the default for --user.

    os.getlogin() is preferred for backwards compatibility, but it raises
    OSError when there is no controlling terminal. In that case fall back to
    the passwd entry of the real uid.
    """
    try:
        return os.getlogin()
    except OSError:
        return pwd.getpwuid(os.getuid()).pw_name


# The docstrings of the do_* functions double as argparse help texts (see
# main), so additional documentation for them goes into comments instead.


def do_info(args: argparse.Namespace) -> None:
    """Show information about selected chroots"""
    # With --chroot, the name is tried as a source chroot first and as a
    # session second; the KeyError of the second lookup propagates. Without
    # it, all source chroots and all sessions are listed.
    chroots: typing.Iterable[Chroot]
    if args.chroot:
        try:
            chroots = [Chroot.searchchroot(args.chroot)]
        except KeyError:
            chroots = [Chroot.searchsession(args.chroot)]
    else:
        chroots = itertools.chain(
            Chroot.scan_sbuild(), Chroot.scan_unschroot()
        )
    sys.stdout.write("\n".join(chroot.infostr() for chroot in chroots))


def do_begin_session(args: argparse.Namespace) -> None:
    """Begin a session; returns the session ID"""
    # A forked child unshares user and mount namespaces, waits for the parent
    # to install the id maps and then unpacks the tarball into the new
    # session directory. The socket pair sequences the two processes.
    source = Chroot.searchchroot(args.chroot)
    session = Chroot.newsession()
    uidmap = linuxnamespaces.IDAllocation.loadsubid("uid").allocatemap(65536)
    gidmap = linuxnamespaces.IDAllocation.loadsubid("gid").allocatemap(65536)
    mainsock, childsock = socket.socketpair()
    with TarFile.open(source.path, "r:*") as tarf:
        pid = os.fork()
        if pid == 0:
            mainsock.close()
            os.chdir(session.path)
            linuxnamespaces.unshare(
                linuxnamespaces.CloneFlags.NEWUSER
                | linuxnamespaces.CloneFlags.NEWNS,
            )
            # Tell the parent that the namespaces exist, then wait until it
            # has written the id maps and fixed up the session directory.
            childsock.send(b"\0")
            childsock.recv(1)
            childsock.close()
            os.setgid(0)
            os.setuid(0)
            for tmem in tarf:
                # Members below dev/ are skipped.
                if not tmem.name.startswith(("dev/", "./dev/")):
                    tarf.extract(tmem, numeric_owner=True)
            etc_hosts = pathlib.Path("./etc/hosts")
            if not etc_hosts.exists():
                etc_hosts.write_text(
                    """127.0.0.1 localhost
127.0.1.1 %s
::1 localhost ip6-localhost ip6-loopback
"""
                    % socket.gethostname(),
                )
            sys.exit(0)
    childsock.close()
    mainsock.recv(1)
    linuxnamespaces.newidmaps(pid, [uidmap], [gidmap])
    # NOTE(review): the extra single-id mapping at 65536 presumably keeps the
    # invoking user visible inside the parent's own user namespace - confirm
    # against linuxnamespaces.unshare_user_idmap.
    linuxnamespaces.unshare_user_idmap(
        [uidmap, linuxnamespaces.IDMapping(65536, os.getuid(), 1)],
        [gidmap, linuxnamespaces.IDMapping(65536, os.getgid(), 1)],
    )
    os.chown(session.path, 0, 0)
    session.path.chmod(0o755)
    mainsock.send(b"\0")
    mainsock.close()
    _, status = os.waitpid(pid, 0)
    print(session.name)
    # The raw wait status must not be handed to sys.exit: an exit code of 1
    # is encoded as 256, which would be truncated to 0 and hide the failure.
    sys.exit(_wait_status_to_exit_code(status))


def exec_perl_dumb_init(pid: int) -> typing.NoReturn:
    """Roughly implement dumb-init in perl: Wait for all children until we
    receive an exit from the given pid and forward its status.
    """
    os.execlp(
        "perl",
        "perl",
        "-e",
        "$r=255<<8;"  # exit 255 when we run out of children
        "do{"
        "$p=wait;"
        f"$r=$?,$p=0 if $p=={pid};"
        "}while($p>0);"
        "exit(0<$r<256?128|$r:$r>>8);",  # sig -> 128+sig; exit -> exit
    )


def do_run_session(args: argparse.Namespace) -> None:
    """Run an existing session"""
    # Process layout: this process forks a helper that unshares user, mount
    # and pid (optionally net) namespaces. The helper forks again so that its
    # child becomes pid 1 of the new pid namespace and then exits. Pid 1 sets
    # up the root file system, forks the requested command and replaces
    # itself with the perl init. This process waits on a pidfd for pid 1.
    session = Chroot.searchsession(args.chroot)
    uidmap = linuxnamespaces.IDAllocation.loadsubid("uid").allocatemap(65536)
    gidmap = linuxnamespaces.IDAllocation.loadsubid("gid").allocatemap(65536)
    mainsock, childsock = socket.socketpair()
    pid = os.fork()
    pidfd: int
    if pid == 0:
        mainsock.close()
        # Make stdout/stderr pipes world accessible, presumably so that they
        # remain usable after switching to another user id.
        for fd in (1, 2):
            if stat.S_ISFIFO(os.fstat(fd).st_mode):
                os.fchmod(fd, 0o666)
        os.chdir(session.path)
        ns = (
            linuxnamespaces.CloneFlags.NEWUSER
            | linuxnamespaces.CloneFlags.NEWNS
            | linuxnamespaces.CloneFlags.NEWPID
        )
        if args.isolate_network:
            ns |= linuxnamespaces.CloneFlags.NEWNET
        linuxnamespaces.unshare(ns)
        # Announce the namespaces and wait for the parent to install the id
        # maps and become a child subreaper.
        childsock.send(b"\0")
        childsock.recv(1)
        if os.fork() != 0:
            sys.exit(0)
        assert os.getpid() == 1
        # Hand the parent a pidfd for ourselves so it can collect our status.
        with linuxnamespaces.FileDescriptor(os.pidfd_open(1, 0)) as pidfd:
            socket.send_fds(childsock, [b"\0"], [pidfd])
        os.setgid(0)
        os.setuid(0)
        linuxnamespaces.bind_mount(".", "/mnt", recursive=True)
        os.chdir("/mnt")
        linuxnamespaces.populate_sys("/", ".", ns, devices=True)
        linuxnamespaces.populate_proc("/", ".", ns)
        linuxnamespaces.populate_dev(
            "/", ".", tun=bool(ns & linuxnamespaces.CloneFlags.NEWNET)
        )
        linuxnamespaces.pivot_root(".", ".")
        linuxnamespaces.umount(".", linuxnamespaces.UmountFlags.DETACH)
        os.chdir("/")
        if ns & linuxnamespaces.CloneFlags.NEWNET:
            linuxnamespaces.enable_loopback_if()
        # From here on the user database of the session is consulted.
        if args.user.isdigit():
            spw = pwd.getpwuid(int(args.user))
        else:
            spw = pwd.getpwnam(args.user)
        supplementary = [
            sgr.gr_gid for sgr in grp.getgrall() if spw.pw_name in sgr.gr_mem
        ]

        # Wait until the parent has reaped the intermediate helper and
        # stopped being a subreaper.
        childsock.recv(1)
        childsock.close()
        rfd, wfd = linuxnamespaces.FileDescriptor.pipe(inheritable=False)
        pid = os.fork()
        if pid == 0:
            wfd.close()
            if args.directory:
                os.chdir(args.directory)
            os.setgroups(supplementary)
            os.setgid(spw.pw_gid)
            os.setuid(spw.pw_uid)
            if "PATH" not in os.environ:
                if spw.pw_uid == 0:
                    os.environ["PATH"] = "/usr/sbin:/sbin:/usr/bin:/bin"
                else:
                    os.environ["PATH"] = "/usr/bin:/bin"
            if not args.command:
                args.command.append("bash")
            # Wait until Python has handed off to Perl.
            os.read(rfd, 1)
            os.execvp(args.command[0], args.command)
        else:
            rfd.close()
            linuxnamespaces.prctl_set_pdeathsig(signal.SIGKILL)
            os.close(0)
            # It is important that we now exec to get rid of our previous
            # execution context that carries pieces such as memory maps from
            # different namespaces that could allow escalating privileges. The
            # exec will close wfd and allow the target process to exec.
            exec_perl_dumb_init(pid)

    childsock.close()
    mainsock.recv(1)
    linuxnamespaces.newidmaps(pid, [uidmap], [gidmap])
    # While we are a subreaper, pid 1 of the new namespace is presumably
    # reparented to us when the intermediate helper exits, which is what
    # allows waiting for it below - confirm against prctl(2).
    linuxnamespaces.prctl_set_child_subreaper(True)
    mainsock.send(b"\0")
    _data, fds, _flags, _address = socket.recv_fds(mainsock, 1, 1)
    pidfd = fds[0]
    os.waitpid(pid, 0)
    linuxnamespaces.prctl_set_child_subreaper(False)
    mainsock.send(b"\0")
    wres = os.waitid(os.P_PIDFD, pidfd, os.WEXITED)
    assert wres is not None
    sys.exit(wres.si_status)


def do_end_session(args: argparse.Namespace) -> None:
    """End an existing session"""
    # Enter a user namespace with the same id maps that were used to create
    # the session, then remove the session directory recursively.
    session = Chroot.searchsession(args.chroot)
    uidmap = linuxnamespaces.IDAllocation.loadsubid("uid").allocatemap(65536)
    gidmap = linuxnamespaces.IDAllocation.loadsubid("gid").allocatemap(65536)
    linuxnamespaces.unshare_user_idmap(
        [uidmap, linuxnamespaces.IDMapping(65536, os.getuid(), 1)],
        [gidmap, linuxnamespaces.IDMapping(65536, os.getgid(), 1)],
    )
    shutil.rmtree(session.path)


def main() -> None:
    """Parse the schroot-like command line and run the chosen subcommand."""
    parser = argparse.ArgumentParser()
    group = parser.add_mutually_exclusive_group(required=True)
    # Each subcommand is a flag (-i/--info, -b/--begin-session, ...) that
    # stores the matching do_* function; its docstring becomes the help text.
    for comm in ("info", "begin-session", "run-session", "end-session"):
        func = globals()["do_" + comm.replace("-", "_")]
        group.add_argument(
            f"-{comm[0]}",
            f"--{comm}",
            dest="subcommand",
            action="store_const",
            const=func,
            help=func.__doc__,
        )
    parser.add_argument(
        "-c",
        "--chroot",
        dest="chroot",
        action="store",
        help="Use specified chroot",
    )
    parser.add_argument("-d", "--directory", action="store")
    # -p and -q are accepted but not evaluated anywhere in this script.
    parser.add_argument("-p", "--preserve-environment", action="store_true")
    parser.add_argument("-q", "--quiet", action="store_true")
    parser.add_argument("-u", "--user", action="store", default=_default_user())
    parser.add_argument("--isolate-network", action="store_true")
    parser.add_argument("command", nargs="*")
    args = parser.parse_args()
    assert args.subcommand is not None
    args.subcommand(args)


if __name__ == "__main__":
    main()