summaryrefslogtreecommitdiff
path: root/examples/unschroot.py
diff options
context:
space:
mode:
Diffstat (limited to 'examples/unschroot.py')
-rwxr-xr-xexamples/unschroot.py483
1 files changed, 0 insertions, 483 deletions
diff --git a/examples/unschroot.py b/examples/unschroot.py
deleted file mode 100755
index 43c4ea3..0000000
--- a/examples/unschroot.py
+++ /dev/null
@@ -1,483 +0,0 @@
-#!/usr/bin/python3
-# Copyright 2024 Helmut Grohne <helmut@subdivi.de>
-# SPDX-License-Identifier: GPL-3
-
-"""Emulate schroot using namespaces sufficiently well that sbuild can deal with
-it but not any better. It assumes that ~/.cache/sbuild contains tars suitable
-for sbuild --chroot-mode=unshare. Additionally, those tars are expected to
-contain the non-essential passwd package. The actual sessions are stored in
-~/.cache/unschroot. For using it with sbuild, your sbuildrc should contain:
-
- $chroot_mode = "schroot";
- $schroot = "/path/to/unschroot";
-"""
-
-
-import argparse
-import grp
-import os
-import pathlib
-import pwd
-import shutil
-import signal
-import socket
-import stat
-import sys
-import tempfile
-import typing
-
-if __file__.split("/")[-2:-1] == ["examples"]:
- sys.path.insert(0, "/".join(__file__.split("/")[:-2]))
-
-import linuxnamespaces
-import linuxnamespaces.tarutils
-
-
-class TarFile(
- linuxnamespaces.tarutils.ZstdTarFile, linuxnamespaces.tarutils.XAttrTarFile
-):
- pass
-
-
-def write_etc_hosts(root: os.PathLike[str] | str) -> None:
- etc_hosts = pathlib.Path(root) / "etc/hosts"
- if not etc_hosts.exists():
- etc_hosts.write_text(
- """127.0.0.1 localhost
-127.0.1.1 %s
-::1 localhost ip6-localhost ip6-loopback
-"""
- % socket.gethostname(),
- encoding="ascii",
- )
-
-
-def load_subids() -> (
- tuple[linuxnamespaces.IDMapping, linuxnamespaces.IDMapping]
-):
- return (
- linuxnamespaces.IDAllocation.loadsubid("uid").allocatemap(65536),
- linuxnamespaces.IDAllocation.loadsubid("gid").allocatemap(65536),
- )
-
-
-# Ignore $HOME as sbuild sets to something invalid
-HOME = pathlib.Path(pwd.getpwuid(os.getuid()).pw_dir)
-CACHE_SBUILD = HOME / ".cache/sbuild"
-CACHE_UNSCHROOT = HOME / ".cache/unschroot"
-CACHE_DIRECTORY_CHROOTS = HOME / ".cache/directory_chroots"
-
-
-class ChrootBase:
- namespace: str
- name: str
-
- def __init__(self) -> None:
- self.aliases: set[str] = set()
-
- def infodata(self) -> dict[str, str]:
- return {
- "Name": self.name,
- "Aliases": " ".join(sorted(self.aliases)),
- }
-
- def infostr(self) -> str:
- return f"--- {self.namespace} ---\n" + "".join(
- map("%s %s\n".__mod__, self.infodata().items())
- )
-
-
-class SourceChroot(ChrootBase):
- namespace = "Chroot"
-
- def newsession(self) -> "SessionChroot":
- raise NotImplementedError
-
-
-class SessionChroot(ChrootBase):
- namespace = "Session"
-
- def infodata(self) -> dict[str, str]:
- data = super().infodata()
- data["Session Purged"] = "true"
- data["Type"] = "unshare"
- return data
-
- def mount(self) -> pathlib.Path:
- raise NotImplementedError
-
-
-class TarSourceChroot(SourceChroot):
- def __init__(self, path: pathlib.Path):
- super().__init__()
- self.path = path
- self.name = path.name.split(".", 1)[0] + "-sbuild"
-
- def infodata(self) -> dict[str, str]:
- data = super().infodata()
- data["Type"] = "file"
- data["File"] = str(self.path)
- return data
-
- def newsession(self) -> "TarSessionChroot":
- CACHE_UNSCHROOT.mkdir(parents=True, exist_ok=True)
- session = TarSessionChroot(
- pathlib.Path(tempfile.mkdtemp(prefix="tar-", dir=CACHE_UNSCHROOT)),
- )
-
- uidmap, gidmap = load_subids()
- mainsock, childsock = socket.socketpair()
- with TarFile.open(self.path, "r:*") as tarf:
- pid = os.fork()
- if pid == 0:
- mainsock.close()
- os.chdir(session.path)
- linuxnamespaces.unshare(
- linuxnamespaces.CloneFlags.NEWUSER
- | linuxnamespaces.CloneFlags.NEWNS,
- )
- childsock.send(b"\0")
- childsock.recv(1)
- childsock.close()
- os.setgid(0)
- os.setuid(0)
- for tmem in tarf:
- if not tmem.name.startswith(("dev/", "./dev/")):
- tarf.extract(tmem, numeric_owner=True)
- write_etc_hosts(".")
- sys.exit(0)
- childsock.close()
- mainsock.recv(1)
- pid2 = os.fork()
- if pid2 == 0:
- linuxnamespaces.unshare_user_idmap(
- [uidmap, linuxnamespaces.IDMapping(65536, os.getuid(), 1)],
- [gidmap, linuxnamespaces.IDMapping(65536, os.getgid(), 1)],
- )
- os.chown(session.path, 0, 0)
- session.path.chmod(0o755)
- sys.exit(0)
- linuxnamespaces.newidmaps(pid, [uidmap], [gidmap])
- _, ret = os.waitpid(pid2, 0)
- assert ret == 0
- mainsock.send(b"\0")
- mainsock.close()
- _, ret = os.waitpid(pid, 0)
- assert ret == 0
- return session
-
-
-class TarSessionChroot(SessionChroot):
- def __init__(self, path: pathlib.Path):
- super().__init__()
- self.path = path
- self.name = path.name
-
- def mount(self) -> pathlib.Path:
- linuxnamespaces.bind_mount(self.path, "/mnt", recursive=True)
- return pathlib.Path("/mnt")
-
-
-class DirectorySourceChroot(SourceChroot):
- def __init__(self, path: pathlib.Path):
- super().__init__()
- self.path = path
- self.name = path.name + "-sbuild"
-
- def infodata(self) -> dict[str, str]:
- data = super().infodata()
- data["Type"] = "directory"
- data["Directory"] = str(self.path)
- return data
-
- def newsession(self) -> "DirectorySessionChroot":
- CACHE_UNSCHROOT.mkdir(parents=True, exist_ok=True)
- path = pathlib.Path(
- tempfile.mkdtemp(
- prefix=f"overlay-{self.name}-", dir=CACHE_UNSCHROOT
- ),
- )
- session = DirectorySessionChroot(self, path)
- uidmap, gidmap = load_subids()
- pid = os.fork()
- if pid == 0:
- linuxnamespaces.unshare_user_idmap(
- [uidmap, linuxnamespaces.IDMapping(65536, os.getuid(), 1)],
- [gidmap, linuxnamespaces.IDMapping(65536, os.getgid(), 1)],
- )
- os.setgid(0)
- os.setuid(0)
- os.chown(path, 0, 0)
- path.chmod(0o755)
- (path / "upper").mkdir()
- (path / "work").mkdir()
- if not (self.path / "etc/hosts").exists():
- (path / "upper/etc").mkdir()
- write_etc_hosts(path / "upper")
- sys.exit(0)
- _, ret = os.waitpid(pid, 0)
- assert ret == 0
- return session
-
-
-class DirectorySessionChroot(SessionChroot):
- def __init__(self, source: DirectorySourceChroot, path: pathlib.Path):
- super().__init__()
- self.source = source
- self.path = path
- self.name = path.name
-
- def infodata(self) -> dict[str, str]:
- data = super().infodata()
- data["Type"] = "directory"
- data["Directory"] = str(self.source.path)
- # It's a gross lie, but sbuild does not work without. It has to
- # actually exist and should not occur inside build logs.
- data["Location"] = str(self.source.path)
- return data
-
- def mount(self) -> pathlib.Path:
- mnt = "/mnt"
- linuxnamespaces.mount(
- "overlay",
- mnt,
- "overlay",
- data={
- "lowerdir": str(self.source.path),
- "upperdir": str(self.path / "upper"),
- "workdir": str(self.path / "work"),
- "userxattr": None,
- },
- )
- return pathlib.Path(mnt)
-
-
-def scan_chroots() -> dict[str, ChrootBase]:
- chrootmap: dict[str, ChrootBase] = {}
- chroot: ChrootBase
- for loc, cls in (
- (CACHE_SBUILD, TarSourceChroot),
- (CACHE_DIRECTORY_CHROOTS, DirectorySourceChroot),
- ):
- if loc.is_dir():
- chroots = []
- aliases: dict[str, set[str]] = {}
- for path in loc.iterdir():
- if path.is_symlink():
- alias = path.name.split(".", 1)[0] + "-sbuild"
- aliases.setdefault(str(path.readlink()), set()).add(alias)
- else:
- chroots.append(path)
- for path in chroots:
- chroot = cls(path)
- chrootaliases = aliases.get(path.name, set())
- chroot.aliases.update(chrootaliases)
- if chroot.name not in chrootmap:
- chrootmap[chroot.name] = chroot
- for alias in chrootaliases:
- if alias not in chrootmap:
- chrootmap[alias] = chroot
-
- if CACHE_UNSCHROOT.is_dir():
- for path in CACHE_UNSCHROOT.iterdir():
- if path.name.startswith("tar-"):
- chroot = TarSessionChroot(path)
- if chroot.name not in chrootmap:
- chrootmap[chroot.name] = chroot
- elif path.name.startswith("overlay-"):
- base = "-".join(path.name.split("-")[1:-1])
- if base not in chrootmap:
- continue
- source = chrootmap[base]
- assert isinstance(source, DirectorySourceChroot)
- chroot = DirectorySessionChroot(source, path)
- if chroot.name not in chrootmap:
- chrootmap[chroot.name] = chroot
-
- return chrootmap
-
-
-def do_info(args: argparse.Namespace) -> None:
- """Show information about selected chroots"""
- chrootmap = scan_chroots()
- chroots: typing.Iterable[ChrootBase]
- if args.chroot:
- chroots = [
- chrootmap[
- args.chroot.removeprefix("chroot:").removeprefix("session:")
- ],
- ]
- else:
- chroots = chrootmap.values()
- sys.stdout.write("\n".join(chroot.infostr() for chroot in chroots))
-
-
-def do_begin_session(args: argparse.Namespace) -> None:
- """Begin a session; returns the session ID"""
- chrootmap = scan_chroots()
- source = chrootmap[args.chroot.removeprefix("chroot:")]
- assert isinstance(source, SourceChroot)
- session = source.newsession()
- print(session.name)
-
-
-def exec_perl_dumb_init(pid: int) -> typing.NoReturn:
- """Roughly implement dumb-init in perl: Wait for all children until we
- receive an exit from the given pid and forward its status.
- """
- os.execlp(
- "perl",
- "perl",
- "-e",
- "$r=255<<8;" # exit 255 when we run out of children
- "do{"
- "$p=wait;"
- f"$r=$?,$p=0 if $p=={pid};"
- "}while($p>0);"
- "exit(0<$r<256?128|$r:$r>>8);", # sig -> 128+sig; exit -> exit
- )
-
-
-def do_run_session(args: argparse.Namespace) -> None:
- """Run an existing session"""
- chrootmap = scan_chroots()
- session = chrootmap[args.chroot]
- assert isinstance(session, SessionChroot)
- uidmap, gidmap = load_subids()
- mainsock, childsock = socket.socketpair()
- pid = os.fork()
- pidfd: int
- if pid == 0:
- mainsock.close()
- for fd in (1, 2):
- if stat.S_ISFIFO(os.fstat(fd).st_mode):
- os.fchmod(fd, 0o666)
- ns = (
- linuxnamespaces.CloneFlags.NEWUSER
- | linuxnamespaces.CloneFlags.NEWNS
- | linuxnamespaces.CloneFlags.NEWPID
- )
- if args.isolate_network:
- ns |= linuxnamespaces.CloneFlags.NEWNET
- linuxnamespaces.unshare(ns)
- childsock.send(b"\0")
- childsock.recv(1)
- if os.fork() != 0:
- sys.exit(0)
- assert os.getpid() == 1
- with linuxnamespaces.FileDescriptor(os.pidfd_open(1, 0)) as pidfd:
- socket.send_fds(childsock, [b"\0"], [pidfd])
- os.setgid(0)
- os.setuid(0)
- root = session.mount()
- os.chdir(root)
- linuxnamespaces.populate_sys("/", ".", ns, devices=True)
- linuxnamespaces.populate_proc("/", ".", ns)
- linuxnamespaces.populate_dev(
- "/", ".", tun=bool(ns & linuxnamespaces.CloneFlags.NEWNET)
- )
- linuxnamespaces.pivot_root(".", ".")
- linuxnamespaces.umount(".", linuxnamespaces.UmountFlags.DETACH)
- os.chdir("/")
- if ns & linuxnamespaces.CloneFlags.NEWNET:
- linuxnamespaces.enable_loopback_if()
- if args.user.isdigit():
- spw = pwd.getpwuid(int(args.user))
- else:
- spw = pwd.getpwnam(args.user)
- supplementary = [
- sgr.gr_gid for sgr in grp.getgrall() if spw.pw_name in sgr.gr_mem
- ]
-
- childsock.recv(1)
- childsock.close()
- rfd, wfd = linuxnamespaces.FileDescriptor.pipe(inheritable=False)
- pid = os.fork()
- if pid == 0:
- wfd.close()
- if args.directory:
- os.chdir(args.directory)
- os.setgroups(supplementary)
- os.setgid(spw.pw_gid)
- os.setuid(spw.pw_uid)
- if "PATH" not in os.environ:
- if spw.pw_uid == 0:
- os.environ["PATH"] = "/usr/sbin:/sbin:/usr/bin:/bin"
- else:
- os.environ["PATH"] = "/usr/bin:/bin"
- if not args.command:
- args.command.append("bash")
- # Wait until Python has handed off to Perl.
- os.read(rfd, 1)
- os.execvp(args.command[0], args.command)
- else:
- rfd.close()
- linuxnamespaces.prctl_set_pdeathsig(signal.SIGKILL)
- os.close(0)
- # It is important that we now exec to get rid of our previous
- # execution context that carries pieces such as memory maps from
- # different namespaces that could allow escalating privileges. The
- # exec will close wfd and allow the target process to exec.
- exec_perl_dumb_init(pid)
- childsock.close()
- mainsock.recv(1)
- linuxnamespaces.newidmaps(pid, [uidmap], [gidmap])
- linuxnamespaces.prctl_set_child_subreaper(True)
- mainsock.send(b"\0")
- _data, fds, _flags, _address = socket.recv_fds(mainsock, 1, 1)
- pidfd = fds[0]
- os.waitpid(pid, 0)
- linuxnamespaces.prctl_set_child_subreaper(False)
- mainsock.send(b"\0")
- wres = os.waitid(os.P_PIDFD, pidfd, os.WEXITED)
- assert wres is not None
- sys.exit(wres.si_status)
-
-
-def do_end_session(args: argparse.Namespace) -> None:
- """End an existing session"""
- chrootmap = scan_chroots()
- session = chrootmap[args.chroot]
- assert isinstance(session, (TarSessionChroot, DirectorySessionChroot))
- uidmap = linuxnamespaces.IDAllocation.loadsubid("uid").allocatemap(65536)
- gidmap = linuxnamespaces.IDAllocation.loadsubid("gid").allocatemap(65536)
- linuxnamespaces.unshare_user_idmap(
- [uidmap, linuxnamespaces.IDMapping(65536, os.getuid(), 1)],
- [gidmap, linuxnamespaces.IDMapping(65536, os.getgid(), 1)],
- )
- shutil.rmtree(session.path)
-
-
-def main() -> None:
- parser = argparse.ArgumentParser()
- group = parser.add_mutually_exclusive_group(required=True)
- for comm in ("info", "begin-session", "run-session", "end-session"):
- func = globals()["do_" + comm.replace("-", "_")]
- group.add_argument(
- f"-{comm[0]}",
- f"--{comm}",
- dest="subcommand",
- action="store_const",
- const=func,
- help=func.__doc__,
- )
- parser.add_argument(
- "-c",
- "--chroot",
- dest="chroot",
- action="store",
- help="Use specified chroot",
- )
- parser.add_argument("-d", "--directory", action="store")
- parser.add_argument("-p", "--preserve-environment", action="store_true")
- parser.add_argument("-q", "--quiet", action="store_true")
- parser.add_argument("-u", "--user", action="store", default=os.getlogin())
- parser.add_argument("--isolate-network", action="store_true")
- parser.add_argument("command", nargs="*")
- args = parser.parse_args()
- assert args.subcommand is not None
- args.subcommand(args)
-
-
-if __name__ == "__main__":
- main()