summaryrefslogtreecommitdiff
path: root/examples/chroottar.py
blob: ed494b2f8ca74e5ad7c29d06e7f4ce62e71c5586 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#!/usr/bin/python3
# Copyright 2024 Helmut Grohne <helmut@subdivi.de>
# SPDX-License-Identifier: GPL-3

"""Extract a given tarball into a temporary location and chroot into it inside
a user and mount namespace.
"""

import os
import pathlib
import sys
import tarfile
import tempfile
import typing

# Allow running this example straight from a source checkout: when the script
# lives in a directory named "examples", put that directory's parent at the
# front of sys.path before linuxnamespaces is imported below, so the import is
# looked up there first (presumably the checkout's copy of the package lives
# in that parent directory -- confirm against the repository layout).
if __file__.split("/")[-2:-1] == ["examples"]:
    sys.path.insert(0, "/".join(__file__.split("/")[:-2]))

import linuxnamespaces


class TarFile(tarfile.TarFile):
    """Subclass of tarfile.TarFile that can read zstd compressed archives.

    The "zst" entry added to OPEN_METH lets TarFile.open() dispatch to
    zstopen(), both for an explicit "r:zst" mode and during "r:*"
    compression auto-detection.
    """

    OPEN_METH = {"zst": "zstopen"} | tarfile.TarFile.OPEN_METH

    @classmethod
    def zstopen(
        cls,
        name: str,
        mode: str = "r",
        fileobj: None = None,
        **kwargs: typing.Any,
    ) -> tarfile.TarFile:
        """Open the zstd compressed tar archive *name* for reading.

        Parameters:
            name: Path of the compressed archive.
            mode: Must be "r"; writing is not implemented.
            fileobj: Must be None; only opening by name is supported.
            **kwargs: Forwarded unchanged to taropen(). TarFile.open() passes
                its own keyword arguments to every opener, so they have to
                be accepted here as well.

        Returns:
            The opened archive. Closing it also closes the decompression
            stream.

        Raises:
            NotImplementedError: For any mode other than "r" or when a
                fileobj is given.
            tarfile.CompressionError: If the zstandard module is missing.
            tarfile.ReadError: If the data cannot be read as a zstd
                compressed tar archive.
        """
        if mode != "r":
            raise NotImplementedError("zst only implemented for reading")
        if fileobj is not None:
            raise NotImplementedError("zst does not support a fileobj")
        try:
            import zstandard
        except ImportError:
            # The ImportError adds nothing beyond the message below.
            raise tarfile.CompressionError(
                "zstandard module not available"
            ) from None
        zfobj = zstandard.open(name, "rb")
        try:
            tarobj = cls.taropen(name, "r", zfobj, **kwargs)
        except (OSError, EOFError, zstandard.ZstdError) as exc:
            zfobj.close()
            raise tarfile.ReadError("not a zst file") from exc
        except BaseException:
            # Whatever went wrong, do not leak the decompression stream.
            zfobj.close()
            raise
        # taropen() treats a passed file object as owned by the caller and
        # would leave it open on close(). The stream was created here, so
        # hand ownership to the TarFile, as the stdlib *open() methods do.
        tarobj._extfileobj = False  # type: ignore[attr-defined]
        return tarobj


def _exit_code(status: int) -> int:
    """Convert a wait status from os.waitpid() into a process exit code.

    A normal exit yields the child's own exit code. Termination by signal N
    is reported as 128 + N, following the usual shell convention.
    """
    code = os.waitstatus_to_exitcode(status)
    return code if code >= 0 else 128 - code


def _run_child(
    basetar: pathlib.Path,
    tdir: str,
    unshareevent: "linuxnamespaces.EventFD",
    setupevent: "linuxnamespaces.EventFD",
) -> None:
    """Child half of main(): unpack the tarball, enter it and exec a shell.

    Does not return on success, because the process image is replaced by the
    shell. Any failure is raised to the caller.
    """
    # Open the archive before unsharing, i.e. while still running with the
    # invoking user's original credentials.
    with TarFile.open(basetar, "r:*") as tarf:
        os.chdir(tdir)
        linuxnamespaces.unshare(
            linuxnamespaces.CloneFlags.NEWUSER
            | linuxnamespaces.CloneFlags.NEWNS
        )
        # Tell the parent that the namespaces exist, then wait until it has
        # installed the id maps and prepared the target directory.
        unshareevent.write(1)
        setupevent.read()
        unshareevent.close()
        setupevent.close()
        os.setreuid(0, 0)
        os.setregid(0, 0)
        os.setgroups([])
        # Newer Python versions apply an extraction filter; the "data"
        # filter that becomes the default discards ownership information,
        # which a root filesystem needs. Request the unfiltered behaviour
        # wherever the interpreter knows about filters.
        has_filters = hasattr(tarfile, "fully_trusted_filter")
        for tmem in tarf:
            # Skip dev/ entries from the archive; populate_dev() below fills
            # the directory instead.
            if tmem.name.removeprefix("./").startswith("dev/"):
                continue
            if has_filters:
                tarf.extract(tmem, numeric_owner=True, filter="fully_trusted")
            else:
                tarf.extract(tmem, numeric_owner=True)
    linuxnamespaces.bind_mount(".", "/mnt", recursive=True)
    os.chdir("/mnt")
    linuxnamespaces.bind_mount("/proc", "proc", recursive=True)
    linuxnamespaces.bind_mount("/sys", "sys", recursive=True)
    linuxnamespaces.populate_dev("/", ".", pidns=False, tun=False)
    linuxnamespaces.pivot_root(".", ".")
    linuxnamespaces.umount(".", linuxnamespaces.UmountFlags.DETACH)
    # Fall back to /bin/sh when SHELL is not set instead of a KeyError.
    shell = os.environ.get("SHELL", "/bin/sh")
    os.execlp(shell, shell)


def main() -> None:
    """Extract the tarball named by sys.argv[1] and run a shell inside it.

    The tarball is unpacked into a temporary directory by a forked child
    running in new user and mount namespaces; the child then pivots into the
    unpacked tree and executes $SHELL. The parent installs the id maps for
    the child, waits for it and exits with the child's exit code (128 + N if
    the child was killed by signal N).
    """
    import traceback

    if len(sys.argv) < 2:
        sys.exit(f"usage: {sys.argv[0]} TARBALL")
    basetar = pathlib.Path(sys.argv[1])
    if not basetar.exists():
        sys.exit(f"{basetar}: no such file or directory")
    uidmap = linuxnamespaces.IDAllocation.loadsubid("uid").allocatemap(65536)
    gidmap = linuxnamespaces.IDAllocation.loadsubid("gid").allocatemap(65536)
    with tempfile.TemporaryDirectory() as tdir:
        unshareevent = linuxnamespaces.EventFD()
        setupevent = linuxnamespaces.EventFD()
        pid = os.fork()
        if pid == 0:
            # The child must never unwind into the parent's code path (most
            # notably the TemporaryDirectory cleanup): report the failure
            # and terminate right here.
            try:
                _run_child(basetar, tdir, unshareevent, setupevent)
            except BaseException:
                traceback.print_exc()
            finally:
                os._exit(1)

        # NOTE(review): if the child dies before signalling unshareevent,
        # this read presumably blocks forever -- confirm EventFD semantics.
        unshareevent.read()
        unshareevent.close()
        linuxnamespaces.newidmaps(pid, [uidmap], [gidmap])
        # Presumably moves the parent into a user namespace of its own that
        # maps the same ranges plus the invoking uid/gid, so the chown/chmod
        # below and the final cleanup can act on files owned by mapped ids
        # -- confirm against the linuxnamespaces documentation.
        linuxnamespaces.unshare_user_idmap(
            [uidmap, linuxnamespaces.IDMapping(65536, os.getuid(), 1)],
            [gidmap, linuxnamespaces.IDMapping(65536, os.getgid(), 1)],
        )
        os.chown(tdir, 0, 0)
        os.chmod(tdir, 0o755)
        setupevent.write()
        setupevent.close()
        _, status = os.waitpid(pid, 0)
    # waitpid() returns an encoded wait status, not an exit code; passing it
    # to sys.exit() directly would turn e.g. exit code 1 (status 256) into 0.
    sys.exit(_exit_code(status))


# Run main() only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()