#!/usr/bin/python3 # Copyright 2025 Helmut Grohne # SPDX-License-Identifier: GPL-3 """Emulate schroot using namespaces and a background session process sufficiently well that sbuild can deal with it but not any better. For using it with sbuild, your sbuildrc should contain: $chroot_mode = "schroot"; $schroot = "/path/to/unschroot"; It may automatically discover chroots from ~/.cache/sbuild in the layout that sbuild's unshare mode consumes, but you may also create a ~/.config/unschroot.ini to apply more detailed configuration. State and sessions are retained via a background process and namespaces as well as a varlink socket below $RUNTIME_DIR/unschroot. """ import argparse import asyncio import collections.abc import configparser import contextlib import errno import functools import itertools import os import pathlib import pwd import signal import socket import stat import sys import tempfile import typing import uuid import platformdirs if __file__.split("/")[-2:-1] == ["examples"]: sys.path.insert(0, "/".join(__file__.split("/")[:-2])) import asyncvarlink import asyncvarlink.serviceinterface import linuxnamespaces import linuxnamespaces.tarutils class TarFile( linuxnamespaces.tarutils.ZstdTarFile, linuxnamespaces.tarutils.XAttrTarFile ): """A TarFile subclass that handles both zstd compressed archives and extended attributes. """ # Ignore $HOME as sbuild sets to something invalid HOME = pathlib.Path(pwd.getpwuid(os.getuid()).pw_dir) UNSCHROOT_CONFIG = HOME / ".config/unschroot.ini" CACHE_SBUILD = HOME / ".cache/sbuild" CACHE_DIRECTORY_CHROOTS = HOME / ".cache/directory_chroots" _P = typing.ParamSpec("_P") _T = typing.TypeVar("_T") def async_as_sync( func: typing.Callable[_P, typing.Coroutine[typing.Any, typing.Any, _T]] ) -> typing.Callable[_P, _T]: """Turn an async function into a sync one by running it its own loop.""" @functools.wraps(func) def wrapped(*args: _P.args, **kwargs: _P.kwargs) -> _T: return asyncio.run(func(*args, **kwargs)) return wrapped def run_here(func: typing.Callable[[], _T]) -> _T: """A decorator to run the given function right where it is defined once. It replaces the functionwith its call result. """ return func() def runtime_path() -> pathlib.Path: """Return the location where IPC sockets are to be stored.""" return platformdirs.user_runtime_path("unschroot") def path_below(path: str) -> pathlib.Path: """Take a relative or absolute path, anchor it in / and return a relative version of it. """ parts: list[str] = [] for part in pathlib.PurePath("/", path).relative_to("/").parts: if part == ".." and parts: parts.pop() else: parts.append(part) return pathlib.Path(*parts) class ChrootBase: """Base class for chroots from a schroot way of looking at them.""" namespace: str def __init__(self, name: str): self.name = name def aliases(self) -> set[str]: """Return the set of alternative names recorded for this chroot.""" raise NotImplementedError def infodata(self) -> dict[str, str]: """Return a mapping with information needed for schroot -i.""" return { "Name": self.name, "Aliases": " ".join(sorted(self.aliases())), } def infostr(self) -> str: """Construct the schroot -i output for this chroot.""" return f"--- {self.namespace} ---\n" + "".join( map("%s %s\n".__mod__, self.infodata().items()) ) class SourceChroot(ChrootBase): """Represent a schroot source chroot to be instantiated into a session.""" namespace = "Chroot" def __init__(self, name: str, config: collections.abc.Mapping[str, str]): super().__init__(name) self.config = config def aliases(self) -> set[str]: try: aliasstr = self.config["aliases"] except KeyError: return set() return set(map(str.strip, aliasstr.split(","))) def infodata(self) -> dict[str, str]: data = super().infodata() if ( self.config.get("rootfstype") in ("none", None) and self.config.get("rootfsextract") ): data["Type"] = "file" data["File"] = self.config["rootfsextract"] elif ( self.config["rootfstype"] == "bind" and self.config.get("rootfsdir") ): data["Type"] = "directory" data["Directory"] = self.config["rootfsdir"] elif ( self.config["rootfstype"].startswith("fuse.") and self.config.get("rootfsdev") ): data["Type"] = "file" data["File"] = self.config["rootfsdev"] else: assert False, f"unexpected chroot configuration {self.config!r}" return data async def mount_fuse( self, proxy: asyncvarlink.VarlinkInterfaceProxy, device: str, target: str, subtype: str, options: list[str], ) -> None: """Mount a single fuse filesystem in the proxied process.""" driver = { "bind": "bindfs", "erofs": "erofsfuse", "ext4": "fuse2fs", "squashfs": "squashfuse", }[subtype] with ( await proxy.MountFuse( source=device, target=target, options={ "rootmode": "040755", "user_id": 0, "group_id": 0, "allow_other": None, }, fstype=subtype, ) as mountres, mountres["fusefd"] as fusefd, ): @linuxnamespaces.run_in_fork.now def _() -> None: close_all_but([0, 1, 2, fusefd]) os.execvp( driver, [ driver, *itertools.chain.from_iterable( zip(itertools.repeat("-o"), options) ), str(pathlib.Path(device).expanduser()), f"/dev/fd/{fusefd.fileno()}", ], ) async def mount( self, proxy: asyncvarlink.VarlinkInterfaceProxy, backingdir: str | None, ) -> None: """Create the root filesystem and chdir the supervisor to it.""" match self.config.get("backingstore"): case "tmpfs" | None: await proxy.Chdir(path="/opt") await proxy.MountTmpfs(options={"mode": "0755"}) case "directory": await proxy.Chdir(path=backingdir) case _: raise NotImplementedError("unsupported backingstore") await proxy.Mkdir(path="lower") match self.config.get("rootfstype", "").split("."): case ["none"] | [""]: pass case ["bind"]: await proxy.BindMount( source=str( pathlib.Path(self.config["rootfsdir"]).expanduser() ), target="lower", readonly=configparser.ConfigParser.BOOLEAN_STATES[ self.config.get("overlayfs", "false").lower() ], ) case ["fuse", subtype]: await self.mount_fuse( proxy, self.config["rootfsdev"], "lower", subtype, [] ) case _: raise NotImplementedError("unsupported rootfstype") if self.config.get("overlayfs"): assert self.config.get("rootfstype") not in ("none", None) await proxy.Mkdir(path="work") await proxy.Mkdir(path="upper") await proxy.Mkdir(path="mnt") # Mount to a subdir such that we may chdir("..") await proxy.Mkdir(path="mnt/mnt") await proxy.MountOverlayfs( lower="lower", upper="upper", work="work", target="mnt/mnt", ) await proxy.Chdir(path="mnt/mnt") if self.config.get("rootfsextract"): tar = pathlib.Path(self.config["rootfsextract"]) extra = {} if tar.suffix in (".tzst", ".tzstd", ".zst", ".zstd"): extra["comptype"] = "zst" with tar.expanduser().open("rb") as tarf: await proxy.ExtractTar(tar=tarf, **extra) for mount in self.config.get("mounts", "").split(): [device, target, fstypestr, *options] = mount.split(",") fstype = fstypestr.split(".", 1) match fstype: case ['bind']: readonly = False if options == ["ro"]: readonly = True elif options: raise NotImplementedError( "unsupported bind mount option" ) await proxy.BindMount( source=str(pathlib.Path(device).expanduser()), target=target, readonly=readonly, ) case ['fuse', subtype]: await self.mount_fuse( proxy, device, target, subtype, options ) class SessionChroot(ChrootBase): """Represent a schroot session. It's name is the basename of the IPC socket. """ namespace = "Session" def aliases(self) -> set[str]: return set() def infodata(self) -> dict[str, str]: data = super().infodata() data["Session Purged"] = "true" data["Type"] = "unshare" # It's a gross lie. It has to exist as a directory or sbuild won't # work. data["Location"] = "/opt" return data def load_config() -> configparser.ConfigParser: config = configparser.ConfigParser(interpolation=None, delimiters=("=",)) config.read([UNSCHROOT_CONFIG]) return config def scan_chroots() -> dict[str, ChrootBase]: """Scan chroots: * ~/.config/unschroot.ini trumps * ~/.cache/sbuild automatic tar-based * ~/.cache/directory_chroots automatic overlay * sessions """ config = load_config() chrootmap: dict[str, ChrootBase] = {} chroot: ChrootBase chrootconfig: collections.abc.Mapping[str, str] for name, chrootconfig in config.items(): if name == "DEFAULT": continue chroot = SourceChroot(name, chrootconfig) chrootmap[name] = chroot for loc in (CACHE_SBUILD, CACHE_DIRECTORY_CHROOTS): if loc.is_dir(): chroots = [] aliases: dict[str, set[str]] = {} for path in loc.iterdir(): if path.is_symlink(): alias = path.name.split(".", 1)[0] aliases.setdefault(str(path.readlink()), set()).add(alias) else: chroots.append(path) for path in chroots: if loc == CACHE_SBUILD: chrootconfig = { "rootfstype": "none", "rootfsextract": str(path), } else: chrootconfig = { "rootfstype": "directory", "rootfsdir": str(path), } chrootaliases = aliases.get(path.name, set()) if aliases: chrootconfig["aliases"] = ",".join(sorted(chrootaliases)) chroot = SourceChroot(path.name.split(".", 1)[0], chrootconfig) if chroot.name not in chrootmap: chrootmap[chroot.name] = chroot for chroot in list(chrootmap.values()): for alias in chroot.aliases(): if alias not in chrootmap: chrootmap[alias] = chroot rtdir = runtime_path() if rtdir.is_dir(): for sock in rtdir.iterdir(): if sock.name not in chrootmap: chrootmap[sock.name] = SessionChroot(sock.name) return chrootmap def getpwchroot( user: str | int, rootdir: pathlib.Path = pathlib.Path("/") ) -> pwd.struct_passwd: """Look up the passwd record for a given user (name or uid) in the passwd database of the given root directory. Deliberately use only the plain files avoiding LDAP and NIS to allow working with a chroot. Similar to getpwnam and getpwduid, raise a KeyError if the user cannot be found. """ if isinstance(user, str) and user.isdigit(): user = int(user) with (rootdir / "etc/passwd").open("r", encoding="utf8") as passwdf: for line in passwdf: parts = line.split(":") # Skip over invalid records and bad data if len(parts) != 7: continue try: uid = int(parts[2]) gid = int(parts[3]) except ValueError: continue if uid < 0 or gid < 0 or uid > 0x1FFFFFFF or gid > 0x1FFFFFFF: continue if user == parts[0] if isinstance(user, str) else user == uid: return pwd.struct_passwd((*parts[:2], uid, gid, *parts[4:])) raise KeyError(user) def remap_fds(fds: list[linuxnamespaces.FileDescriptor | None]) -> None: """Change the current processes' file descriptors such that each of the given file descriptors is renumbered to its list index and every other fiel descriptor is closed. """ # Renumber fds such that every entry is at least as large as its index. nextfd = max(max(filter(None, fds), default=0) + 1, len(fds)) for targetfd, sourcefd in enumerate(list(fds)): if sourcefd is not None and sourcefd < targetfd: fds[targetfd] = sourcefd.dup2(nextfd) nextfd += 1 for targetfd, sourcefd in enumerate(fds): if sourcefd is None: try: os.close(targetfd) except OSError as err: if err.errno != errno.EBADFD: raise elif sourcefd != targetfd: sourcefd.dup2(targetfd) os.closerange(len(fds), 0x7FFFFFFF) def close_all_but( fds: typing.Iterable[linuxnamespaces.FileDescriptorLike], ) -> None: """Close all file descriptors but the ones gives.""" nextfd = 0 for fd in sorted(map(linuxnamespaces.FileDescriptor, fds)): if nextfd >= fd: nextfd += 1 else: os.closerange(nextfd, fd) nextfd = fd + 1 if nextfd < 0x7FFFFFFF: os.closerange(nextfd, 0x7FFFFFFF) def clean_directory( directory: pathlib.Path, statres: os.stat_result | None = None ) -> None: """Recursively delete/umount the given directory.""" if statres is None: statres = directory.stat() for entry in list(directory.iterdir()): while True: est = entry.lstat() if statres.st_dev == est.st_dev: break linuxnamespaces.umount(entry, linuxnamespaces.UmountFlags.DETACH) if stat.S_ISDIR(est.st_mode): clean_directory(entry, est) else: entry.unlink() directory.rmdir() class ContainerError( asyncvarlink.TypedVarlinkErrorReply, interface="de.subdivi.unschroot" ): class Parameters: message: str class ContainerSupervisor(asyncvarlink.VarlinkInterface): name = "de.subdivi.unschroot.Container" def __init__(self) -> None: self.terminate_future = asyncio.get_running_loop().create_future() self.cleanup_directories: list[pathlib.Path] = [] @asyncvarlink.varlinkmethod def Terminate(self) -> None: """Terminate the container.""" if not self.terminate_future.done(): self.terminate_future.set_result(None) for directory in self.cleanup_directories: clean_directory(directory) @asyncvarlink.varlinkmethod def AddCleanup(self, *, directory: str) -> None: """Register the given directory for deletion at termination.""" self.cleanup_directories.append(pathlib.Path(directory)) @asyncvarlink.varlinkmethod def MountTmpfs( self, target: str = "/", options: dict[str, str | None] | None = None, ) -> None: """Mount a tmpfs to the given location. The target path is understood as rooted in the backing store.""" target_path = path_below(target) linuxnamespaces.mount( "tmpfs", target_path, "tmpfs", data=options, ) if target_path == pathlib.Path("."): os.chdir(os.getcwd()) @asyncvarlink.varlinkmethod async def ExtractTar( self, tar: asyncvarlink.FileDescriptor, comptype: str = "*" ) -> None: """Extract an opened tar archive into the working directory.""" # tarfile is synchronous, but this method is async. Rather than block, # fork a process and wait for it. @linuxnamespaces.async_run_in_fork.now def extraction_process() -> None: mode = "r:zst" if comptype == "zst" else "r|*" with TarFile.open( fileobj=os.fdopen(tar.fileno(), "rb"), mode=mode ) as tarf: for tmem in tarf: tarf.extract(tmem, numeric_owner=True) await extraction_process.wait() @asyncvarlink.varlinkmethod def Mkdir(self, path: str, mode: int = 0o755) -> None: """Create a directory with given mode.""" path_below(path).mkdir(mode=mode) @asyncvarlink.varlinkmethod def WriteFile(self, path: str, content: str, mode: int = 0o644) -> None: """Create a file with given content and mode.""" dest = path_below(path) dest.write_text(content, encoding="utf-8") dest.chmod(mode) @asyncvarlink.varlinkmethod def BindMount( self, source: str, target: str, readonly: bool = False ) -> None: """Bind mount the source location to the target location. The target location is anchored at the container root. """ target_path = path_below(target) linuxnamespaces.bind_mount(source, target_path, readonly=readonly) if target_path == pathlib.Path("."): os.chdir(os.getcwd()) @asyncvarlink.varlinkmethod def Chdir(self, path: str) -> None: """Change the working directory. The working directory defines the container root filesystem for many other methods. """ os.chdir(path) @asyncvarlink.varlinkmethod def Unshare(self, namespaces: int) -> None: """Invoke the unshare syscall to create new namespaces.""" linuxnamespaces.unshare(linuxnamespaces.CloneFlags(namespaces)) class ForkResult(typing.TypedDict): pid: int pidfd: asyncvarlink.FileDescriptor socket: asyncvarlink.FileDescriptor @asyncvarlink.varlinkmethod async def Fork(self) -> ForkResult: """Create a child process to be configured as container payload. The result includes its pidfd and a varlink socket for communication. """ parent_sock, child_sock = socket.socketpair() try: pid = os.fork() if pid == 0: try: asyncio.set_event_loop(None) parent_sock.close() @run_here @async_as_sync async def _() -> None: interface = ContainerSupervisor() protocol = asyncvarlink.VarlinkInterfaceServerProtocol( create_registry(interface) ) protocol.connection_lost = lambda _: os._exit(0) protocol.eof_received = lambda: os._exit(0) with contextlib.closing( asyncvarlink.VarlinkTransport( asyncio.get_running_loop(), child_sock, child_sock, protocol, ) ): await interface.terminate_future except SystemExit as err: os._exit(err.code) except: os._exit(1) assert False, "unreachable" pidfd = os.pidfd_open(pid) except: parent_sock.close() raise finally: child_sock.close() return ContainerSupervisor.ForkResult( pid=pid, pidfd=asyncvarlink.FileDescriptor(pidfd), socket=asyncvarlink.FileDescriptor(parent_sock), ) @asyncvarlink.varlinkmethod def Newidmaps(self, *, pid: int) -> None: """Perform an identity mapping of uids and gids on the target process with no helpers. The mapping process is assumed to be sufficiently privileged. """ identitymap = [linuxnamespaces.IDMapping(0, 0, 65536)] linuxnamespaces.newidmaps(pid, identitymap, identitymap, helper=False) @asyncvarlink.varlinkmethod(return_parameter="pidfd") def Exec( self, *, command: list[str], fds: list[asyncvarlink.FileDescriptor | None] | None = None, enable_loopback_if: bool = False, user: str | None = None, cwd: str | None = None, ) -> asyncvarlink.FileDescriptor: """Turn the corrent supervisor process (should be run from a fork) into the container payload. It's actually another fork that ends up doing the exec after this process calling Terminate on this setup process. """ if user is None: uid, gid = 0, 0 else: try: record = getpwchroot(user, pathlib.Path(".")) except KeyError as err: raise ContainerError( message=f"user {user} does not exist" ) from err uid = record.pw_uid gid = record.pw_gid # In order for pivot_root to work, the new root must be a mount point, # but as we unshared both a user and mount namespace, the working # directory no longer is a mount point. linuxnamespaces.bind_mount(".", ".", recursive=True) os.chdir("../" + os.path.basename(os.getcwd())) # In order to be able to mount a real proc later, the original /proc # must be visible somewhere inside our container. Temporarily mount it # to /bin. linuxnamespaces.bind_mount("/proc", "bin", recursive=True) linuxnamespaces.mount( "devpts", "dev/pts", "devpts", linuxnamespaces.MountFlags.NOSUID | linuxnamespaces.MountFlags.NOEXEC, "gid=5,mode=0620,ptmxmode=0666", ) linuxnamespaces.pivot_root(".", ".") linuxnamespaces.umount(".", linuxnamespaces.UmountFlags.DETACH) if enable_loopback_if: linuxnamespaces.enable_loopback_if() # This pipe will be accessible to the container, but it's only used for # synchronization. rpipe, wpipe = linuxnamespaces.FileDescriptor.pipe(inheritable=False) @linuxnamespaces.run_in_fork.now def init_process() -> None: wpipe.close() # Now that we have forked and thus entered our PID namespace, we # may mount /proc. linuxnamespaces.mount( "proc", "proc", "proc", linuxnamespaces.MountFlags.NOSUID | linuxnamespaces.MountFlags.NODEV | linuxnamespaces.MountFlags.NOEXEC, ) # Get rid of the mount that granted us mounting /proc. linuxnamespaces.umount("bin", linuxnamespaces.UmountFlags.DETACH) # Drop privileges. if gid != 0: os.setgid(gid) if uid != 0: os.setuid(uid) else: orig_path = os.environ.get("PATH", "") if not orig_path: os.environ["PATH"] = "/usr/sbin:/sbin:/usr/bin:/bin" elif ":/usr/sbin:" not in f":{orig_path}:": os.environ["PATH"] = orig_path + ":/usr/sbin" if cwd: os.chdir(cwd) # Wait for parent exit and reparenting. os.read(rpipe, 1) rpipe.close() remap_fds( [ linuxnamespaces.FileDescriptor(fd) if fd else None for fd in (fds or []) ] ) # The container may change this, but it's still useful for # robustness when it does not. linuxnamespaces.prctl_set_pdeathsig(signal.SIGKILL) os.execvp(command[0], command) # The caller should call Terminate next. Doing so will close the wpipe # and thus allow the child process to proceed. return asyncvarlink.FileDescriptor(os.pidfd_open(init_process.pid)) @asyncvarlink.varlinkmethod(return_parameter="status") async def Waitpidfd(self, *, pidfd: asyncvarlink.FileDescriptor) -> int: """Wait for the process identified by the given pidfd to exit and returns its exit code. """ res = await linuxnamespaces.async_waitpidfd(pidfd.fileno(), os.WEXITED) assert res is not None return res.si_status @asyncvarlink.varlinkmethod def Setpriv( self, uid: int | None = None, gid: int | None = None, groups: list[int] | None = None, dumpable: bool | None = None, ) -> None: """Change the uid/gid/supplementary groups and the dumpable flag.""" if groups is not None: os.setgroups(groups) if gid is not None: os.setgid(gid) if uid is not None: os.setuid(uid) if dumpable is not None: linuxnamespaces.prctl_set_dumpable(dumpable) @asyncvarlink.varlinkmethod def MountSpecials(self) -> None: """Mount /dev without /dev/pts and /sys.""" linuxnamespaces.populate_dev("/", ".", pts="defer") linuxnamespaces.populate_sys( "/", ".", namespaces=linuxnamespaces.CloneFlags.NONE ) @asyncvarlink.varlinkmethod def MountOverlayfs( self, *, lower: str, upper: str, work: str, target: str, ) -> None: """Mount an overlay filesystem.""" linuxnamespaces.mount( "overlay", target, "overlay", data={ "lowerdir": lower, "upperdir": upper, "workdir": work, "userxattr": None, }, ) @asyncvarlink.varlinkmethod(return_parameter="fusefd") def MountFuse( self, target: str = "/", options: dict[str, str | int | None] | None = None, source: str = "none", fstype: str | None = None, ) -> asyncvarlink.FileDescriptor: """Mount a fuse filesystem and return the controlling file descriptor. """ target_path = pathlib.Path( (pathlib.PurePath("/") / target).relative_to("/") ) if options is None: options = {} flags = linuxnamespaces.MountFlags.NONE if options.get("ro", True) is None: flags |= linuxnamespaces.MountFlags.RDONLY del options["ro"] fusefd = asyncvarlink.FileDescriptor(os.open("/dev/fuse", os.O_RDWR)) try: options["fd"] = fusefd.fileno() linuxnamespaces.mount( source, target_path, "fuse" if fstype is None else f"fuse.{fstype}", flags, data=options, ) except: fusefd.close() raise return fusefd def create_registry( *interfaces: asyncvarlink.VarlinkInterface, ) -> asyncvarlink.VarlinkInterfaceRegistry: registry = asyncvarlink.VarlinkInterfaceRegistry() registry.register_interface( asyncvarlink.serviceinterface.VarlinkServiceInterface( "subdivi.de", "unschroot", "0.0", "url", registry ), ) for interface in interfaces: registry.register_interface(interface) return registry def do_info(args: argparse.Namespace) -> None: """Show information about selected chroots""" chrootmap = scan_chroots() chroots: typing.Iterable[ChrootBase] if args.chroot: chroots = [ chrootmap[ args.chroot.removeprefix("chroot:").removeprefix("session:") ], ] else: chroots = chrootmap.values() sys.stdout.write("\n".join(chroot.infostr() for chroot in chroots)) async def supervisor_main(session: str, csock: socket.socket) -> None: # We do double forking, collect secondary children for waitid. linuxnamespaces.prctl_set_child_subreaper() async with contextlib.AsyncExitStack() as stack: interface = ContainerSupervisor() registry = create_registry(interface) setup_transport = asyncvarlink.VarlinkTransport( asyncio.get_running_loop(), csock, csock, registry.protocol_factory(), ) stack.callback(setup_transport.close) sockpath = runtime_path() / session sockpath.parent.mkdir(mode=0o700, exist_ok=True) server = stack.enter_async_context( await asyncvarlink.create_unix_server( registry.protocol_factory, sockpath ), ) stack.callback(server.close) await interface.terminate_future def do_begin_session(args: argparse.Namespace) -> None: """Begin a session; returns the session ID""" session = args.session_name or str(uuid.uuid4()) chrootmap = scan_chroots() source = chrootmap[args.chroot.removeprefix("chroot:")] assert isinstance(source, SourceChroot) uidmap = [ linuxnamespaces.IDAllocation.loadsubid("uid").allocatemap(65536), linuxnamespaces.IDMapping(65536, os.getuid(), 1), ] gidmap = [ linuxnamespaces.IDAllocation.loadsubid("gid").allocatemap(65536), linuxnamespaces.IDMapping(65536, os.getgid(), 1), ] # Create an extra socket to avoid ENOENT when connecting. psock, csock = socket.socketpair( socket.AF_UNIX, socket.SOCK_STREAM | socket.SOCK_NONBLOCK ) @linuxnamespaces.run_in_fork.now def supervisor_process() -> None: # This child is the container supervisor process reachable via IPC. # It will not be part of the PID namespaces, but it'll fork into them. psock.close() close_all_but([2, csock]) asyncio.run(supervisor_main(session, csock)) csock.close() @run_here @async_as_sync async def _() -> None: assert isinstance(source, SourceChroot) # assert again for mypy protocol = asyncvarlink.VarlinkClientProtocol() with contextlib.closing( asyncvarlink.VarlinkTransport( asyncio.get_running_loop(), psock, psock, protocol ), ): await asyncio.sleep(0) proxy = protocol.make_proxy(ContainerSupervisor) common_namespaces = ( linuxnamespaces.CloneFlags.NEWUSER | linuxnamespaces.CloneFlags.NEWNS ) await proxy.Unshare(namespaces=int(common_namespaces)) linuxnamespaces.newidmaps(supervisor_process.pid, uidmap, gidmap) tdir: str | None = None if source.config.get("backingstore") == "directory": tdir = tempfile.mkdtemp(prefix="unshroot") await proxy.AddCleanup(directory=tdir) await proxy.Setpriv(uid=0, gid=0, groups=[0], dumpable=True) await source.mount(proxy, tdir) await proxy.MountSpecials() print(session) @async_as_sync async def do_run_session(args: argparse.Namespace) -> None: """Run an existing session""" sockpath = runtime_path() / args.chroot transport, protocol = await asyncvarlink.connect_unix_varlink( asyncvarlink.VarlinkClientProtocol, sockpath, ) with contextlib.ExitStack() as stack: stack.callback(transport.close) assert isinstance(protocol, asyncvarlink.VarlinkClientProtocol) proxy = protocol.make_proxy(ContainerSupervisor) namespaces = ( linuxnamespaces.CloneFlags.NEWUSER | linuxnamespaces.CloneFlags.NEWNS | linuxnamespaces.CloneFlags.NEWIPC | linuxnamespaces.CloneFlags.NEWPID ) if args.isolate_network: namespaces |= linuxnamespaces.CloneFlags.NEWNET with await proxy.Fork() as proc: vsock = proc["socket"].take() protocol2 = asyncvarlink.VarlinkClientProtocol() stack.callback( asyncvarlink.VarlinkTransport( asyncio.get_running_loop(), vsock, vsock, protocol2 ).close ) proxy2 = protocol2.make_proxy(ContainerSupervisor) await asyncio.sleep(0) # wait for connection_made await proxy2.Unshare(namespaces=namespaces) await proxy.Newidmaps(pid=proc["pid"]) proc2 = stack.enter_context( await proxy2.Exec( command=args.command, fds=[ asyncvarlink.FileDescriptor(0), asyncvarlink.FileDescriptor(1), asyncvarlink.FileDescriptor(2), ], enable_loopback_if=args.isolate_network, user=args.user, cwd=args.directory, ), ) stack.enter_context(proc2["pidfd"]) await protocol2.call( asyncvarlink.VarlinkMethodCall( "de.subdivi.unschroot.Container.Terminate", {}, oneway=True ) ) sys.exit((await proxy.Waitpidfd(pidfd=proc2["pidfd"]))["status"]) @async_as_sync async def do_end_session(args: argparse.Namespace) -> None: """End an existing session""" sockpath = runtime_path() / args.chroot transport, protocol = await asyncvarlink.connect_unix_varlink( asyncvarlink.VarlinkClientProtocol, sockpath, ) with contextlib.closing(transport): assert isinstance(protocol, asyncvarlink.VarlinkClientProtocol) proxy = protocol.make_proxy(ContainerSupervisor) await proxy.Terminate() def main() -> None: parser = argparse.ArgumentParser() group = parser.add_mutually_exclusive_group(required=True) for comm in ("info", "begin-session", "run-session", "end-session"): func = globals()["do_" + comm.replace("-", "_")] group.add_argument( f"-{comm[0]}", f"--{comm}", dest="subcommand", action="store_const", const=func, help=func.__doc__, ) parser.add_argument( "-c", "--chroot", dest="chroot", action="store", help="Use specified chroot", ) parser.add_argument("-d", "--directory", action="store") parser.add_argument("-n", "--session-name", action="store", default=None) parser.add_argument("-p", "--preserve-environment", action="store_true") parser.add_argument("-q", "--quiet", action="store_true") parser.add_argument("-u", "--user", action="store", default=os.getlogin()) parser.add_argument("--isolate-network", action="store_true") parser.add_argument("command", nargs="*") args = parser.parse_args() assert args.subcommand is not None args.subcommand(args) if __name__ == "__main__": main()