#!/usr/bin/python3
# Copyright 2024 Helmut Grohne
# SPDX-License-Identifier: GPL-3

"""Unshare a cgroup (and user) namespace such that the entire cgroup hierarchy
(inside the namespace) becomes writeable to the user.
"""

import os
import pathlib
import sys
import time

try:
    import ravel
except ImportError:
    ravel = None

if __file__.split("/")[-2:-1] == ["examples"]:
    sys.path.insert(0, "/".join(__file__.split("/")[:-2]))

import linuxnamespaces


def get_cgroup(pid: int = -1) -> pathlib.PurePath:
    """Look up the cgroup that the given pid or the running process belongs
    to.

    The file /proc/<pid>/cgroup holds one "hierarchy-ID:controllers:path"
    record per line. Only the cgroup v2 record (hierarchy ID 0 with an empty
    controller list) is considered, so additional cgroup v1 records on hybrid
    systems do not end up in the returned path.

    :param pid: Process to inspect. Any value that is not positive selects
        the calling process.
    :returns: The absolute cgroup path as recorded by the kernel.
    :raises ValueError: if the file contains no cgroup v2 record.
    """
    procfile = pathlib.Path(
        f"/proc/{pid}/cgroup" if pid > 0 else "/proc/self/cgroup"
    )
    for line in procfile.read_text().split("\n"):
        # The path is the last field and may itself contain colons, hence
        # the limit on the number of splits.
        fields = line.split(":", 2)
        if len(fields) == 3 and fields[0] == "0" and fields[1] == "":
            return pathlib.PurePath(fields[2])
    raise ValueError(f"no cgroup v2 entry found in {procfile}")


def start_transient_unit_with_ravel(pid: int) -> None:
    """Call the StartTransientUnit dbus method on the user manager for the
    given pid.

    A scope unit named cgroup-<pid>.scope containing the given pid is
    requested on the session bus. The function then waits up to 60 seconds
    for the matching JobRemoved signal. The process is terminated with exit
    status 1 if the signal does not arrive in time or reports a result other
    than "done".

    :param pid: Process to be moved into the new scope unit.
    """
    bus = ravel.session_bus()
    # Maps the job object path of every JobRemoved signal seen to its result.
    jobs_removed = {}
    systemd_path = "/org/freedesktop/systemd1"
    systemd_iface = "org.freedesktop.systemd1.Manager"

    @ravel.signal(name="JobRemoved", in_signature="uoss")
    def handle_job_removed(_1, path, _2, result):
        jobs_removed[path] = result

    # Subscribe before issuing the call such that the signal cannot be
    # missed.
    bus.listen_signal(
        systemd_path, False, systemd_iface, "JobRemoved", handle_job_removed
    )
    manager = bus["org.freedesktop.systemd1"][systemd_path].get_interface(
        systemd_iface
    )
    jobpath = manager.StartTransientUnit(
        f"cgroup-{pid}.scope", "fail", [("PIDs", ("au", [pid]))], []
    )[0]
    now = time.monotonic()
    deadline = now + 60
    while jobpath not in jobs_removed:
        if not bus.connection.read_write_dispatch(timeout=deadline - now):
            break
        now = time.monotonic()
        if now > deadline:
            break
    bus.unlisten_signal(
        systemd_path, False, systemd_iface, "JobRemoved", handle_job_removed
    )
    if jobpath not in jobs_removed:
        sys.exit("Error: timed out waiting for StartTransientUnit")
    if jobs_removed[jobpath] != "done":
        sys.exit("Error: StartTransientUnit failed: " + jobs_removed[jobpath])


def main() -> None:
    """Make the current cgroup writeable, unshare namespaces and run a shell.

    If the cgroup directory of the process is not writeable, a scope unit is
    created via dbus (when ravel is available) or the script re-executes
    itself via systemd-run. Afterwards user, mount and cgroup namespaces are
    unshared, the cgroup directory is mounted onto /sys/fs/cgroup on top of
    an otherwise empty read-only /sys and $SHELL (or /bin/sh if unset) is
    executed.
    """
    mycgroup = get_cgroup()
    mycgroupdir = pathlib.Path("/sys/fs/cgroup") / mycgroup.relative_to("/")
    if not os.access(mycgroupdir, os.W_OK):
        # For some shells - notably from graphical desktop environments, the
        # hierarchy is immediately writeable. For others, we may create a
        # scope unit.
        if ravel is not None:
            start_transient_unit_with_ravel(os.getpid())
            # We have been moved, so look up the new location.
            mycgroup = get_cgroup()
            mycgroupdir = (
                pathlib.Path("/sys/fs/cgroup") / mycgroup.relative_to("/")
            )
        else:
            # Re-execute ourselves via systemd-run.
            in_run_scope = (
                mycgroup.name.startswith("run-")
                and mycgroup.name.endswith(".scope")
            )
            if in_run_scope:
                # Presumably we already are the re-executed instance. Stop
                # here rather than re-executing over and over again.
                sys.exit(
                    "Error: We're running in a .scope cgroup, "
                    "but it is not writeable. Giving up."
                )
            try:
                os.execvp(
                    "systemd-run",
                    ["systemd-run", "--user", "--scope"] + sys.argv,
                )
            except OSError as err:
                # execvp only ever comes back by raising.
                sys.exit(
                    "Error: Failed to re-execute myself inside "
                    f"systemd-run: {err}"
                )
    linuxnamespaces.unshare_user_idmap(
        [linuxnamespaces.IDMapping(os.getuid(), os.getuid(), 1)],
        [linuxnamespaces.IDMapping(os.getgid(), os.getgid(), 1)],
        linuxnamespaces.CloneFlags.NEWUSER
        | linuxnamespaces.CloneFlags.NEWNS
        | linuxnamespaces.CloneFlags.NEWCGROUP,
    )
    # Grab our cgroup directory before /sys is covered by the tmpfs below.
    cgroupfd = linuxnamespaces.open_tree(
        mycgroupdir,
        linuxnamespaces.OpenTreeFlags.OPEN_TREE_CLONE
        | linuxnamespaces.OpenTreeFlags.AT_RECURSIVE,
    )
    linuxnamespaces.mount("tmpfs", "/sys", "tmpfs", data="mode=0755")
    os.mkdir("/sys/fs")
    os.mkdir("/sys/fs/cgroup")
    # The mount point exists now, so the tmpfs may become read-only.
    linuxnamespaces.mount(
        "tmpfs",
        "/sys",
        "tmpfs",
        linuxnamespaces.MountFlags.REMOUNT
        | linuxnamespaces.MountFlags.RDONLY
        | linuxnamespaces.MountFlags.NOEXEC
        | linuxnamespaces.MountFlags.NOSUID
        | linuxnamespaces.MountFlags.NODEV,
        "mode=0755",
    )
    linuxnamespaces.move_mount(cgroupfd, "/sys/fs/cgroup")
    cgroupfd.close()
    # Do not fail with a KeyError when SHELL happens to be unset.
    shell = os.environ.get("SHELL", "/bin/sh")
    os.execlp(shell, shell)


if __name__ == "__main__":
    main()