From b0874c6086f19809b1adf7f5e7a755ac6f146c9e Mon Sep 17 00:00:00 2001 From: Helmut Grohne Date: Tue, 23 Apr 2024 07:11:48 +0200 Subject: lift the dbus functionality from the cgroup example --- examples/cgroup.py | 203 +++-------------------------------------------------- 1 file changed, 11 insertions(+), 192 deletions(-) (limited to 'examples/cgroup.py') diff --git a/examples/cgroup.py b/examples/cgroup.py index 90978dc..28a1f7c 100755 --- a/examples/cgroup.py +++ b/examples/cgroup.py @@ -7,26 +7,15 @@ """ import asyncio -import contextlib import os import pathlib import sys -import typing - -try: - import jeepney.io.asyncio -except ImportError: - jeepney = None - -try: - import ravel -except ImportError: - ravel = None if __file__.split("/")[-2:-1] == ["examples"]: sys.path.insert(0, "/".join(__file__.split("/")[:-2])) import linuxnamespaces +import linuxnamespaces.systemd def get_cgroup(pid: int = -1) -> pathlib.PurePath: @@ -40,158 +29,6 @@ def get_cgroup(pid: int = -1) -> pathlib.PurePath: ) -if jeepney is not None: - @contextlib.asynccontextmanager - async def jeepney_listen_signal( - router: jeepney.io.asyncio.DBusRouter, matchrule: jeepney.MatchRule, - ) -> jeepney.io.asyncio.FilterHandle: - """Call AddMatch/RemoveMatch on context entry/exit and give filtered - queue. - """ - jeepney.wrappers.unwrap_msg( - await router.send_and_get_reply( - jeepney.bus_messages.message_bus.AddMatch(matchrule) - ), - ) - try: - with router.filter(matchrule) as queue: - yield queue - finally: - jeepney.wrappers.unwrap_msg( - await router.send_and_get_reply( - jeepney.bus_messages.message_bus.RemoveMatch(matchrule) - ), - ) - - - async def start_transient_unit_with_jeepney(pid: int) -> None: - """Call the StartTransientUnit dbus method on the user manager for the - given pid. - """ - async with ( - jeepney.io.asyncio.open_dbus_router() as router, - jeepney_listen_signal( - router, - jeepney.MatchRule( - type="signal", - interface="org.freedesktop.systemd1.Manager", - member="JobRemoved", - path="/org/freedesktop/systemd1", - ), - ) as queue, - ): - (scope_job,) = jeepney.wrappers.unwrap_msg( - await router.send_and_get_reply( - jeepney.new_method_call( - jeepney.DBusAddress( - "/org/freedesktop/systemd1", - bus_name="org.freedesktop.systemd1", - interface="org.freedesktop.systemd1.Manager", - ), - "StartTransientUnit", - "ssa(sv)a(sa(sv))", - ( - f"cgroup-{pid}.scope", - "fail", - [ - ("PIDs", ("au", [pid])), - ("Delegate", ("b", True)), - ], - [], - ), - ), - ), - ) - loop = asyncio.get_running_loop() - deadline = loop.time() + 60 - while True: - message = jeepney.wrappers.unwrap_msg( - await asyncio.wait_for(queue.get(), deadline - loop.time()) - ) - if message[1] != scope_job: - continue - if message[3] != "done": - raise OSError("StartTransientUnit failed: " + message[3]) - return - - -if ravel is not None: - class RavelSystemdJobWaiter: - """Context manager for waiting for a systemd job to complete. - Typical usage: - - with RavelSystemdJobWaiter(bus) as wait: - job = create_a_job_on(bus) - result = await wait(job) - """ - - systemd_path = "/org/freedesktop/systemd1" - systemd_iface = "org.freedesktop.systemd1.Manager" - - def __init__(self, bus: ravel.Connection): - self.bus = bus - self.jobs_removed: dict[str, str] = {} - self.target_job: str | None = None - self.job_done = asyncio.get_running_loop().create_future() - - @ravel.signal(name="JobRemoved", in_signature="uoss") - def _on_job_removed( - self, _id: int, path: str, _unit: str, result: str - ) -> None: - if self.target_job is None: - self.jobs_removed[path] = result - elif self.target_job == path: - self.job_done.set_result(result) - - def __enter__(self) -> "RavelSystemdJobWaiter": - self.bus.listen_signal( - self.systemd_path, - False, - self.systemd_iface, - "JobRemoved", - self._on_job_removed, - ) - return self - - async def __call__(self, job: str, timeout: int | float = 60) -> str: - assert self.target_job is None - self.target_job = job - try: - return self.jobs_removed[job] - except KeyError: - return await asyncio.wait_for(self.job_done, timeout) - - def __exit__(self, *exc_info: typing.Any) -> None: - self.bus.unlisten_signal( - self.systemd_path, - False, - self.systemd_iface, - "JobRemoved", - self._on_job_removed, - ) - - - async def start_transient_unit_with_ravel(pid: int) -> None: - """Call the StartTransientUnit dbus method on the user manager for the - given pid. - """ - bus = await ravel.session_bus_async() - with RavelSystemdJobWaiter(bus) as wait: - scope_job = ( - bus["org.freedesktop.systemd1"]["/org/freedesktop/systemd1"] - .get_interface("org.freedesktop.systemd1.Manager") - .StartTransientUnit( - f"cgroup-{pid}.scope", - "fail", - [("PIDs", ("au", [pid])), ("Delegate", ("b", True))], - [], - ) - )[0] - result = await wait(scope_job) - if result != "done": - raise OSError("StartTransientUnit failed: " + result) - - def main() -> None: mycgroup = get_cgroup() if not os.access( @@ -201,35 +38,17 @@ def main() -> None: # For some shells - notably from graphical desktop environments, the # hierarchy is immediately writeable. For others, we may create a scope # unit. - if jeepney is not None: - asyncio.run(start_transient_unit_with_jeepney(os.getpid())) - mycgroup = get_cgroup() - elif ravel is not None: - asyncio.run(start_transient_unit_with_ravel(os.getpid())) - mycgroup = get_cgroup() - else: - # Re-execute ourselves via systemd-run. - if ( - mycgroup.name.startswith("run-") - and mycgroup.name.endswith(".scope") - ): - print( - "Error: We're running in a .scope cgroup, but it is not writeable. Giving up." - ) - sys.exit(1) - os.execvp( - "systemd-run", - [ - "systemd-run", - "--user", - "--scope", - "--property", - "Delegate=true", - *sys.argv, - ], + try: + asyncio.run( + linuxnamespaces.systemd.start_transient_unit( + f"cgroup-{os.getpid()}.scope", + properties={"Delegate": True}, + ), + ) + except NotImplementedError: + linuxnamespaces.systemd.reexec_as_transient_unit( + properties={"Delegate": True} ) - print("Error: Failed to re-execute myself inside systemd-run.") - sys.exit(1) linuxnamespaces.unshare_user_idmap( [linuxnamespaces.IDMapping(os.getuid(), os.getuid(), 1)], [linuxnamespaces.IDMapping(os.getgid(), os.getgid(), 1)], -- cgit v1.2.3