From f3f5068e99dd3a915defd57d9fc276a26164b1ff Mon Sep 17 00:00:00 2001 From: Helmut Grohne Date: Thu, 18 Apr 2024 17:01:43 +0200 Subject: examples/cgroup.py: extract a context manager waiting for systemd jobs --- examples/cgroup.py | 103 +++++++++++++++++++++++++++++++++-------------------- 1 file changed, 65 insertions(+), 38 deletions(-) (limited to 'examples') diff --git a/examples/cgroup.py b/examples/cgroup.py index ff1a087..1c94d32 100755 --- a/examples/cgroup.py +++ b/examples/cgroup.py @@ -10,6 +10,7 @@ import asyncio import os import pathlib import sys +import typing try: import ravel @@ -33,51 +34,77 @@ def get_cgroup(pid: int = -1) -> pathlib.PurePath: ) -async def start_transient_unit_with_ravel(pid: int) -> None: - """Call the StartTransientUnit dbus method on the user manager for the - given pid. +class SystemdJobWaiter: + """Context manager for waiting for a systemd job to complete. + Typical usage: + + with SystemdJobWaiter(bus) as wait: + job = create_a_job_on(bus) + result = await wait(job) """ - bus = await ravel.session_bus_async() - jobs_removed = {} - scope_job = None + systemd_path = "/org/freedesktop/systemd1" systemd_iface = "org.freedesktop.systemd1.Manager" - scope_created = asyncio.get_running_loop().create_future() + + def __init__(self, bus: ravel.Connection): + self.bus = bus + self.jobs_removed: dict[str, str] = {} + self.target_job: str | None = None + self.job_done = asyncio.get_running_loop().create_future() @ravel.signal(name="JobRemoved", in_signature="uoss") - def handle_job_removed(_1, path, _2, result): - nonlocal jobs_removed - nonlocal scope_job - nonlocal scope_created - if scope_job is None: - jobs_removed[path] = result - elif path == scope_job: - scope_created.set_result(result) - - bus.listen_signal( - systemd_path, False, systemd_iface, "JobRemoved", handle_job_removed - ) - scope_job = ( - bus["org.freedesktop.systemd1"][systemd_path] - .get_interface(systemd_iface) - .StartTransientUnit( - f"cgroup-{pid}.scope", "fail", [("PIDs", ("au", [pid]))], [] + def _on_job_removed( + self, _id: int, path: str, _unit: str, result: str + ) -> None: + if self.target_job is None: + self.jobs_removed[path] = result + elif self.target_job == path: + self.job_done.set_result(result) + + def __enter__(self) -> "SystemdJobWaiter": + self.bus.listen_signal( + self.systemd_path, + False, + self.systemd_iface, + "JobRemoved", + self._on_job_removed, ) - )[0] - if scope_job in jobs_removed: - scope_created.set_result(jobs_removed[scope_job]) - else: + return self + + async def __call__(self, job: str, timeout: int | float = 60) -> str: + assert self.target_job is None + self.target_job = job try: - await asyncio.wait_for(scope_created, 60) - except TimeoutError: - print("Error: timed out waiting for StartTransientUnit") - sys.exit(1) - bus.unlisten_signal( - systemd_path, False, systemd_iface, "JobRemoved", handle_job_removed - ) - if scope_created.result() != "done": - print("Error: StartTransientUnit failed: " + scope_created.result()) - sys.exit(1) + return self.jobs_removed[job] + except KeyError: + return await asyncio.wait_for(self.job_done, timeout) + + def __exit__(self, _1: typing.Any, _2: typing.Any, _3: typing.Any) -> None: + self.bus.unlisten_signal( + self.systemd_path, + False, + self.systemd_iface, + "JobRemoved", + self._on_job_removed, + ) + + +async def start_transient_unit_with_ravel(pid: int) -> None: + """Call the StartTransientUnit dbus method on the user manager for the + given pid. + """ + bus = await ravel.session_bus_async() + with SystemdJobWaiter(bus) as wait: + scope_job = ( + bus["org.freedesktop.systemd1"]["/org/freedesktop/systemd1"] + .get_interface("org.freedesktop.systemd1.Manager") + .StartTransientUnit( + f"cgroup-{pid}.scope", "fail", [("PIDs", ("au", [pid]))], [] + ) + )[0] + result = await wait(scope_job) + if result != "done": + raise OSError("StartTransientUnit failed: " + result) def main() -> None: -- cgit v1.2.3