summaryrefslogtreecommitdiff
path: root/examples/unschroot.py
diff options
context:
space:
mode:
Diffstat (limited to 'examples/unschroot.py')
-rwxr-xr-xexamples/unschroot.py353
1 files changed, 0 insertions, 353 deletions
diff --git a/examples/unschroot.py b/examples/unschroot.py
deleted file mode 100755
index 3d1c900..0000000
--- a/examples/unschroot.py
+++ /dev/null
@@ -1,353 +0,0 @@
-#!/usr/bin/python3
-# Copyright 2024 Helmut Grohne <helmut@subdivi.de>
-# SPDX-License-Identifier: GPL-3
-
-"""Emulate schroot using namespaces sufficiently well that sbuild can deal with
-it but not any better. It assumes that ~/.cache/sbuild contains tars suitable
-for sbuild --chroot-mode=unshare. Additionally, those tars are expected to
-contain the non-essential passwd package. The actual sessions are stored in
-~/.cache/unschroot. For using it with sbuild, your sbuildrc should contain:
-
- $chroot_mode = "schroot";
- $schroot = "/path/to/unschroot";
-"""
-
-
-import argparse
-import functools
-import grp
-import itertools
-import os
-import pathlib
-import pwd
-import shutil
-import signal
-import socket
-import stat
-import sys
-import tempfile
-import typing
-
-if __file__.split("/")[-2:-1] == ["examples"]:
- sys.path.insert(0, "/".join(__file__.split("/")[:-2]))
-
-import linuxnamespaces
-import linuxnamespaces.tarutils
-
-
-class TarFile(
- linuxnamespaces.tarutils.ZstdTarFile, linuxnamespaces.tarutils.XAttrTarFile
-):
- pass
-
-
-class Chroot:
- # Ignore $HOME as sbuild sets to something invalid
- home = pathlib.Path(pwd.getpwuid(os.getuid()).pw_dir)
- cache_sbuild = home / ".cache/sbuild"
- cache_unschroot = home / ".cache/unschroot"
-
- def __init__(self, path: pathlib.Path, aliases: set[str] | None = None):
- self.path = path
- self.aliases = set() if aliases is None else aliases
-
- @functools.cached_property
- def namespace(self) -> str:
- if self.path.is_file():
- return "Chroot"
- if self.path.is_dir():
- return "Session"
- raise ValueError("invalid chroot object")
-
- @functools.cached_property
- def name(self) -> str:
- suffix = "-sbuild" if self.namespace == "Chroot" else ""
- return self.path.name.split(".", 1)[0] + suffix
-
- def infostr(self) -> str:
- lines = [
- f"--- {self.namespace} ---",
- f"Name {self.name}",
- ]
- if self.namespace == "Chroot":
- lines.extend(["Type file", f"File {self.path}"])
- if self.namespace == "Session":
- lines.extend(
- [
- f"Location {self.path}",
- "Session Purged true",
- "Type unshare",
- ]
- )
- lines.append("Aliases " + " ".join(sorted(self.aliases)))
- return "".join(map("%s\n".__mod__, lines))
-
- @classmethod
- def searchchroot(cls, name: str) -> "Chroot":
- name = name.removeprefix("chroot:")
- name = name.removesuffix("-sbuild")
- for path in cls.cache_sbuild.iterdir():
- if path.name.startswith(name + ".t"):
- return cls(path)
- raise KeyError(name)
-
- @classmethod
- def searchsession(cls, name: str) -> "Chroot":
- name = name.removeprefix("session:")
- path = cls.cache_unschroot / name
- if not path.is_dir():
- raise KeyError(name)
- return cls(path)
-
- @classmethod
- def newsession(cls) -> "Chroot":
- cls.cache_unschroot.mkdir(parents=True, exist_ok=True)
- return Chroot(
- pathlib.Path(
- tempfile.mkdtemp(prefix="chroot", dir=cls.cache_unschroot)
- ),
- )
-
- @classmethod
- def scan_sbuild(cls) -> typing.Iterator["Chroot"]:
- if cls.cache_sbuild.is_dir():
- chroots = []
- aliases: dict[str, set[str]] = {}
- for path in cls.cache_sbuild.iterdir():
- if path.is_symlink():
- alias = path.name.split(".", 1)[0] + "-sbuild"
- aliases.setdefault(str(path.readlink()), set()).add(alias)
- elif path.is_file():
- chroots.append(path)
- for path in chroots:
- yield cls(path, aliases.get(path.name, set()))
-
- @classmethod
- def scan_unschroot(cls) -> typing.Iterator["Chroot"]:
- if cls.cache_unschroot.is_dir():
- yield from map(cls, cls.cache_unschroot.iterdir())
-
-
-def do_info(args: argparse.Namespace) -> None:
- """Show information about selected chroots"""
- chroots: typing.Iterable[Chroot]
- if args.chroot:
- try:
- chroots = [Chroot.searchchroot(args.chroot)]
- except KeyError:
- chroots = [Chroot.searchsession(args.chroot)]
- else:
- chroots = itertools.chain(
- Chroot.scan_sbuild(), Chroot.scan_unschroot()
- )
- sys.stdout.write("\n".join(chroot.infostr() for chroot in chroots))
-
-
-def do_begin_session(args: argparse.Namespace) -> None:
- """Begin a session; returns the session ID"""
- source = Chroot.searchchroot(args.chroot)
- session = Chroot.newsession()
- uidmap = linuxnamespaces.IDAllocation.loadsubid("uid").allocatemap(65536)
- gidmap = linuxnamespaces.IDAllocation.loadsubid("gid").allocatemap(65536)
- mainsock, childsock = socket.socketpair()
- with TarFile.open(source.path, "r:*") as tarf:
- pid = os.fork()
- if pid == 0:
- mainsock.close()
- os.chdir(session.path)
- linuxnamespaces.unshare(
- linuxnamespaces.CloneFlags.NEWUSER
- | linuxnamespaces.CloneFlags.NEWNS,
- )
- childsock.send(b"\0")
- childsock.recv(1)
- childsock.close()
- os.setgid(0)
- os.setuid(0)
- for tmem in tarf:
- if not tmem.name.startswith(("dev/", "./dev/")):
- tarf.extract(tmem, numeric_owner=True)
- etc_hosts = pathlib.Path("./etc/hosts")
- if not etc_hosts.exists():
- etc_hosts.write_text(
- """127.0.0.1 localhost
-127.0.1.1 %s
-::1 localhost ip6-localhost ip6-loopback
-"""
- % socket.gethostname(),
- )
- sys.exit(0)
- childsock.close()
- mainsock.recv(1)
- linuxnamespaces.newidmaps(pid, [uidmap], [gidmap])
- linuxnamespaces.unshare_user_idmap(
- [uidmap, linuxnamespaces.IDMapping(65536, os.getuid(), 1)],
- [gidmap, linuxnamespaces.IDMapping(65536, os.getgid(), 1)],
- )
- os.chown(session.path, 0, 0)
- session.path.chmod(0o755)
- mainsock.send(b"\0")
- mainsock.close()
- _, ret = os.waitpid(pid, 0)
- print(session.name)
- sys.exit(ret)
-
-
-def exec_perl_dumb_init(pid: int) -> typing.NoReturn:
- """Roughly implement dumb-init in perl: Wait for all children until we
- receive an exit from the given pid and forward its status.
- """
- os.execlp(
- "perl",
- "perl",
- "-e",
- "$r=255<<8;" # exit 255 when we run out of children
- "do{"
- "$p=wait;"
- f"$r=$?,$p=0 if $p=={pid};"
- "}while($p>0);"
- "exit(0<$r<256?128|$r:$r>>8);", # sig -> 128+sig; exit -> exit
- )
-
-
-def do_run_session(args: argparse.Namespace) -> None:
- """Run an existing session"""
- session = Chroot.searchsession(args.chroot)
- uidmap = linuxnamespaces.IDAllocation.loadsubid("uid").allocatemap(65536)
- gidmap = linuxnamespaces.IDAllocation.loadsubid("gid").allocatemap(65536)
- mainsock, childsock = socket.socketpair()
- pid = os.fork()
- pidfd: int
- if pid == 0:
- mainsock.close()
- for fd in (1, 2):
- if stat.S_ISFIFO(os.fstat(fd).st_mode):
- os.fchmod(fd, 0o666)
- os.chdir(session.path)
- ns = (
- linuxnamespaces.CloneFlags.NEWUSER
- | linuxnamespaces.CloneFlags.NEWNS
- | linuxnamespaces.CloneFlags.NEWPID
- )
- if args.isolate_network:
- ns |= linuxnamespaces.CloneFlags.NEWNET
- linuxnamespaces.unshare(ns)
- childsock.send(b"\0")
- childsock.recv(1)
- if os.fork() != 0:
- sys.exit(0)
- assert os.getpid() == 1
- with linuxnamespaces.FileDescriptor(os.pidfd_open(1, 0)) as pidfd:
- socket.send_fds(childsock, [b"\0"], [pidfd])
- os.setgid(0)
- os.setuid(0)
- linuxnamespaces.bind_mount(".", "/mnt", recursive=True)
- os.chdir("/mnt")
- linuxnamespaces.populate_sys("/", ".", ns, devices=True)
- linuxnamespaces.populate_proc("/", ".", ns)
- linuxnamespaces.populate_dev(
- "/", ".", tun=bool(ns & linuxnamespaces.CloneFlags.NEWNET)
- )
- linuxnamespaces.pivot_root(".", ".")
- linuxnamespaces.umount(".", linuxnamespaces.UmountFlags.DETACH)
- os.chdir("/")
- if ns & linuxnamespaces.CloneFlags.NEWNET:
- linuxnamespaces.enable_loopback_if()
- if args.user.isdigit():
- spw = pwd.getpwuid(int(args.user))
- else:
- spw = pwd.getpwnam(args.user)
- supplementary = [
- sgr.gr_gid for sgr in grp.getgrall() if spw.pw_name in sgr.gr_mem
- ]
-
- childsock.recv(1)
- childsock.close()
- rfd, wfd = linuxnamespaces.FileDescriptor.pipe(inheritable=False)
- pid = os.fork()
- if pid == 0:
- wfd.close()
- if args.directory:
- os.chdir(args.directory)
- os.setgroups(supplementary)
- os.setgid(spw.pw_gid)
- os.setuid(spw.pw_uid)
- if "PATH" not in os.environ:
- if spw.pw_uid == 0:
- os.environ["PATH"] = "/usr/sbin:/sbin:/usr/bin:/bin"
- else:
- os.environ["PATH"] = "/usr/bin:/bin"
- if not args.command:
- args.command.append("bash")
- # Wait until Python has handed off to Perl.
- os.read(rfd, 1)
- os.execvp(args.command[0], args.command)
- else:
- rfd.close()
- linuxnamespaces.prctl_set_pdeathsig(signal.SIGKILL)
- os.close(0)
- # It is important that we now exec to get rid of our previous
- # execution context that carries pieces such as memory maps from
- # different namespaces that could allow escalating privileges. The
- # exec will close wfd and allow the target process to exec.
- exec_perl_dumb_init(pid)
- childsock.close()
- mainsock.recv(1)
- linuxnamespaces.newidmaps(pid, [uidmap], [gidmap])
- linuxnamespaces.prctl_set_child_subreaper(True)
- mainsock.send(b"\0")
- _data, fds, _flags, _address = socket.recv_fds(mainsock, 1, 1)
- pidfd = fds[0]
- os.waitpid(pid, 0)
- linuxnamespaces.prctl_set_child_subreaper(False)
- mainsock.send(b"\0")
- wres = os.waitid(os.P_PIDFD, pidfd, os.WEXITED)
- assert wres is not None
- sys.exit(wres.si_status)
-
-
-def do_end_session(args: argparse.Namespace) -> None:
- """End an existing session"""
- session = Chroot.searchsession(args.chroot)
- uidmap = linuxnamespaces.IDAllocation.loadsubid("uid").allocatemap(65536)
- gidmap = linuxnamespaces.IDAllocation.loadsubid("gid").allocatemap(65536)
- linuxnamespaces.unshare_user_idmap(
- [uidmap, linuxnamespaces.IDMapping(65536, os.getuid(), 1)],
- [gidmap, linuxnamespaces.IDMapping(65536, os.getgid(), 1)],
- )
- shutil.rmtree(session.path)
-
-
-def main() -> None:
- parser = argparse.ArgumentParser()
- group = parser.add_mutually_exclusive_group(required=True)
- for comm in ("info", "begin-session", "run-session", "end-session"):
- func = globals()["do_" + comm.replace("-", "_")]
- group.add_argument(
- f"-{comm[0]}",
- f"--{comm}",
- dest="subcommand",
- action="store_const",
- const=func,
- help=func.__doc__,
- )
- parser.add_argument(
- "-c",
- "--chroot",
- dest="chroot",
- action="store",
- help="Use specified chroot",
- )
- parser.add_argument("-d", "--directory", action="store")
- parser.add_argument("-p", "--preserve-environment", action="store_true")
- parser.add_argument("-q", "--quiet", action="store_true")
- parser.add_argument("-u", "--user", action="store", default=os.getlogin())
- parser.add_argument("--isolate-network", action="store_true")
- parser.add_argument("command", nargs="*")
- args = parser.parse_args()
- assert args.subcommand is not None
- args.subcommand(args)
-
-
-if __name__ == "__main__":
- main()