From b0874c6086f19809b1adf7f5e7a755ac6f146c9e Mon Sep 17 00:00:00 2001 From: Helmut Grohne Date: Tue, 23 Apr 2024 07:11:48 +0200 Subject: lift the dbus functionality from the cgroup example --- linuxnamespaces/systemd/dbussy.py | 81 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) create mode 100644 linuxnamespaces/systemd/dbussy.py (limited to 'linuxnamespaces/systemd/dbussy.py') diff --git a/linuxnamespaces/systemd/dbussy.py b/linuxnamespaces/systemd/dbussy.py new file mode 100644 index 0000000..77410df --- /dev/null +++ b/linuxnamespaces/systemd/dbussy.py @@ -0,0 +1,81 @@ +# Copyright 2024 Helmut Grohne +# SPDX-License-Identifier: GPL-3 + +"""Communicate with a systemd instance via dbus using dbussy.""" + +import asyncio +import typing + +import ravel + + +class SystemdJobWaiter: + """Context manager for waiting for a systemd job to complete. + Typical usage: + + with SystemdJobWaiter(bus) as wait: + job = create_a_job_on(bus) + result = await wait(job) + """ + + systemd_path = "/org/freedesktop/systemd1" + systemd_iface = "org.freedesktop.systemd1.Manager" + + def __init__(self, bus: ravel.Connection): + self.bus = bus + self.jobs_removed: dict[str, str] = {} + self.target_job: str | None = None + self.job_done = asyncio.get_running_loop().create_future() + + @ravel.signal(name="JobRemoved", in_signature="uoss") + def _on_job_removed( + self, _id: int, path: str, _unit: str, result: str + ) -> None: + if self.target_job is None: + self.jobs_removed[path] = result + elif self.target_job == path: + self.job_done.set_result(result) + + def __enter__(self) -> "SystemdJobWaiter": + self.bus.listen_signal( + self.systemd_path, + False, + self.systemd_iface, + "JobRemoved", + self._on_job_removed, + ) + return self + + async def __call__(self, job: str, timeout: int | float = 60) -> str: + assert self.target_job is None + self.target_job = job + try: + return self.jobs_removed[job] + except KeyError: + return await asyncio.wait_for(self.job_done, timeout) + + def __exit__(self, *exc_info: typing.Any) -> None: + self.bus.unlisten_signal( + self.systemd_path, + False, + self.systemd_iface, + "JobRemoved", + self._on_job_removed, + ) + + +async def start_transient_unit( + unitname: str, properties: list[tuple[str, tuple[str, typing.Any]]] +) -> None: + """Call the StartTransientUnit dbus method on the user manager. The given + properties are tuples of property names and dbus-typed values. + """ + bus = await ravel.session_bus_async() + with SystemdJobWaiter(bus) as wait: + result = await wait( + bus["org.freedesktop.systemd1"]["/org/freedesktop/systemd1"] + .get_interface("org.freedesktop.systemd1.Manager") + .StartTransientUnit(unitname, "fail", properties, [])[0], + ) + if result != "done": + raise OSError("StartTransientUnit failed: " + result) -- cgit v1.2.3