summaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
Diffstat (limited to 'examples')
-rwxr-xr-xexamples/cgroup.py103
1 files changed, 65 insertions, 38 deletions
diff --git a/examples/cgroup.py b/examples/cgroup.py
index ff1a087..1c94d32 100755
--- a/examples/cgroup.py
+++ b/examples/cgroup.py
@@ -10,6 +10,7 @@ import asyncio
import os
import pathlib
import sys
+import typing
try:
import ravel
@@ -33,51 +34,77 @@ def get_cgroup(pid: int = -1) -> pathlib.PurePath:
)
-async def start_transient_unit_with_ravel(pid: int) -> None:
- """Call the StartTransientUnit dbus method on the user manager for the
- given pid.
+class SystemdJobWaiter:
+ """Context manager for waiting for a systemd job to complete.
+ Typical usage:
+
+ with SystemdJobWaiter(bus) as wait:
+ job = create_a_job_on(bus)
+ result = await wait(job)
"""
- bus = await ravel.session_bus_async()
- jobs_removed = {}
- scope_job = None
+
systemd_path = "/org/freedesktop/systemd1"
systemd_iface = "org.freedesktop.systemd1.Manager"
- scope_created = asyncio.get_running_loop().create_future()
+
+ def __init__(self, bus: ravel.Connection):
+ self.bus = bus
+ self.jobs_removed: dict[str, str] = {}
+ self.target_job: str | None = None
+ self.job_done = asyncio.get_running_loop().create_future()
@ravel.signal(name="JobRemoved", in_signature="uoss")
- def handle_job_removed(_1, path, _2, result):
- nonlocal jobs_removed
- nonlocal scope_job
- nonlocal scope_created
- if scope_job is None:
- jobs_removed[path] = result
- elif path == scope_job:
- scope_created.set_result(result)
-
- bus.listen_signal(
- systemd_path, False, systemd_iface, "JobRemoved", handle_job_removed
- )
- scope_job = (
- bus["org.freedesktop.systemd1"][systemd_path]
- .get_interface(systemd_iface)
- .StartTransientUnit(
- f"cgroup-{pid}.scope", "fail", [("PIDs", ("au", [pid]))], []
+ def _on_job_removed(
+ self, _id: int, path: str, _unit: str, result: str
+ ) -> None:
+ if self.target_job is None:
+ self.jobs_removed[path] = result
+ elif self.target_job == path:
+ self.job_done.set_result(result)
+
+ def __enter__(self) -> "SystemdJobWaiter":
+ self.bus.listen_signal(
+ self.systemd_path,
+ False,
+ self.systemd_iface,
+ "JobRemoved",
+ self._on_job_removed,
)
- )[0]
- if scope_job in jobs_removed:
- scope_created.set_result(jobs_removed[scope_job])
- else:
+ return self
+
+ async def __call__(self, job: str, timeout: int | float = 60) -> str:
+ assert self.target_job is None
+ self.target_job = job
try:
- await asyncio.wait_for(scope_created, 60)
- except TimeoutError:
- print("Error: timed out waiting for StartTransientUnit")
- sys.exit(1)
- bus.unlisten_signal(
- systemd_path, False, systemd_iface, "JobRemoved", handle_job_removed
- )
- if scope_created.result() != "done":
- print("Error: StartTransientUnit failed: " + scope_created.result())
- sys.exit(1)
+ return self.jobs_removed[job]
+ except KeyError:
+ return await asyncio.wait_for(self.job_done, timeout)
+
+ def __exit__(self, _1: typing.Any, _2: typing.Any, _3: typing.Any) -> None:
+ self.bus.unlisten_signal(
+ self.systemd_path,
+ False,
+ self.systemd_iface,
+ "JobRemoved",
+ self._on_job_removed,
+ )
+
+
+async def start_transient_unit_with_ravel(pid: int) -> None:
+ """Call the StartTransientUnit dbus method on the user manager for the
+ given pid.
+ """
+ bus = await ravel.session_bus_async()
+ with SystemdJobWaiter(bus) as wait:
+ scope_job = (
+ bus["org.freedesktop.systemd1"]["/org/freedesktop/systemd1"]
+ .get_interface("org.freedesktop.systemd1.Manager")
+ .StartTransientUnit(
+ f"cgroup-{pid}.scope", "fail", [("PIDs", ("au", [pid]))], []
+ )
+ )[0]
+ result = await wait(scope_job)
+ if result != "done":
+ raise OSError("StartTransientUnit failed: " + result)
def main() -> None: