summaryrefslogtreecommitdiff
path: root/linuxnamespaces/systemd/dbussy.py
diff options
context:
space:
mode:
authorHelmut Grohne <helmut@subdivi.de>2024-04-23 07:11:48 +0200
committerHelmut Grohne <helmut@subdivi.de>2024-04-23 07:11:48 +0200
commitb0874c6086f19809b1adf7f5e7a755ac6f146c9e (patch)
treee778269925050cfa56c1a197e975be10c6415f71 /linuxnamespaces/systemd/dbussy.py
parent2411b941ea0cc8b95cd3492cefd35436ac94f86f (diff)
downloadpython-linuxnamespaces-b0874c6086f19809b1adf7f5e7a755ac6f146c9e.tar.gz
lift the dbus functionality from the cgroup example
Diffstat (limited to 'linuxnamespaces/systemd/dbussy.py')
-rw-r--r--linuxnamespaces/systemd/dbussy.py81
1 files changed, 81 insertions, 0 deletions
diff --git a/linuxnamespaces/systemd/dbussy.py b/linuxnamespaces/systemd/dbussy.py
new file mode 100644
index 0000000..77410df
--- /dev/null
+++ b/linuxnamespaces/systemd/dbussy.py
@@ -0,0 +1,81 @@
+# Copyright 2024 Helmut Grohne <helmut@subdivi.de>
+# SPDX-License-Identifier: GPL-3
+
+"""Communicate with a systemd instance via dbus using dbussy."""
+
+import asyncio
+import typing
+
+import ravel
+
+
+class SystemdJobWaiter:
+ """Context manager for waiting for a systemd job to complete.
+ Typical usage:
+
+ with SystemdJobWaiter(bus) as wait:
+ job = create_a_job_on(bus)
+ result = await wait(job)
+ """
+
+ systemd_path = "/org/freedesktop/systemd1"
+ systemd_iface = "org.freedesktop.systemd1.Manager"
+
+ def __init__(self, bus: ravel.Connection):
+ self.bus = bus
+ self.jobs_removed: dict[str, str] = {}
+ self.target_job: str | None = None
+ self.job_done = asyncio.get_running_loop().create_future()
+
+ @ravel.signal(name="JobRemoved", in_signature="uoss")
+ def _on_job_removed(
+ self, _id: int, path: str, _unit: str, result: str
+ ) -> None:
+ if self.target_job is None:
+ self.jobs_removed[path] = result
+ elif self.target_job == path:
+ self.job_done.set_result(result)
+
+ def __enter__(self) -> "SystemdJobWaiter":
+ self.bus.listen_signal(
+ self.systemd_path,
+ False,
+ self.systemd_iface,
+ "JobRemoved",
+ self._on_job_removed,
+ )
+ return self
+
+ async def __call__(self, job: str, timeout: int | float = 60) -> str:
+ assert self.target_job is None
+ self.target_job = job
+ try:
+ return self.jobs_removed[job]
+ except KeyError:
+ return await asyncio.wait_for(self.job_done, timeout)
+
+ def __exit__(self, *exc_info: typing.Any) -> None:
+ self.bus.unlisten_signal(
+ self.systemd_path,
+ False,
+ self.systemd_iface,
+ "JobRemoved",
+ self._on_job_removed,
+ )
+
+
+async def start_transient_unit(
+ unitname: str, properties: list[tuple[str, tuple[str, typing.Any]]]
+) -> None:
+ """Call the StartTransientUnit dbus method on the user manager. The given
+ properties are tuples of property names and dbus-typed values.
+ """
+ bus = await ravel.session_bus_async()
+ with SystemdJobWaiter(bus) as wait:
+ result = await wait(
+ bus["org.freedesktop.systemd1"]["/org/freedesktop/systemd1"]
+ .get_interface("org.freedesktop.systemd1.Manager")
+ .StartTransientUnit(unitname, "fail", properties, [])[0],
+ )
+ if result != "done":
+ raise OSError("StartTransientUnit failed: " + result)