#!/usr/bin/python3
# Copyright 2024 Helmut Grohne
# SPDX-License-Identifier: GPL-3

"""Unshare a cgroup (and user) namespace such that the entire cgroup hierarchy
(inside the namespace) becomes writeable to the user.
"""

import asyncio
import os
import pathlib
import sys

try:
    import ravel
except ImportError:
    # ravel is optional: without it, main() re-executes via systemd-run.
    ravel = None

# When run from an "examples" directory, make the parent directory importable
# so that the linuxnamespaces import below can be resolved from there.
if __file__.split("/")[-2:-1] == ["examples"]:
    sys.path.insert(0, "/".join(__file__.split("/")[:-2]))

import linuxnamespaces


def get_cgroup(pid: int = -1) -> pathlib.PurePath:
    """Look up the cgroup that the given pid or the running process belongs
    to.

    The file /proc/<pid>/cgroup is parsed line by line as
    "hierarchy-ID:controller-list:cgroup-path" records. The cgroup v2 record
    (hierarchy ID "0" with an empty controller list) is preferred. If there is
    none, the first record is used.

    :param pid: process to inspect; any value <= 0 selects the calling
        process via /proc/self.
    :returns: the cgroup path as recorded in the proc file.
    :raises OSError: if the proc file cannot be read.
    :raises IndexError: if the proc file contains no parsable record.
    """
    procfile = pathlib.Path(
        f"/proc/{pid}/cgroup" if pid > 0 else "/proc/self/cgroup"
    )
    records = [
        line.split(":", 2)
        for line in procfile.read_text().splitlines()
        if line.count(":") >= 2
    ]
    # On hybrid setups the file lists one line per v1 hierarchy in addition to
    # the unified one. Only the unified record is meaningful for the lookup
    # below /sys/fs/cgroup performed by main().
    unified = [rec for rec in records if rec[0] == "0" and rec[1] == ""]
    return pathlib.PurePath((unified or records)[0][2].strip())


async def start_transient_unit_with_ravel(pid: int) -> None:
    """Call the StartTransientUnit dbus method on the user manager for the
    given pid and wait for the resulting job to finish.

    A scope unit named cgroup-<pid>.scope containing the given pid is
    requested. The function returns once the matching JobRemoved signal
    reports the result "done". On any other result, or when no such signal
    arrives within 60 seconds, an error is printed to stderr and the process
    exits with status 1.

    :param pid: process to be moved into the new scope unit.
    """
    bus = await ravel.session_bus_async()
    # Results of JobRemoved signals that arrive before the job path of our
    # own request is known, keyed by job path.
    jobs_removed = {}
    scope_job = None
    systemd_path = "/org/freedesktop/systemd1"
    systemd_iface = "org.freedesktop.systemd1.Manager"
    scope_created = asyncio.get_running_loop().create_future()

    @ravel.signal(name="JobRemoved", in_signature="uoss")
    def handle_job_removed(_1, path, _2, result):
        # The enclosing variables are only read or mutated here, never
        # rebound, so no nonlocal declarations are needed.
        if scope_job is None:
            jobs_removed[path] = result
        elif path == scope_job and not scope_created.done():
            # The done() check avoids InvalidStateError if the signal is
            # delivered again or after the wait below has timed out.
            scope_created.set_result(result)

    # Subscribe before issuing the request so that the completion signal
    # cannot be missed.
    bus.listen_signal(
        systemd_path, False, systemd_iface, "JobRemoved", handle_job_removed
    )
    try:
        scope_job = (
            bus["org.freedesktop.systemd1"][systemd_path]
            .get_interface(systemd_iface)
            .StartTransientUnit(
                f"cgroup-{pid}.scope", "fail", [("PIDs", ("au", [pid]))], []
            )
        )[0]
        if scope_job in jobs_removed:
            # The job already completed while the request was in flight.
            scope_created.set_result(jobs_removed[scope_job])
        else:
            try:
                await asyncio.wait_for(scope_created, 60)
            except asyncio.TimeoutError:
                # asyncio.TimeoutError is only an alias of the builtin
                # TimeoutError from Python 3.11 on, so name it explicitly.
                print(
                    "Error: timed out waiting for StartTransientUnit",
                    file=sys.stderr,
                )
                sys.exit(1)
    finally:
        # Release the subscription on every path, including errors.
        bus.unlisten_signal(
            systemd_path,
            False,
            systemd_iface,
            "JobRemoved",
            handle_job_removed,
        )
    result = scope_created.result()
    if result != "done":
        print(
            "Error: StartTransientUnit failed: " + result, file=sys.stderr
        )
        sys.exit(1)


def main() -> None:
    """Ensure the current cgroup is writeable, unshare user, mount and cgroup
    namespaces and replace the process with the user's shell.

    If the directory of the current cgroup below /sys/fs/cgroup is not
    writeable, a scope unit is created via dbus when ravel is available.
    Otherwise the script re-executes itself under
    "systemd-run --user --scope".
    """
    mycgroup = get_cgroup()
    if not os.access(
        pathlib.Path("/sys/fs/cgroup") / mycgroup.relative_to("/"),
        os.W_OK,
    ):
        # For some shells - notably from graphical desktop environments, the
        # hierarchy is immediately writeable. For others, we may create a
        # scope unit.
        if ravel is not None:
            # asyncio.run creates and closes its own event loop. Calling
            # asyncio.get_event_loop() without a running loop is deprecated.
            asyncio.run(start_transient_unit_with_ravel(os.getpid()))
            mycgroup = get_cgroup()
        else:
            # Re-execute ourselves via systemd-run.
            if (
                mycgroup.name.startswith("run-")
                and mycgroup.name.endswith(".scope")
            ):
                # We already are inside a systemd-run scope. Re-executing
                # again would loop forever.
                print(
                    "Error: We're running in a .scope cgroup, but it is not "
                    "writeable. Giving up.",
                    file=sys.stderr,
                )
                sys.exit(1)
            try:
                os.execvp(
                    "systemd-run",
                    ["systemd-run", "--user", "--scope"] + sys.argv,
                )
            except OSError:
                # execvp never returns on success and raises on failure, so
                # the failure has to be handled here.
                print(
                    "Error: Failed to re-execute myself inside systemd-run.",
                    file=sys.stderr,
                )
                sys.exit(1)
    # Both mappings pass the current uid/gid twice with a count of 1.
    # NOTE(review): presumably this maps the id onto itself inside the new
    # user namespace - confirm against the linuxnamespaces documentation.
    linuxnamespaces.unshare_user_idmap(
        [linuxnamespaces.IDMapping(os.getuid(), os.getuid(), 1)],
        [linuxnamespaces.IDMapping(os.getgid(), os.getgid(), 1)],
        linuxnamespaces.CloneFlags.NEWUSER
        | linuxnamespaces.CloneFlags.NEWNS
        | linuxnamespaces.CloneFlags.NEWCGROUP,
    )
    # NOTE(review): presumably this sets up /sys with the cgroup hierarchy
    # rooted at mycgroup - confirm against linuxnamespaces.populate_sys.
    linuxnamespaces.populate_sys("/", "/", mycgroup)
    # Fall back to /bin/sh rather than failing with KeyError when SHELL is
    # not set in the environment.
    shell = os.environ.get("SHELL", "/bin/sh")
    os.execlp(shell, shell)


if __name__ == "__main__":
    main()