summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHelmut Grohne <helmut@subdivi.de>2025-05-01 22:16:47 +0200
committerHelmut Grohne <helmut@subdivi.de>2025-05-01 22:16:47 +0200
commit426f547d5b542daf679fcce34d1e9739107d6987 (patch)
tree80ebe437e89e0d63d4ddc6d8fe6ef80b92a22562
parent6eaaa3bc9b8f54ebd06e8def57c172933b2f6131 (diff)
downloadpython-linuxnamespaces-426f547d5b542daf679fcce34d1e9739107d6987.tar.gz
examples/unschroot.py: support overlayfs-based directory chroots
Suggested-by: Christoph Berg <myon@debian.org>
-rwxr-xr-xexamples/unschroot.py385
1 files changed, 254 insertions, 131 deletions
diff --git a/examples/unschroot.py b/examples/unschroot.py
index 3d1c900..58e14b7 100755
--- a/examples/unschroot.py
+++ b/examples/unschroot.py
@@ -14,9 +14,7 @@ contain the non-essential passwd package. The actual sessions are stored in
import argparse
-import functools
import grp
-import itertools
import os
import pathlib
import pwd
@@ -41,156 +39,279 @@ class TarFile(
pass
-class Chroot:
- # Ignore $HOME as sbuild sets to something invalid
- home = pathlib.Path(pwd.getpwuid(os.getuid()).pw_dir)
- cache_sbuild = home / ".cache/sbuild"
- cache_unschroot = home / ".cache/unschroot"
+def write_etc_hosts(root: os.PathLike[str] | str) -> None:
+ etc_hosts = pathlib.Path(root) / "etc/hosts"
+ if not etc_hosts.exists():
+ etc_hosts.write_text(
+ """127.0.0.1 localhost
+127.0.1.1 %s
+::1 localhost ip6-localhost ip6-loopback
+"""
+ % socket.gethostname(),
+ encoding="ascii",
+ )
- def __init__(self, path: pathlib.Path, aliases: set[str] | None = None):
- self.path = path
- self.aliases = set() if aliases is None else aliases
- @functools.cached_property
- def namespace(self) -> str:
- if self.path.is_file():
- return "Chroot"
- if self.path.is_dir():
- return "Session"
- raise ValueError("invalid chroot object")
+def load_subids() -> (
+ tuple[linuxnamespaces.IDMapping, linuxnamespaces.IDMapping]
+):
+ return (
+ linuxnamespaces.IDAllocation.loadsubid("uid").allocatemap(65536),
+ linuxnamespaces.IDAllocation.loadsubid("gid").allocatemap(65536),
+ )
+
+
+# Ignore $HOME as sbuild sets to something invalid
+HOME = pathlib.Path(pwd.getpwuid(os.getuid()).pw_dir)
+CACHE_SBUILD = HOME / ".cache/sbuild"
+CACHE_UNSCHROOT = HOME / ".cache/unschroot"
+CACHE_DIRECTORY_CHROOTS = HOME / ".cache/directory_chroots"
+
- @functools.cached_property
- def name(self) -> str:
- suffix = "-sbuild" if self.namespace == "Chroot" else ""
- return self.path.name.split(".", 1)[0] + suffix
+class ChrootBase:
+ namespace: str
+ name: str
+
+ def __init__(self) -> None:
+ self.aliases: set[str] = set()
+
+ def infodata(self) -> dict[str, str]:
+ return {
+ "Name": self.name,
+ "Aliases": " ".join(sorted(self.aliases)),
+ }
def infostr(self) -> str:
- lines = [
- f"--- {self.namespace} ---",
- f"Name {self.name}",
- ]
- if self.namespace == "Chroot":
- lines.extend(["Type file", f"File {self.path}"])
- if self.namespace == "Session":
- lines.extend(
- [
- f"Location {self.path}",
- "Session Purged true",
- "Type unshare",
- ]
+ return f"--- {self.namespace} ---\n" + "".join(
+ map("%s %s\n".__mod__, self.infodata().items())
+ )
+
+
+class SourceChroot(ChrootBase):
+ namespace = "Chroot"
+
+ def newsession(self) -> "SessionChroot":
+ raise NotImplementedError
+
+
+class SessionChroot(ChrootBase):
+ namespace = "Session"
+
+ def infodata(self) -> dict[str, str]:
+ data = super().infodata()
+ data["Session Purged"] = "true"
+ data["Type"] = "unshare"
+ return data
+
+ def mount(self) -> pathlib.Path:
+ raise NotImplementedError
+
+
+class TarSourceChroot(SourceChroot):
+ def __init__(self, path: pathlib.Path):
+ super().__init__()
+ self.path = path
+ self.name = path.name.split(".", 1)[0] + "-sbuild"
+
+ def infodata(self) -> dict[str, str]:
+ data = super().infodata()
+ data["Type"] = "file"
+ data["File"] = str(self.path)
+ return data
+
+ def newsession(self) -> "TarSessionChroot":
+ CACHE_UNSCHROOT.mkdir(parents=True, exist_ok=True)
+ session = TarSessionChroot(
+ pathlib.Path(tempfile.mkdtemp(prefix="tar-", dir=CACHE_UNSCHROOT)),
+ )
+
+ uidmap, gidmap = load_subids()
+ mainsock, childsock = socket.socketpair()
+ with TarFile.open(self.path, "r:*") as tarf:
+ pid = os.fork()
+ if pid == 0:
+ mainsock.close()
+ os.chdir(session.path)
+ linuxnamespaces.unshare(
+ linuxnamespaces.CloneFlags.NEWUSER
+ | linuxnamespaces.CloneFlags.NEWNS,
+ )
+ childsock.send(b"\0")
+ childsock.recv(1)
+ childsock.close()
+ os.setgid(0)
+ os.setuid(0)
+ for tmem in tarf:
+ if not tmem.name.startswith(("dev/", "./dev/")):
+ tarf.extract(tmem, numeric_owner=True)
+ write_etc_hosts(".")
+ sys.exit(0)
+ childsock.close()
+ mainsock.recv(1)
+ pid2 = os.fork()
+ if pid2 == 0:
+ linuxnamespaces.unshare_user_idmap(
+ [uidmap, linuxnamespaces.IDMapping(65536, os.getuid(), 1)],
+ [gidmap, linuxnamespaces.IDMapping(65536, os.getgid(), 1)],
)
- lines.append("Aliases " + " ".join(sorted(self.aliases)))
- return "".join(map("%s\n".__mod__, lines))
-
- @classmethod
- def searchchroot(cls, name: str) -> "Chroot":
- name = name.removeprefix("chroot:")
- name = name.removesuffix("-sbuild")
- for path in cls.cache_sbuild.iterdir():
- if path.name.startswith(name + ".t"):
- return cls(path)
- raise KeyError(name)
-
- @classmethod
- def searchsession(cls, name: str) -> "Chroot":
- name = name.removeprefix("session:")
- path = cls.cache_unschroot / name
- if not path.is_dir():
- raise KeyError(name)
- return cls(path)
-
- @classmethod
- def newsession(cls) -> "Chroot":
- cls.cache_unschroot.mkdir(parents=True, exist_ok=True)
- return Chroot(
- pathlib.Path(
- tempfile.mkdtemp(prefix="chroot", dir=cls.cache_unschroot)
+ os.chown(session.path, 0, 0)
+ session.path.chmod(0o755)
+ sys.exit(0)
+ linuxnamespaces.newidmaps(pid, [uidmap], [gidmap])
+ _, ret = os.waitpid(pid2, 0)
+ assert ret == 0
+ mainsock.send(b"\0")
+ mainsock.close()
+ _, ret = os.waitpid(pid, 0)
+ assert ret == 0
+ return session
+
+
+class TarSessionChroot(SessionChroot):
+ def __init__(self, path: pathlib.Path):
+ super().__init__()
+ self.path = path
+ self.name = path.name
+
+ def mount(self) -> pathlib.Path:
+ linuxnamespaces.bind_mount(self.path, "/mnt", recursive=True)
+ return pathlib.Path("/mnt")
+
+
+class DirectorySourceChroot(SourceChroot):
+ def __init__(self, path: pathlib.Path):
+ super().__init__()
+ self.path = path
+ self.name = path.name + "-sbuild"
+
+ def infodata(self) -> dict[str, str]:
+ data = super().infodata()
+ data["Type"] = "directory"
+ data["Directory"] = str(self.path)
+ return data
+
+ def newsession(self) -> "DirectorySessionChroot":
+ CACHE_UNSCHROOT.mkdir(parents=True, exist_ok=True)
+ path = pathlib.Path(
+ tempfile.mkdtemp(
+ prefix=f"overlay-{self.name}-", dir=CACHE_UNSCHROOT
),
)
+ session = DirectorySessionChroot(self, path)
+ uidmap, gidmap = load_subids()
+ pid = os.fork()
+ if pid == 0:
+ linuxnamespaces.unshare_user_idmap(
+ [uidmap, linuxnamespaces.IDMapping(65536, os.getuid(), 1)],
+ [gidmap, linuxnamespaces.IDMapping(65536, os.getgid(), 1)],
+ )
+ os.setgid(0)
+ os.setuid(0)
+ os.chown(path, 0, 0)
+ path.chmod(0o755)
+ (path / "upper").mkdir()
+ (path / "work").mkdir()
+ if not (self.path / "etc/hosts").exists():
+ (path / "upper/etc").mkdir()
+ write_etc_hosts(path / "upper")
+ sys.exit(0)
+ _, ret = os.waitpid(pid, 0)
+ assert ret == 0
+ return session
+
+
+class DirectorySessionChroot(SessionChroot):
+ def __init__(self, source: DirectorySourceChroot, path: pathlib.Path):
+ super().__init__()
+ self.source = source
+ self.path = path
+ self.name = path.name
+
+ def infodata(self) -> dict[str, str]:
+ data = super().infodata()
+ data["Type"] = "directory"
+ data["Directory"] = str(self.source.path)
+ return data
+
+ def mount(self) -> pathlib.Path:
+ mnt = "/mnt"
+ linuxnamespaces.mount(
+ "overlay",
+ mnt,
+ "overlay",
+ data=[
+ "lowerdir=" + str(self.source.path),
+ "upperdir=" + str(self.path / "upper"),
+ "workdir=" + str(self.path / "work"),
+ "userxattr",
+ ],
+ )
+ return pathlib.Path(mnt)
+
- @classmethod
- def scan_sbuild(cls) -> typing.Iterator["Chroot"]:
- if cls.cache_sbuild.is_dir():
+def scan_chroots() -> dict[str, ChrootBase]:
+ chrootmap: dict[str, ChrootBase] = {}
+ chroot: ChrootBase
+ for loc, cls in (
+ (CACHE_SBUILD, TarSourceChroot),
+ (CACHE_DIRECTORY_CHROOTS, DirectorySourceChroot),
+ ):
+ if loc.is_dir():
chroots = []
aliases: dict[str, set[str]] = {}
- for path in cls.cache_sbuild.iterdir():
+ for path in loc.iterdir():
if path.is_symlink():
alias = path.name.split(".", 1)[0] + "-sbuild"
aliases.setdefault(str(path.readlink()), set()).add(alias)
- elif path.is_file():
+ else:
chroots.append(path)
for path in chroots:
- yield cls(path, aliases.get(path.name, set()))
-
- @classmethod
- def scan_unschroot(cls) -> typing.Iterator["Chroot"]:
- if cls.cache_unschroot.is_dir():
- yield from map(cls, cls.cache_unschroot.iterdir())
+ chroot = cls(path)
+ chroot.aliases.update(aliases.get(path.name, set()))
+ if chroot.name not in chrootmap:
+ chrootmap[chroot.name] = chroot
+
+ if CACHE_UNSCHROOT.is_dir():
+ for path in CACHE_UNSCHROOT.iterdir():
+ if path.name.startswith("tar-"):
+ chroot = TarSessionChroot(path)
+ if chroot.name not in chrootmap:
+ chrootmap[chroot.name] = chroot
+ elif path.name.startswith("overlay-"):
+ base = "-".join(path.name.split("-")[1:-1])
+ if base not in chrootmap:
+ continue
+ source = chrootmap[base]
+ assert isinstance(source, DirectorySourceChroot)
+ chroot = DirectorySessionChroot(source, path)
+ if chroot.name not in chrootmap:
+ chrootmap[chroot.name] = chroot
+
+ return chrootmap
def do_info(args: argparse.Namespace) -> None:
"""Show information about selected chroots"""
- chroots: typing.Iterable[Chroot]
+ chrootmap = scan_chroots()
+ chroots: typing.Iterable[ChrootBase]
if args.chroot:
- try:
- chroots = [Chroot.searchchroot(args.chroot)]
- except KeyError:
- chroots = [Chroot.searchsession(args.chroot)]
+ chroots = [
+ chrootmap[
+ args.chroot.removeprefix("chroot:").removeprefix("session:")
+ ],
+ ]
else:
- chroots = itertools.chain(
- Chroot.scan_sbuild(), Chroot.scan_unschroot()
- )
+ chroots = chrootmap.values()
sys.stdout.write("\n".join(chroot.infostr() for chroot in chroots))
def do_begin_session(args: argparse.Namespace) -> None:
"""Begin a session; returns the session ID"""
- source = Chroot.searchchroot(args.chroot)
- session = Chroot.newsession()
- uidmap = linuxnamespaces.IDAllocation.loadsubid("uid").allocatemap(65536)
- gidmap = linuxnamespaces.IDAllocation.loadsubid("gid").allocatemap(65536)
- mainsock, childsock = socket.socketpair()
- with TarFile.open(source.path, "r:*") as tarf:
- pid = os.fork()
- if pid == 0:
- mainsock.close()
- os.chdir(session.path)
- linuxnamespaces.unshare(
- linuxnamespaces.CloneFlags.NEWUSER
- | linuxnamespaces.CloneFlags.NEWNS,
- )
- childsock.send(b"\0")
- childsock.recv(1)
- childsock.close()
- os.setgid(0)
- os.setuid(0)
- for tmem in tarf:
- if not tmem.name.startswith(("dev/", "./dev/")):
- tarf.extract(tmem, numeric_owner=True)
- etc_hosts = pathlib.Path("./etc/hosts")
- if not etc_hosts.exists():
- etc_hosts.write_text(
- """127.0.0.1 localhost
-127.0.1.1 %s
-::1 localhost ip6-localhost ip6-loopback
-"""
- % socket.gethostname(),
- )
- sys.exit(0)
- childsock.close()
- mainsock.recv(1)
- linuxnamespaces.newidmaps(pid, [uidmap], [gidmap])
- linuxnamespaces.unshare_user_idmap(
- [uidmap, linuxnamespaces.IDMapping(65536, os.getuid(), 1)],
- [gidmap, linuxnamespaces.IDMapping(65536, os.getgid(), 1)],
- )
- os.chown(session.path, 0, 0)
- session.path.chmod(0o755)
- mainsock.send(b"\0")
- mainsock.close()
- _, ret = os.waitpid(pid, 0)
+ chrootmap = scan_chroots()
+ source = chrootmap[args.chroot]
+ assert isinstance(source, SourceChroot)
+ session = source.newsession()
print(session.name)
- sys.exit(ret)
def exec_perl_dumb_init(pid: int) -> typing.NoReturn:
@@ -212,9 +333,10 @@ def exec_perl_dumb_init(pid: int) -> typing.NoReturn:
def do_run_session(args: argparse.Namespace) -> None:
"""Run an existing session"""
- session = Chroot.searchsession(args.chroot)
- uidmap = linuxnamespaces.IDAllocation.loadsubid("uid").allocatemap(65536)
- gidmap = linuxnamespaces.IDAllocation.loadsubid("gid").allocatemap(65536)
+ chrootmap = scan_chroots()
+ session = chrootmap[args.chroot]
+ assert isinstance(session, SessionChroot)
+ uidmap, gidmap = load_subids()
mainsock, childsock = socket.socketpair()
pid = os.fork()
pidfd: int
@@ -223,7 +345,6 @@ def do_run_session(args: argparse.Namespace) -> None:
for fd in (1, 2):
if stat.S_ISFIFO(os.fstat(fd).st_mode):
os.fchmod(fd, 0o666)
- os.chdir(session.path)
ns = (
linuxnamespaces.CloneFlags.NEWUSER
| linuxnamespaces.CloneFlags.NEWNS
@@ -241,8 +362,8 @@ def do_run_session(args: argparse.Namespace) -> None:
socket.send_fds(childsock, [b"\0"], [pidfd])
os.setgid(0)
os.setuid(0)
- linuxnamespaces.bind_mount(".", "/mnt", recursive=True)
- os.chdir("/mnt")
+ root = session.mount()
+ os.chdir(root)
linuxnamespaces.populate_sys("/", ".", ns, devices=True)
linuxnamespaces.populate_proc("/", ".", ns)
linuxnamespaces.populate_dev(
@@ -308,7 +429,9 @@ def do_run_session(args: argparse.Namespace) -> None:
def do_end_session(args: argparse.Namespace) -> None:
"""End an existing session"""
- session = Chroot.searchsession(args.chroot)
+ chrootmap = scan_chroots()
+ session = chrootmap[args.chroot]
+ assert isinstance(session, (TarSessionChroot, DirectorySessionChroot))
uidmap = linuxnamespaces.IDAllocation.loadsubid("uid").allocatemap(65536)
gidmap = linuxnamespaces.IDAllocation.loadsubid("gid").allocatemap(65536)
linuxnamespaces.unshare_user_idmap(