#!/usr/bin/python3
# Copyright 2024 Helmut Grohne
# SPDX-License-Identifier: GPL-3

"""Unshare a cgroup (and user) namespace such that the entire cgroup hierarchy
(inside the namespace) becomes writeable to the user.
"""

import asyncio
import os
import pathlib
import sys
import typing

try:
    import ravel
except ImportError:
    ravel = None

if __file__.split("/")[-2:-1] == ["examples"]:
    sys.path.insert(0, "/".join(__file__.split("/")[:-2]))

import linuxnamespaces


def get_cgroup(pid: int = -1) -> pathlib.PurePath:
    """Look up the cgroup that the given pid or the running process belongs
    to.

    Only the unified (cgroup v2) entry of the proc file is considered, i.e.
    the line whose hierarchy id is 0 and whose controller list is empty. The
    file may carry additional lines for v1 hierarchies, which are skipped.

    :param pid: process to inspect; any value <= 0 selects the running
        process.
    :returns: the absolute cgroup path as recorded in the proc file.
    :raises OSError: if the proc file cannot be read.
    :raises ValueError: if the proc file has no unified hierarchy entry.
    """
    procfile = pathlib.Path(
        f"/proc/{pid}/cgroup" if pid > 0 else "/proc/self/cgroup"
    )
    for line in procfile.read_text().splitlines():
        # Each line reads "hierarchy-ID:controller-list:cgroup-path". The
        # path is the last field and may itself contain colons.
        fields = line.split(":", 2)
        if len(fields) == 3 and fields[0] == "0" and not fields[1]:
            return pathlib.PurePath(fields[2].strip())
    raise ValueError(f"no cgroup v2 entry found in {procfile}")


# The class body dereferences ravel (decorator and annotation) while it is
# being defined, so it can only be created when the import succeeded.
# Without this guard, the systemd-run fallback in main() would be unreachable
# as the module would already fail to load.
if ravel is not None:

    class SystemdJobWaiter:
        """Context manager for waiting for a systemd job to complete. Typical
        usage:

            with SystemdJobWaiter(bus) as wait:
                job = create_a_job_on(bus)
                result = await wait(job)
        """

        systemd_path = "/org/freedesktop/systemd1"
        systemd_iface = "org.freedesktop.systemd1.Manager"

        def __init__(self, bus: ravel.Connection):
            """Prepare waiting on the given bus connection. Must be called
            while an asyncio event loop is running, because the completion
            future is created on the running loop.
            """
            self.bus = bus
            # Results of jobs that finished before the job of interest was
            # known, keyed by job object path.
            self.jobs_removed: dict[str, str] = {}
            self.target_job: str | None = None
            self.job_done = asyncio.get_running_loop().create_future()

        @ravel.signal(name="JobRemoved", in_signature="uoss")
        def _on_job_removed(
            self, _id: int, path: str, _unit: str, result: str
        ) -> None:
            """Handle a JobRemoved signal: record the result until a target
            job is known, afterwards resolve the future for the target job.
            """
            if self.target_job is None:
                self.jobs_removed[path] = result
            elif self.target_job == path:
                self.job_done.set_result(result)

        def __enter__(self) -> "SystemdJobWaiter":
            """Start listening for JobRemoved signals."""
            self.bus.listen_signal(
                self.systemd_path,
                False,
                self.systemd_iface,
                "JobRemoved",
                self._on_job_removed,
            )
            return self

        async def __call__(self, job: str, timeout: int | float = 60) -> str:
            """Wait for the given job to be removed and return its result.

            :param job: object path of the job to wait for.
            :param timeout: maximum number of seconds to wait for the signal.
            :returns: the result string carried by the JobRemoved signal.
            :raises asyncio.TimeoutError: if the job does not finish in time.
            """
            assert self.target_job is None
            self.target_job = job
            try:
                # The job may have completed before we learned its path.
                return self.jobs_removed[job]
            except KeyError:
                return await asyncio.wait_for(self.job_done, timeout)

        def __exit__(
            self, _1: typing.Any, _2: typing.Any, _3: typing.Any
        ) -> None:
            """Stop listening for JobRemoved signals."""
            self.bus.unlisten_signal(
                self.systemd_path,
                False,
                self.systemd_iface,
                "JobRemoved",
                self._on_job_removed,
            )


async def start_transient_unit_with_ravel(pid: int) -> None:
    """Call the StartTransientUnit dbus method on the user manager for the
    given pid. This requires the optional ravel module.

    :param pid: process to be placed into a new delegated scope unit.
    :raises OSError: if the resulting job finishes with a result other than
        "done".
    """
    bus = await ravel.session_bus_async()
    with SystemdJobWaiter(bus) as wait:
        scope_job = (
            bus["org.freedesktop.systemd1"]["/org/freedesktop/systemd1"]
            .get_interface("org.freedesktop.systemd1.Manager")
            .StartTransientUnit(
                f"cgroup-{pid}.scope",
                "fail",
                [("PIDs", ("au", [pid])), ("Delegate", ("b", True))],
                [],
            )
        )[0]
        result = await wait(scope_job)
    if result != "done":
        raise OSError("StartTransientUnit failed: " + result)


def main() -> None:
    """Ensure that our cgroup is writeable, unshare user, mount and cgroup
    namespaces and finally replace this process with the user's shell.

    When the cgroup is not writeable, either a delegated scope unit is
    requested via dbus (if ravel is available) or the program re-executes
    itself via systemd-run.
    """
    mycgroup = get_cgroup()
    if not os.access(
        pathlib.Path("/sys/fs/cgroup") / mycgroup.relative_to("/"),
        os.W_OK,
    ):
        # For some shells - notably from graphical desktop environments, the
        # hierarchy is immediately writeable. For others, we may create a
        # scope unit.
        if ravel is not None:
            # asyncio.run sets up its own event loop. Using
            # asyncio.get_event_loop() without a running loop is deprecated.
            asyncio.run(start_transient_unit_with_ravel(os.getpid()))
            mycgroup = get_cgroup()
        else:
            # Re-execute ourselves via systemd-run.
            if (
                mycgroup.name.startswith("run-")
                and mycgroup.name.endswith(".scope")
            ):
                # We apparently were re-executed already. Avoid looping.
                print(
                    "Error: We're running in a .scope cgroup, but it is not "
                    "writeable. Giving up."
                )
                sys.exit(1)
            try:
                # Does not return on success.
                os.execvp(
                    "systemd-run",
                    [
                        "systemd-run",
                        "--user",
                        "--scope",
                        "--property",
                        "Delegate=true",
                        *sys.argv,
                    ],
                )
            except OSError as err:
                print(
                    "Error: Failed to re-execute myself inside systemd-run: "
                    f"{err}"
                )
                sys.exit(1)
    linuxnamespaces.unshare_user_idmap(
        [linuxnamespaces.IDMapping(os.getuid(), os.getuid(), 1)],
        [linuxnamespaces.IDMapping(os.getgid(), os.getgid(), 1)],
        linuxnamespaces.CloneFlags.NEWUSER
        | linuxnamespaces.CloneFlags.NEWNS
        | linuxnamespaces.CloneFlags.NEWCGROUP,
    )
    linuxnamespaces.populate_sys("/", "/", mycgroup)
    # Fall back to a plain sh when the environment does not name a shell.
    shell = os.environ.get("SHELL", "/bin/sh")
    os.execlp(shell, shell)


if __name__ == "__main__":
    main()