diff options
-rwxr-xr-x | examples/unschroot.py | 385 |
1 files changed, 254 insertions, 131 deletions
diff --git a/examples/unschroot.py b/examples/unschroot.py index 3d1c900..58e14b7 100755 --- a/examples/unschroot.py +++ b/examples/unschroot.py @@ -14,9 +14,7 @@ contain the non-essential passwd package. The actual sessions are stored in import argparse -import functools import grp -import itertools import os import pathlib import pwd @@ -41,156 +39,279 @@ class TarFile( pass -class Chroot: - # Ignore $HOME as sbuild sets to something invalid - home = pathlib.Path(pwd.getpwuid(os.getuid()).pw_dir) - cache_sbuild = home / ".cache/sbuild" - cache_unschroot = home / ".cache/unschroot" +def write_etc_hosts(root: os.PathLike[str] | str) -> None: + etc_hosts = pathlib.Path(root) / "etc/hosts" + if not etc_hosts.exists(): + etc_hosts.write_text( + """127.0.0.1 localhost +127.0.1.1 %s +::1 localhost ip6-localhost ip6-loopback +""" + % socket.gethostname(), + encoding="ascii", + ) - def __init__(self, path: pathlib.Path, aliases: set[str] | None = None): - self.path = path - self.aliases = set() if aliases is None else aliases - @functools.cached_property - def namespace(self) -> str: - if self.path.is_file(): - return "Chroot" - if self.path.is_dir(): - return "Session" - raise ValueError("invalid chroot object") +def load_subids() -> ( + tuple[linuxnamespaces.IDMapping, linuxnamespaces.IDMapping] +): + return ( + linuxnamespaces.IDAllocation.loadsubid("uid").allocatemap(65536), + linuxnamespaces.IDAllocation.loadsubid("gid").allocatemap(65536), + ) + + +# Ignore $HOME as sbuild sets to something invalid +HOME = pathlib.Path(pwd.getpwuid(os.getuid()).pw_dir) +CACHE_SBUILD = HOME / ".cache/sbuild" +CACHE_UNSCHROOT = HOME / ".cache/unschroot" +CACHE_DIRECTORY_CHROOTS = HOME / ".cache/directory_chroots" + - @functools.cached_property - def name(self) -> str: - suffix = "-sbuild" if self.namespace == "Chroot" else "" - return self.path.name.split(".", 1)[0] + suffix +class ChrootBase: + namespace: str + name: str + + def __init__(self) -> None: + self.aliases: set[str] = set() + + def infodata(self) -> dict[str, str]: + return { + "Name": self.name, + "Aliases": " ".join(sorted(self.aliases)), + } def infostr(self) -> str: - lines = [ - f"--- {self.namespace} ---", - f"Name {self.name}", - ] - if self.namespace == "Chroot": - lines.extend(["Type file", f"File {self.path}"]) - if self.namespace == "Session": - lines.extend( - [ - f"Location {self.path}", - "Session Purged true", - "Type unshare", - ] + return f"--- {self.namespace} ---\n" + "".join( + map("%s %s\n".__mod__, self.infodata().items()) + ) + + +class SourceChroot(ChrootBase): + namespace = "Chroot" + + def newsession(self) -> "SessionChroot": + raise NotImplementedError + + +class SessionChroot(ChrootBase): + namespace = "Session" + + def infodata(self) -> dict[str, str]: + data = super().infodata() + data["Session Purged"] = "true" + data["Type"] = "unshare" + return data + + def mount(self) -> pathlib.Path: + raise NotImplementedError + + +class TarSourceChroot(SourceChroot): + def __init__(self, path: pathlib.Path): + super().__init__() + self.path = path + self.name = path.name.split(".", 1)[0] + "-sbuild" + + def infodata(self) -> dict[str, str]: + data = super().infodata() + data["Type"] = "file" + data["File"] = str(self.path) + return data + + def newsession(self) -> "TarSessionChroot": + CACHE_UNSCHROOT.mkdir(parents=True, exist_ok=True) + session = TarSessionChroot( + pathlib.Path(tempfile.mkdtemp(prefix="tar-", dir=CACHE_UNSCHROOT)), + ) + + uidmap, gidmap = load_subids() + mainsock, childsock = socket.socketpair() + with TarFile.open(self.path, "r:*") as tarf: + pid = os.fork() + if pid == 0: + mainsock.close() + os.chdir(session.path) + linuxnamespaces.unshare( + linuxnamespaces.CloneFlags.NEWUSER + | linuxnamespaces.CloneFlags.NEWNS, + ) + childsock.send(b"\0") + childsock.recv(1) + childsock.close() + os.setgid(0) + os.setuid(0) + for tmem in tarf: + if not tmem.name.startswith(("dev/", "./dev/")): + tarf.extract(tmem, numeric_owner=True) + write_etc_hosts(".") + sys.exit(0) + childsock.close() + mainsock.recv(1) + pid2 = os.fork() + if pid2 == 0: + linuxnamespaces.unshare_user_idmap( + [uidmap, linuxnamespaces.IDMapping(65536, os.getuid(), 1)], + [gidmap, linuxnamespaces.IDMapping(65536, os.getgid(), 1)], ) - lines.append("Aliases " + " ".join(sorted(self.aliases))) - return "".join(map("%s\n".__mod__, lines)) - - @classmethod - def searchchroot(cls, name: str) -> "Chroot": - name = name.removeprefix("chroot:") - name = name.removesuffix("-sbuild") - for path in cls.cache_sbuild.iterdir(): - if path.name.startswith(name + ".t"): - return cls(path) - raise KeyError(name) - - @classmethod - def searchsession(cls, name: str) -> "Chroot": - name = name.removeprefix("session:") - path = cls.cache_unschroot / name - if not path.is_dir(): - raise KeyError(name) - return cls(path) - - @classmethod - def newsession(cls) -> "Chroot": - cls.cache_unschroot.mkdir(parents=True, exist_ok=True) - return Chroot( - pathlib.Path( - tempfile.mkdtemp(prefix="chroot", dir=cls.cache_unschroot) + os.chown(session.path, 0, 0) + session.path.chmod(0o755) + sys.exit(0) + linuxnamespaces.newidmaps(pid, [uidmap], [gidmap]) + _, ret = os.waitpid(pid2, 0) + assert ret == 0 + mainsock.send(b"\0") + mainsock.close() + _, ret = os.waitpid(pid, 0) + assert ret == 0 + return session + + +class TarSessionChroot(SessionChroot): + def __init__(self, path: pathlib.Path): + super().__init__() + self.path = path + self.name = path.name + + def mount(self) -> pathlib.Path: + linuxnamespaces.bind_mount(self.path, "/mnt", recursive=True) + return pathlib.Path("/mnt") + + +class DirectorySourceChroot(SourceChroot): + def __init__(self, path: pathlib.Path): + super().__init__() + self.path = path + self.name = path.name + "-sbuild" + + def infodata(self) -> dict[str, str]: + data = super().infodata() + data["Type"] = "directory" + data["Directory"] = str(self.path) + return data + + def newsession(self) -> "DirectorySessionChroot": + CACHE_UNSCHROOT.mkdir(parents=True, exist_ok=True) + path = pathlib.Path( + tempfile.mkdtemp( + prefix=f"overlay-{self.name}-", dir=CACHE_UNSCHROOT ), ) + session = DirectorySessionChroot(self, path) + uidmap, gidmap = load_subids() + pid = os.fork() + if pid == 0: + linuxnamespaces.unshare_user_idmap( + [uidmap, linuxnamespaces.IDMapping(65536, os.getuid(), 1)], + [gidmap, linuxnamespaces.IDMapping(65536, os.getgid(), 1)], + ) + os.setgid(0) + os.setuid(0) + os.chown(path, 0, 0) + path.chmod(0o755) + (path / "upper").mkdir() + (path / "work").mkdir() + if not (self.path / "etc/hosts").exists(): + (path / "upper/etc").mkdir() + write_etc_hosts(path / "upper") + sys.exit(0) + _, ret = os.waitpid(pid, 0) + assert ret == 0 + return session + + +class DirectorySessionChroot(SessionChroot): + def __init__(self, source: DirectorySourceChroot, path: pathlib.Path): + super().__init__() + self.source = source + self.path = path + self.name = path.name + + def infodata(self) -> dict[str, str]: + data = super().infodata() + data["Type"] = "directory" + data["Directory"] = str(self.source.path) + return data + + def mount(self) -> pathlib.Path: + mnt = "/mnt" + linuxnamespaces.mount( + "overlay", + mnt, + "overlay", + data=[ + "lowerdir=" + str(self.source.path), + "upperdir=" + str(self.path / "upper"), + "workdir=" + str(self.path / "work"), + "userxattr", + ], + ) + return pathlib.Path(mnt) + - @classmethod - def scan_sbuild(cls) -> typing.Iterator["Chroot"]: - if cls.cache_sbuild.is_dir(): +def scan_chroots() -> dict[str, ChrootBase]: + chrootmap: dict[str, ChrootBase] = {} + chroot: ChrootBase + for loc, cls in ( + (CACHE_SBUILD, TarSourceChroot), + (CACHE_DIRECTORY_CHROOTS, DirectorySourceChroot), + ): + if loc.is_dir(): chroots = [] aliases: dict[str, set[str]] = {} - for path in cls.cache_sbuild.iterdir(): + for path in loc.iterdir(): if path.is_symlink(): alias = path.name.split(".", 1)[0] + "-sbuild" aliases.setdefault(str(path.readlink()), set()).add(alias) - elif path.is_file(): + else: chroots.append(path) for path in chroots: - yield cls(path, aliases.get(path.name, set())) - - @classmethod - def scan_unschroot(cls) -> typing.Iterator["Chroot"]: - if cls.cache_unschroot.is_dir(): - yield from map(cls, cls.cache_unschroot.iterdir()) + chroot = cls(path) + chroot.aliases.update(aliases.get(path.name, set())) + if chroot.name not in chrootmap: + chrootmap[chroot.name] = chroot + + if CACHE_UNSCHROOT.is_dir(): + for path in CACHE_UNSCHROOT.iterdir(): + if path.name.startswith("tar-"): + chroot = TarSessionChroot(path) + if chroot.name not in chrootmap: + chrootmap[chroot.name] = chroot + elif path.name.startswith("overlay-"): + base = "-".join(path.name.split("-")[1:-1]) + if base not in chrootmap: + continue + source = chrootmap[base] + assert isinstance(source, DirectorySourceChroot) + chroot = DirectorySessionChroot(source, path) + if chroot.name not in chrootmap: + chrootmap[chroot.name] = chroot + + return chrootmap def do_info(args: argparse.Namespace) -> None: """Show information about selected chroots""" - chroots: typing.Iterable[Chroot] + chrootmap = scan_chroots() + chroots: typing.Iterable[ChrootBase] if args.chroot: - try: - chroots = [Chroot.searchchroot(args.chroot)] - except KeyError: - chroots = [Chroot.searchsession(args.chroot)] + chroots = [ + chrootmap[ + args.chroot.removeprefix("chroot:").removeprefix("session:") + ], + ] else: - chroots = itertools.chain( - Chroot.scan_sbuild(), Chroot.scan_unschroot() - ) + chroots = chrootmap.values() sys.stdout.write("\n".join(chroot.infostr() for chroot in chroots)) def do_begin_session(args: argparse.Namespace) -> None: """Begin a session; returns the session ID""" - source = Chroot.searchchroot(args.chroot) - session = Chroot.newsession() - uidmap = linuxnamespaces.IDAllocation.loadsubid("uid").allocatemap(65536) - gidmap = linuxnamespaces.IDAllocation.loadsubid("gid").allocatemap(65536) - mainsock, childsock = socket.socketpair() - with TarFile.open(source.path, "r:*") as tarf: - pid = os.fork() - if pid == 0: - mainsock.close() - os.chdir(session.path) - linuxnamespaces.unshare( - linuxnamespaces.CloneFlags.NEWUSER - | linuxnamespaces.CloneFlags.NEWNS, - ) - childsock.send(b"\0") - childsock.recv(1) - childsock.close() - os.setgid(0) - os.setuid(0) - for tmem in tarf: - if not tmem.name.startswith(("dev/", "./dev/")): - tarf.extract(tmem, numeric_owner=True) - etc_hosts = pathlib.Path("./etc/hosts") - if not etc_hosts.exists(): - etc_hosts.write_text( - """127.0.0.1 localhost -127.0.1.1 %s -::1 localhost ip6-localhost ip6-loopback -""" - % socket.gethostname(), - ) - sys.exit(0) - childsock.close() - mainsock.recv(1) - linuxnamespaces.newidmaps(pid, [uidmap], [gidmap]) - linuxnamespaces.unshare_user_idmap( - [uidmap, linuxnamespaces.IDMapping(65536, os.getuid(), 1)], - [gidmap, linuxnamespaces.IDMapping(65536, os.getgid(), 1)], - ) - os.chown(session.path, 0, 0) - session.path.chmod(0o755) - mainsock.send(b"\0") - mainsock.close() - _, ret = os.waitpid(pid, 0) + chrootmap = scan_chroots() + source = chrootmap[args.chroot] + assert isinstance(source, SourceChroot) + session = source.newsession() print(session.name) - sys.exit(ret) def exec_perl_dumb_init(pid: int) -> typing.NoReturn: @@ -212,9 +333,10 @@ def exec_perl_dumb_init(pid: int) -> typing.NoReturn: def do_run_session(args: argparse.Namespace) -> None: """Run an existing session""" - session = Chroot.searchsession(args.chroot) - uidmap = linuxnamespaces.IDAllocation.loadsubid("uid").allocatemap(65536) - gidmap = linuxnamespaces.IDAllocation.loadsubid("gid").allocatemap(65536) + chrootmap = scan_chroots() + session = chrootmap[args.chroot] + assert isinstance(session, SessionChroot) + uidmap, gidmap = load_subids() mainsock, childsock = socket.socketpair() pid = os.fork() pidfd: int @@ -223,7 +345,6 @@ def do_run_session(args: argparse.Namespace) -> None: for fd in (1, 2): if stat.S_ISFIFO(os.fstat(fd).st_mode): os.fchmod(fd, 0o666) - os.chdir(session.path) ns = ( linuxnamespaces.CloneFlags.NEWUSER | linuxnamespaces.CloneFlags.NEWNS @@ -241,8 +362,8 @@ def do_run_session(args: argparse.Namespace) -> None: socket.send_fds(childsock, [b"\0"], [pidfd]) os.setgid(0) os.setuid(0) - linuxnamespaces.bind_mount(".", "/mnt", recursive=True) - os.chdir("/mnt") + root = session.mount() + os.chdir(root) linuxnamespaces.populate_sys("/", ".", ns, devices=True) linuxnamespaces.populate_proc("/", ".", ns) linuxnamespaces.populate_dev( @@ -308,7 +429,9 @@ def do_run_session(args: argparse.Namespace) -> None: def do_end_session(args: argparse.Namespace) -> None: """End an existing session""" - session = Chroot.searchsession(args.chroot) + chrootmap = scan_chroots() + session = chrootmap[args.chroot] + assert isinstance(session, (TarSessionChroot, DirectorySessionChroot)) uidmap = linuxnamespaces.IDAllocation.loadsubid("uid").allocatemap(65536) gidmap = linuxnamespaces.IDAllocation.loadsubid("gid").allocatemap(65536) linuxnamespaces.unshare_user_idmap( |