#!/usr/bin/python3 # Copyright 2024 Helmut Grohne # SPDX-License-Identifier: GPL-3 """Emulate schroot using namespaces sufficiently well that sbuild can deal with it but not any better. It assumes that ~/.cache/sbuild contains tars suitable for sbuild --chroot-mode=unshare. Additionally, those tars are expected to contain the non-essential passwd package. The actual sessions are stored in ~/.cache/unschroot. For using it with sbuild, your sbuildrc should contain: $chroot_mode = "schroot"; $schroot = "/path/to/unschroot"; State and sessions are retained via the filesystem in ~/.cache/unschroot between calls with no background processes or persistent namespaces. """ import argparse import grp import os import pathlib import pwd import shutil import signal import socket import stat import sys import tempfile import typing if __file__.split("/")[-2:-1] == ["examples"]: sys.path.insert(0, "/".join(__file__.split("/")[:-2])) import linuxnamespaces import linuxnamespaces.tarutils class TarFile( linuxnamespaces.tarutils.ZstdTarFile, linuxnamespaces.tarutils.XAttrTarFile ): pass def write_etc_hosts(root: os.PathLike[str] | str) -> None: etc_hosts = pathlib.Path(root) / "etc/hosts" if not etc_hosts.exists(): etc_hosts.write_text( """127.0.0.1 localhost 127.0.1.1 %s ::1 localhost ip6-localhost ip6-loopback """ % socket.gethostname(), encoding="ascii", ) def load_subids() -> ( tuple[linuxnamespaces.IDMapping, linuxnamespaces.IDMapping] ): return ( linuxnamespaces.IDAllocation.loadsubid("uid").allocatemap(65536), linuxnamespaces.IDAllocation.loadsubid("gid").allocatemap(65536), ) # Ignore $HOME as sbuild sets to something invalid HOME = pathlib.Path(pwd.getpwuid(os.getuid()).pw_dir) CACHE_SBUILD = HOME / ".cache/sbuild" CACHE_UNSCHROOT = HOME / ".cache/unschroot" CACHE_DIRECTORY_CHROOTS = HOME / ".cache/directory_chroots" class ChrootBase: namespace: str name: str def __init__(self) -> None: self.aliases: set[str] = set() def infodata(self) -> dict[str, str]: return { "Name": self.name, "Aliases": " ".join(sorted(self.aliases)), } def infostr(self) -> str: return f"--- {self.namespace} ---\n" + "".join( map("%s %s\n".__mod__, self.infodata().items()) ) class SourceChroot(ChrootBase): namespace = "Chroot" def newsession(self) -> "SessionChroot": raise NotImplementedError class SessionChroot(ChrootBase): namespace = "Session" def infodata(self) -> dict[str, str]: data = super().infodata() data["Session Purged"] = "true" data["Type"] = "unshare" return data def mount(self) -> pathlib.Path: raise NotImplementedError class TarSourceChroot(SourceChroot): def __init__(self, path: pathlib.Path): super().__init__() self.path = path self.name = path.name.split(".", 1)[0] + "-sbuild" def infodata(self) -> dict[str, str]: data = super().infodata() data["Type"] = "file" data["File"] = str(self.path) return data def newsession(self) -> "TarSessionChroot": CACHE_UNSCHROOT.mkdir(parents=True, exist_ok=True) session = TarSessionChroot( pathlib.Path(tempfile.mkdtemp(prefix="tar-", dir=CACHE_UNSCHROOT)), ) uidmap, gidmap = load_subids() mainsock, childsock = socket.socketpair() with TarFile.open(self.path, "r:*") as tarf: pid = os.fork() if pid == 0: mainsock.close() os.chdir(session.path) linuxnamespaces.unshare( linuxnamespaces.CloneFlags.NEWUSER | linuxnamespaces.CloneFlags.NEWNS, ) childsock.send(b"\0") childsock.recv(1) childsock.close() os.setgid(0) os.setuid(0) for tmem in tarf: if not tmem.name.startswith(("dev/", "./dev/")): tarf.extract(tmem, numeric_owner=True) write_etc_hosts(".") sys.exit(0) childsock.close() mainsock.recv(1) pid2 = os.fork() if pid2 == 0: linuxnamespaces.unshare_user_idmap( [uidmap, linuxnamespaces.IDMapping(65536, os.getuid(), 1)], [gidmap, linuxnamespaces.IDMapping(65536, os.getgid(), 1)], ) os.chown(session.path, 0, 0) session.path.chmod(0o755) sys.exit(0) linuxnamespaces.newidmaps(pid, [uidmap], [gidmap]) _, ret = os.waitpid(pid2, 0) assert ret == 0 mainsock.send(b"\0") mainsock.close() _, ret = os.waitpid(pid, 0) assert ret == 0 return session class TarSessionChroot(SessionChroot): def __init__(self, path: pathlib.Path): super().__init__() self.path = path self.name = path.name def mount(self) -> pathlib.Path: linuxnamespaces.bind_mount(self.path, "/mnt", recursive=True) return pathlib.Path("/mnt") class DirectorySourceChroot(SourceChroot): def __init__(self, path: pathlib.Path): super().__init__() self.path = path self.name = path.name + "-sbuild" def infodata(self) -> dict[str, str]: data = super().infodata() data["Type"] = "directory" data["Directory"] = str(self.path) return data def newsession(self) -> "DirectorySessionChroot": CACHE_UNSCHROOT.mkdir(parents=True, exist_ok=True) path = pathlib.Path( tempfile.mkdtemp( prefix=f"overlay-{self.name}-", dir=CACHE_UNSCHROOT ), ) session = DirectorySessionChroot(self, path) uidmap, gidmap = load_subids() pid = os.fork() if pid == 0: linuxnamespaces.unshare_user_idmap( [uidmap, linuxnamespaces.IDMapping(65536, os.getuid(), 1)], [gidmap, linuxnamespaces.IDMapping(65536, os.getgid(), 1)], ) os.setgid(0) os.setuid(0) os.chown(path, 0, 0) path.chmod(0o755) (path / "upper").mkdir() (path / "work").mkdir() if not (self.path / "etc/hosts").exists(): (path / "upper/etc").mkdir() write_etc_hosts(path / "upper") sys.exit(0) _, ret = os.waitpid(pid, 0) assert ret == 0 return session class DirectorySessionChroot(SessionChroot): def __init__(self, source: DirectorySourceChroot, path: pathlib.Path): super().__init__() self.source = source self.path = path self.name = path.name def infodata(self) -> dict[str, str]: data = super().infodata() data["Type"] = "directory" data["Directory"] = str(self.source.path) # It's a gross lie, but sbuild does not work without. It has to # actually exist and should not occur inside build logs. data["Location"] = str(self.source.path) return data def mount(self) -> pathlib.Path: mnt = "/mnt" linuxnamespaces.mount( "overlay", mnt, "overlay", data={ "lowerdir": str(self.source.path), "upperdir": str(self.path / "upper"), "workdir": str(self.path / "work"), "userxattr": None, }, ) return pathlib.Path(mnt) def scan_chroots() -> dict[str, ChrootBase]: chrootmap: dict[str, ChrootBase] = {} chroot: ChrootBase for loc, cls in ( (CACHE_SBUILD, TarSourceChroot), (CACHE_DIRECTORY_CHROOTS, DirectorySourceChroot), ): if loc.is_dir(): chroots = [] aliases: dict[str, set[str]] = {} for path in loc.iterdir(): if path.is_symlink(): alias = path.name.split(".", 1)[0] + "-sbuild" aliases.setdefault(str(path.readlink()), set()).add(alias) else: chroots.append(path) for path in chroots: chroot = cls(path) chrootaliases = aliases.get(path.name, set()) chroot.aliases.update(chrootaliases) if chroot.name not in chrootmap: chrootmap[chroot.name] = chroot for alias in chrootaliases: if alias not in chrootmap: chrootmap[alias] = chroot if CACHE_UNSCHROOT.is_dir(): for path in CACHE_UNSCHROOT.iterdir(): if path.name.startswith("tar-"): chroot = TarSessionChroot(path) if chroot.name not in chrootmap: chrootmap[chroot.name] = chroot elif path.name.startswith("overlay-"): base = "-".join(path.name.split("-")[1:-1]) if base not in chrootmap: continue source = chrootmap[base] assert isinstance(source, DirectorySourceChroot) chroot = DirectorySessionChroot(source, path) if chroot.name not in chrootmap: chrootmap[chroot.name] = chroot return chrootmap def do_info(args: argparse.Namespace) -> None: """Show information about selected chroots""" chrootmap = scan_chroots() chroots: typing.Iterable[ChrootBase] if args.chroot: chroots = [ chrootmap[ args.chroot.removeprefix("chroot:").removeprefix("session:") ], ] else: chroots = chrootmap.values() sys.stdout.write("\n".join(chroot.infostr() for chroot in chroots)) def do_begin_session(args: argparse.Namespace) -> None: """Begin a session; returns the session ID""" chrootmap = scan_chroots() source = chrootmap[args.chroot.removeprefix("chroot:")] assert isinstance(source, SourceChroot) session = source.newsession() print(session.name) def exec_perl_dumb_init(pid: int) -> typing.NoReturn: """Roughly implement dumb-init in perl: Wait for all children until we receive an exit from the given pid and forward its status. """ os.execlp( "perl", "perl", "-e", "$r=255<<8;" # exit 255 when we run out of children "do{" "$p=wait;" f"$r=$?,$p=0 if $p=={pid};" "}while($p>0);" "exit(0<$r<256?128|$r:$r>>8);", # sig -> 128+sig; exit -> exit ) def do_run_session(args: argparse.Namespace) -> None: """Run an existing session""" chrootmap = scan_chroots() session = chrootmap[args.chroot] assert isinstance(session, SessionChroot) uidmap, gidmap = load_subids() mainsock, childsock = socket.socketpair() pid = os.fork() pidfd: int if pid == 0: mainsock.close() for fd in (1, 2): if stat.S_ISFIFO(os.fstat(fd).st_mode): os.fchmod(fd, 0o666) ns = ( linuxnamespaces.CloneFlags.NEWUSER | linuxnamespaces.CloneFlags.NEWNS | linuxnamespaces.CloneFlags.NEWPID ) if args.isolate_network: ns |= linuxnamespaces.CloneFlags.NEWNET linuxnamespaces.unshare(ns) childsock.send(b"\0") childsock.recv(1) if os.fork() != 0: sys.exit(0) assert os.getpid() == 1 with linuxnamespaces.FileDescriptor(os.pidfd_open(1, 0)) as pidfd: socket.send_fds(childsock, [b"\0"], [pidfd]) os.setgid(0) os.setuid(0) root = session.mount() os.chdir(root) linuxnamespaces.populate_sys("/", ".", ns, devices=True) linuxnamespaces.populate_proc("/", ".", ns) linuxnamespaces.populate_dev( "/", ".", tun=bool(ns & linuxnamespaces.CloneFlags.NEWNET) ) linuxnamespaces.pivot_root(".", ".") linuxnamespaces.umount(".", linuxnamespaces.UmountFlags.DETACH) os.chdir("/") if ns & linuxnamespaces.CloneFlags.NEWNET: linuxnamespaces.enable_loopback_if() if args.user.isdigit(): spw = pwd.getpwuid(int(args.user)) else: spw = pwd.getpwnam(args.user) supplementary = [ sgr.gr_gid for sgr in grp.getgrall() if spw.pw_name in sgr.gr_mem ] childsock.recv(1) childsock.close() rfd, wfd = linuxnamespaces.FileDescriptor.pipe(inheritable=False) pid = os.fork() if pid == 0: wfd.close() if args.directory: os.chdir(args.directory) os.setgroups(supplementary) os.setgid(spw.pw_gid) os.setuid(spw.pw_uid) if "PATH" not in os.environ: if spw.pw_uid == 0: os.environ["PATH"] = "/usr/sbin:/sbin:/usr/bin:/bin" else: os.environ["PATH"] = "/usr/bin:/bin" if not args.command: args.command.append("bash") # Wait until Python has handed off to Perl. os.read(rfd, 1) os.execvp(args.command[0], args.command) else: rfd.close() linuxnamespaces.prctl_set_pdeathsig(signal.SIGKILL) os.close(0) # It is important that we now exec to get rid of our previous # execution context that carries pieces such as memory maps from # different namespaces that could allow escalating privileges. The # exec will close wfd and allow the target process to exec. exec_perl_dumb_init(pid) childsock.close() mainsock.recv(1) linuxnamespaces.newidmaps(pid, [uidmap], [gidmap]) linuxnamespaces.prctl_set_child_subreaper(True) mainsock.send(b"\0") _data, fds, _flags, _address = socket.recv_fds(mainsock, 1, 1) pidfd = fds[0] os.waitpid(pid, 0) linuxnamespaces.prctl_set_child_subreaper(False) mainsock.send(b"\0") wres = os.waitid(os.P_PIDFD, pidfd, os.WEXITED) assert wres is not None sys.exit(wres.si_status) def do_end_session(args: argparse.Namespace) -> None: """End an existing session""" chrootmap = scan_chroots() session = chrootmap[args.chroot] assert isinstance(session, (TarSessionChroot, DirectorySessionChroot)) uidmap = linuxnamespaces.IDAllocation.loadsubid("uid").allocatemap(65536) gidmap = linuxnamespaces.IDAllocation.loadsubid("gid").allocatemap(65536) linuxnamespaces.unshare_user_idmap( [uidmap, linuxnamespaces.IDMapping(65536, os.getuid(), 1)], [gidmap, linuxnamespaces.IDMapping(65536, os.getgid(), 1)], ) shutil.rmtree(session.path) def main() -> None: parser = argparse.ArgumentParser() group = parser.add_mutually_exclusive_group(required=True) for comm in ("info", "begin-session", "run-session", "end-session"): func = globals()["do_" + comm.replace("-", "_")] group.add_argument( f"-{comm[0]}", f"--{comm}", dest="subcommand", action="store_const", const=func, help=func.__doc__, ) parser.add_argument( "-c", "--chroot", dest="chroot", action="store", help="Use specified chroot", ) parser.add_argument("-d", "--directory", action="store") parser.add_argument("-p", "--preserve-environment", action="store_true") parser.add_argument("-q", "--quiet", action="store_true") parser.add_argument("-u", "--user", action="store", default=os.getlogin()) parser.add_argument("--isolate-network", action="store_true") parser.add_argument("command", nargs="*") args = parser.parse_args() assert args.subcommand is not None args.subcommand(args) if __name__ == "__main__": main()