From 2411b941ea0cc8b95cd3492cefd35436ac94f86f Mon Sep 17 00:00:00 2001 From: Helmut Grohne Date: Fri, 19 Apr 2024 05:43:33 +0200 Subject: examples/cgroup.py: support jeepney as an alternative to ravel --- examples/cgroup.py | 94 +++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 89 insertions(+), 5 deletions(-) (limited to 'examples') diff --git a/examples/cgroup.py b/examples/cgroup.py index 8fa8457..90978dc 100755 --- a/examples/cgroup.py +++ b/examples/cgroup.py @@ -7,11 +7,17 @@ """ import asyncio +import contextlib import os import pathlib import sys import typing +try: + import jeepney.io.asyncio +except ImportError: + jeepney = None + try: import ravel except ImportError: @@ -34,12 +40,87 @@ def get_cgroup(pid: int = -1) -> pathlib.PurePath: ) +if jeepney is not None: + @contextlib.asynccontextmanager + async def jeepney_listen_signal( + router: jeepney.io.asyncio.DBusRouter, matchrule: jeepney.MatchRule, + ) -> jeepney.io.asyncio.FilterHandle: + """Call AddMatch/RemoveMatch on context entry/exit and give filtered + queue. + """ + jeepney.wrappers.unwrap_msg( + await router.send_and_get_reply( + jeepney.bus_messages.message_bus.AddMatch(matchrule) + ), + ) + try: + with router.filter(matchrule) as queue: + yield queue + finally: + jeepney.wrappers.unwrap_msg( + await router.send_and_get_reply( + jeepney.bus_messages.message_bus.RemoveMatch(matchrule) + ), + ) + + + async def start_transient_unit_with_jeepney(pid: int) -> None: + """Call the StartTransientUnit dbus method on the user manager for the + given pid. + """ + async with ( + jeepney.io.asyncio.open_dbus_router() as router, + jeepney_listen_signal( + router, + jeepney.MatchRule( + type="signal", + interface="org.freedesktop.systemd1.Manager", + member="JobRemoved", + path="/org/freedesktop/systemd1", + ), + ) as queue, + ): + (scope_job,) = jeepney.wrappers.unwrap_msg( + await router.send_and_get_reply( + jeepney.new_method_call( + jeepney.DBusAddress( + "/org/freedesktop/systemd1", + bus_name="org.freedesktop.systemd1", + interface="org.freedesktop.systemd1.Manager", + ), + "StartTransientUnit", + "ssa(sv)a(sa(sv))", + ( + f"cgroup-{pid}.scope", + "fail", + [ + ("PIDs", ("au", [pid])), + ("Delegate", ("b", True)), + ], + [], + ), + ), + ), + ) + loop = asyncio.get_running_loop() + deadline = loop.time() + 60 + while True: + message = jeepney.wrappers.unwrap_msg( + await asyncio.wait_for(queue.get(), deadline - loop.time()) + ) + if message[1] != scope_job: + continue + if message[3] != "done": + raise OSError("StartTransientUnit failed: " + message[3]) + return + + if ravel is not None: - class SystemdJobWaiter: + class RavelSystemdJobWaiter: """Context manager for waiting for a systemd job to complete. Typical usage: - with SystemdJobWaiter(bus) as wait: + with RavelSystemdJobWaiter(bus) as wait: job = create_a_job_on(bus) result = await wait(job) """ @@ -62,7 +143,7 @@ if ravel is not None: elif self.target_job == path: self.job_done.set_result(result) - def __enter__(self) -> "SystemdJobWaiter": + def __enter__(self) -> "RavelSystemdJobWaiter": self.bus.listen_signal( self.systemd_path, False, @@ -95,7 +176,7 @@ if ravel is not None: given pid. """ bus = await ravel.session_bus_async() - with SystemdJobWaiter(bus) as wait: + with RavelSystemdJobWaiter(bus) as wait: scope_job = ( bus["org.freedesktop.systemd1"]["/org/freedesktop/systemd1"] .get_interface("org.freedesktop.systemd1.Manager") @@ -120,7 +201,10 @@ def main() -> None: # For some shells - notably from graphical desktop environments, the # hierarchy is immediately writeable. For others, we may create a scope # unit. - if ravel is not None: + if jeepney is not None: + asyncio.run(start_transient_unit_with_jeepney(os.getpid())) + mycgroup = get_cgroup() + elif ravel is not None: asyncio.run(start_transient_unit_with_ravel(os.getpid())) mycgroup = get_cgroup() else: -- cgit v1.2.3