1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
|
#!/usr/bin/python3
# Copyright 2024 Helmut Grohne <helmut@subdivi.de>
# SPDX-License-Identifier: GPL-3
"""Extract a given tarball into a temporary location and chroot into it inside
a user and mount namespace.
"""
import os
import pathlib
import sys
import tarfile
import tempfile
if __file__.split("/")[-2:-1] == ["examples"]:
sys.path.insert(0, "/".join(__file__.split("/")[:-2]))
import linuxnamespaces
class TarFile(tarfile.TarFile):
"""Subclass of tarfile.TarFile that can read zstd compressed archives."""
OPEN_METH = {"zst": "zstopen"} | tarfile.TarFile.OPEN_METH
@classmethod
def zstopen(
cls, name: str, mode: str = "r", fileobj: None = None
) -> tarfile.TarFile:
if mode != "r":
raise NotImplementedError("zst only implmented for reading")
if fileobj is not None:
raise NotImplementedError("zst does not support a fileobj")
try:
import zstandard
except ImportError:
raise tarfile.CompressionError("zstandard module not available")
zfobj = zstandard.open(name, "rb")
try:
tarobj = cls.taropen(name, "r", zfobj)
except (OSError, EOFError) as exc:
zfobj.close()
raise tarfile.ReadError("not a zst file") from exc
except:
zfobj.close()
raise
return tarobj
def main() -> None:
basetar = pathlib.Path(sys.argv[1])
assert basetar.exists()
uidmap = linuxnamespaces.IDAllocation.loadsubid("uid").allocatemap(65536)
gidmap = linuxnamespaces.IDAllocation.loadsubid("gid").allocatemap(65536)
with tempfile.TemporaryDirectory() as tdir:
with TarFile.open(basetar, "r:*") as tarf:
os.chdir(tdir)
pid = os.getpid()
@linuxnamespaces.run_in_fork
def setup() -> None:
linuxnamespaces.newidmaps(pid, [uidmap], [gidmap])
# Craft a namespace that allows us to chown from our current
# user to the first uid/gid from the final mapping, i.e. root.
linuxnamespaces.unshare_user_idmap(
[
linuxnamespaces.IDMapping(0, os.getuid(), 1),
linuxnamespaces.IDMapping(1, uidmap.outerstart, 1),
],
[
linuxnamespaces.IDMapping(0, os.getgid(), 1),
linuxnamespaces.IDMapping(1, gidmap.outerstart, 1),
],
)
os.chown(".", 1, 1)
linuxnamespaces.unshare(linuxnamespaces.CloneFlags.NEWUSER)
setup()
os.setreuid(0, 0)
os.setregid(0, 0)
# "." is now owned by 0:0 inside the namespace.
for tmem in tarf:
if tmem.name.removeprefix("./").startswith("dev/"):
continue
tarf.extract(tmem, numeric_owner=True)
pid = os.fork()
if pid == 0:
linuxnamespaces.unshare(linuxnamespaces.CloneFlags.NEWNS)
linuxnamespaces.bind_mount(".", "/mnt", recursive=True)
os.chdir("/mnt")
linuxnamespaces.bind_mount("/proc", "proc", recursive=True)
linuxnamespaces.bind_mount("/sys", "sys", recursive=True)
linuxnamespaces.populate_dev("/", ".", pidns=False, tun=False)
linuxnamespaces.pivot_root(".", ".")
linuxnamespaces.umount(".", linuxnamespaces.UmountFlags.DETACH)
os.execlp(os.environ["SHELL"], os.environ["SHELL"])
_, ret = os.waitpid(pid, 0)
sys.exit(ret)
if __name__ == "__main__":
main()
|