diff options
Diffstat (limited to 'examples/unschroot.py')
-rwxr-xr-x | examples/unschroot.py | 353 |
1 files changed, 0 insertions, 353 deletions
diff --git a/examples/unschroot.py b/examples/unschroot.py deleted file mode 100755 index 3d1c900..0000000 --- a/examples/unschroot.py +++ /dev/null @@ -1,353 +0,0 @@ -#!/usr/bin/python3 -# Copyright 2024 Helmut Grohne <helmut@subdivi.de> -# SPDX-License-Identifier: GPL-3 - -"""Emulate schroot using namespaces sufficiently well that sbuild can deal with -it but not any better. It assumes that ~/.cache/sbuild contains tars suitable -for sbuild --chroot-mode=unshare. Additionally, those tars are expected to -contain the non-essential passwd package. The actual sessions are stored in -~/.cache/unschroot. For using it with sbuild, your sbuildrc should contain: - - $chroot_mode = "schroot"; - $schroot = "/path/to/unschroot"; -""" - - -import argparse -import functools -import grp -import itertools -import os -import pathlib -import pwd -import shutil -import signal -import socket -import stat -import sys -import tempfile -import typing - -if __file__.split("/")[-2:-1] == ["examples"]: - sys.path.insert(0, "/".join(__file__.split("/")[:-2])) - -import linuxnamespaces -import linuxnamespaces.tarutils - - -class TarFile( - linuxnamespaces.tarutils.ZstdTarFile, linuxnamespaces.tarutils.XAttrTarFile -): - pass - - -class Chroot: - # Ignore $HOME as sbuild sets to something invalid - home = pathlib.Path(pwd.getpwuid(os.getuid()).pw_dir) - cache_sbuild = home / ".cache/sbuild" - cache_unschroot = home / ".cache/unschroot" - - def __init__(self, path: pathlib.Path, aliases: set[str] | None = None): - self.path = path - self.aliases = set() if aliases is None else aliases - - @functools.cached_property - def namespace(self) -> str: - if self.path.is_file(): - return "Chroot" - if self.path.is_dir(): - return "Session" - raise ValueError("invalid chroot object") - - @functools.cached_property - def name(self) -> str: - suffix = "-sbuild" if self.namespace == "Chroot" else "" - return self.path.name.split(".", 1)[0] + suffix - - def infostr(self) -> str: - lines = [ - f"--- {self.namespace} ---", - f"Name {self.name}", - ] - if self.namespace == "Chroot": - lines.extend(["Type file", f"File {self.path}"]) - if self.namespace == "Session": - lines.extend( - [ - f"Location {self.path}", - "Session Purged true", - "Type unshare", - ] - ) - lines.append("Aliases " + " ".join(sorted(self.aliases))) - return "".join(map("%s\n".__mod__, lines)) - - @classmethod - def searchchroot(cls, name: str) -> "Chroot": - name = name.removeprefix("chroot:") - name = name.removesuffix("-sbuild") - for path in cls.cache_sbuild.iterdir(): - if path.name.startswith(name + ".t"): - return cls(path) - raise KeyError(name) - - @classmethod - def searchsession(cls, name: str) -> "Chroot": - name = name.removeprefix("session:") - path = cls.cache_unschroot / name - if not path.is_dir(): - raise KeyError(name) - return cls(path) - - @classmethod - def newsession(cls) -> "Chroot": - cls.cache_unschroot.mkdir(parents=True, exist_ok=True) - return Chroot( - pathlib.Path( - tempfile.mkdtemp(prefix="chroot", dir=cls.cache_unschroot) - ), - ) - - @classmethod - def scan_sbuild(cls) -> typing.Iterator["Chroot"]: - if cls.cache_sbuild.is_dir(): - chroots = [] - aliases: dict[str, set[str]] = {} - for path in cls.cache_sbuild.iterdir(): - if path.is_symlink(): - alias = path.name.split(".", 1)[0] + "-sbuild" - aliases.setdefault(str(path.readlink()), set()).add(alias) - elif path.is_file(): - chroots.append(path) - for path in chroots: - yield cls(path, aliases.get(path.name, set())) - - @classmethod - def scan_unschroot(cls) -> typing.Iterator["Chroot"]: - if cls.cache_unschroot.is_dir(): - yield from map(cls, cls.cache_unschroot.iterdir()) - - -def do_info(args: argparse.Namespace) -> None: - """Show information about selected chroots""" - chroots: typing.Iterable[Chroot] - if args.chroot: - try: - chroots = [Chroot.searchchroot(args.chroot)] - except KeyError: - chroots = [Chroot.searchsession(args.chroot)] - else: - chroots = itertools.chain( - Chroot.scan_sbuild(), Chroot.scan_unschroot() - ) - sys.stdout.write("\n".join(chroot.infostr() for chroot in chroots)) - - -def do_begin_session(args: argparse.Namespace) -> None: - """Begin a session; returns the session ID""" - source = Chroot.searchchroot(args.chroot) - session = Chroot.newsession() - uidmap = linuxnamespaces.IDAllocation.loadsubid("uid").allocatemap(65536) - gidmap = linuxnamespaces.IDAllocation.loadsubid("gid").allocatemap(65536) - mainsock, childsock = socket.socketpair() - with TarFile.open(source.path, "r:*") as tarf: - pid = os.fork() - if pid == 0: - mainsock.close() - os.chdir(session.path) - linuxnamespaces.unshare( - linuxnamespaces.CloneFlags.NEWUSER - | linuxnamespaces.CloneFlags.NEWNS, - ) - childsock.send(b"\0") - childsock.recv(1) - childsock.close() - os.setgid(0) - os.setuid(0) - for tmem in tarf: - if not tmem.name.startswith(("dev/", "./dev/")): - tarf.extract(tmem, numeric_owner=True) - etc_hosts = pathlib.Path("./etc/hosts") - if not etc_hosts.exists(): - etc_hosts.write_text( - """127.0.0.1 localhost -127.0.1.1 %s -::1 localhost ip6-localhost ip6-loopback -""" - % socket.gethostname(), - ) - sys.exit(0) - childsock.close() - mainsock.recv(1) - linuxnamespaces.newidmaps(pid, [uidmap], [gidmap]) - linuxnamespaces.unshare_user_idmap( - [uidmap, linuxnamespaces.IDMapping(65536, os.getuid(), 1)], - [gidmap, linuxnamespaces.IDMapping(65536, os.getgid(), 1)], - ) - os.chown(session.path, 0, 0) - session.path.chmod(0o755) - mainsock.send(b"\0") - mainsock.close() - _, ret = os.waitpid(pid, 0) - print(session.name) - sys.exit(ret) - - -def exec_perl_dumb_init(pid: int) -> typing.NoReturn: - """Roughly implement dumb-init in perl: Wait for all children until we - receive an exit from the given pid and forward its status. - """ - os.execlp( - "perl", - "perl", - "-e", - "$r=255<<8;" # exit 255 when we run out of children - "do{" - "$p=wait;" - f"$r=$?,$p=0 if $p=={pid};" - "}while($p>0);" - "exit(0<$r<256?128|$r:$r>>8);", # sig -> 128+sig; exit -> exit - ) - - -def do_run_session(args: argparse.Namespace) -> None: - """Run an existing session""" - session = Chroot.searchsession(args.chroot) - uidmap = linuxnamespaces.IDAllocation.loadsubid("uid").allocatemap(65536) - gidmap = linuxnamespaces.IDAllocation.loadsubid("gid").allocatemap(65536) - mainsock, childsock = socket.socketpair() - pid = os.fork() - pidfd: int - if pid == 0: - mainsock.close() - for fd in (1, 2): - if stat.S_ISFIFO(os.fstat(fd).st_mode): - os.fchmod(fd, 0o666) - os.chdir(session.path) - ns = ( - linuxnamespaces.CloneFlags.NEWUSER - | linuxnamespaces.CloneFlags.NEWNS - | linuxnamespaces.CloneFlags.NEWPID - ) - if args.isolate_network: - ns |= linuxnamespaces.CloneFlags.NEWNET - linuxnamespaces.unshare(ns) - childsock.send(b"\0") - childsock.recv(1) - if os.fork() != 0: - sys.exit(0) - assert os.getpid() == 1 - with linuxnamespaces.FileDescriptor(os.pidfd_open(1, 0)) as pidfd: - socket.send_fds(childsock, [b"\0"], [pidfd]) - os.setgid(0) - os.setuid(0) - linuxnamespaces.bind_mount(".", "/mnt", recursive=True) - os.chdir("/mnt") - linuxnamespaces.populate_sys("/", ".", ns, devices=True) - linuxnamespaces.populate_proc("/", ".", ns) - linuxnamespaces.populate_dev( - "/", ".", tun=bool(ns & linuxnamespaces.CloneFlags.NEWNET) - ) - linuxnamespaces.pivot_root(".", ".") - linuxnamespaces.umount(".", linuxnamespaces.UmountFlags.DETACH) - os.chdir("/") - if ns & linuxnamespaces.CloneFlags.NEWNET: - linuxnamespaces.enable_loopback_if() - if args.user.isdigit(): - spw = pwd.getpwuid(int(args.user)) - else: - spw = pwd.getpwnam(args.user) - supplementary = [ - sgr.gr_gid for sgr in grp.getgrall() if spw.pw_name in sgr.gr_mem - ] - - childsock.recv(1) - childsock.close() - rfd, wfd = linuxnamespaces.FileDescriptor.pipe(inheritable=False) - pid = os.fork() - if pid == 0: - wfd.close() - if args.directory: - os.chdir(args.directory) - os.setgroups(supplementary) - os.setgid(spw.pw_gid) - os.setuid(spw.pw_uid) - if "PATH" not in os.environ: - if spw.pw_uid == 0: - os.environ["PATH"] = "/usr/sbin:/sbin:/usr/bin:/bin" - else: - os.environ["PATH"] = "/usr/bin:/bin" - if not args.command: - args.command.append("bash") - # Wait until Python has handed off to Perl. - os.read(rfd, 1) - os.execvp(args.command[0], args.command) - else: - rfd.close() - linuxnamespaces.prctl_set_pdeathsig(signal.SIGKILL) - os.close(0) - # It is important that we now exec to get rid of our previous - # execution context that carries pieces such as memory maps from - # different namespaces that could allow escalating privileges. The - # exec will close wfd and allow the target process to exec. - exec_perl_dumb_init(pid) - childsock.close() - mainsock.recv(1) - linuxnamespaces.newidmaps(pid, [uidmap], [gidmap]) - linuxnamespaces.prctl_set_child_subreaper(True) - mainsock.send(b"\0") - _data, fds, _flags, _address = socket.recv_fds(mainsock, 1, 1) - pidfd = fds[0] - os.waitpid(pid, 0) - linuxnamespaces.prctl_set_child_subreaper(False) - mainsock.send(b"\0") - wres = os.waitid(os.P_PIDFD, pidfd, os.WEXITED) - assert wres is not None - sys.exit(wres.si_status) - - -def do_end_session(args: argparse.Namespace) -> None: - """End an existing session""" - session = Chroot.searchsession(args.chroot) - uidmap = linuxnamespaces.IDAllocation.loadsubid("uid").allocatemap(65536) - gidmap = linuxnamespaces.IDAllocation.loadsubid("gid").allocatemap(65536) - linuxnamespaces.unshare_user_idmap( - [uidmap, linuxnamespaces.IDMapping(65536, os.getuid(), 1)], - [gidmap, linuxnamespaces.IDMapping(65536, os.getgid(), 1)], - ) - shutil.rmtree(session.path) - - -def main() -> None: - parser = argparse.ArgumentParser() - group = parser.add_mutually_exclusive_group(required=True) - for comm in ("info", "begin-session", "run-session", "end-session"): - func = globals()["do_" + comm.replace("-", "_")] - group.add_argument( - f"-{comm[0]}", - f"--{comm}", - dest="subcommand", - action="store_const", - const=func, - help=func.__doc__, - ) - parser.add_argument( - "-c", - "--chroot", - dest="chroot", - action="store", - help="Use specified chroot", - ) - parser.add_argument("-d", "--directory", action="store") - parser.add_argument("-p", "--preserve-environment", action="store_true") - parser.add_argument("-q", "--quiet", action="store_true") - parser.add_argument("-u", "--user", action="store", default=os.getlogin()) - parser.add_argument("--isolate-network", action="store_true") - parser.add_argument("command", nargs="*") - args = parser.parse_args() - assert args.subcommand is not None - args.subcommand(args) - - -if __name__ == "__main__": - main() |