summaryrefslogtreecommitdiff
path: root/linuxnamespaces/systemd/jeepney.py
diff options
context:
space:
mode:
Diffstat (limited to 'linuxnamespaces/systemd/jeepney.py')
-rw-r--r--linuxnamespaces/systemd/jeepney.py77
1 files changed, 77 insertions, 0 deletions
diff --git a/linuxnamespaces/systemd/jeepney.py b/linuxnamespaces/systemd/jeepney.py
new file mode 100644
index 0000000..ef276d3
--- /dev/null
+++ b/linuxnamespaces/systemd/jeepney.py
@@ -0,0 +1,77 @@
+# Copyright 2024 Helmut Grohne <helmut@subdivi.de>
+# SPDX-License-Identifier: GPL-3
+
+"""Communicate with a systemd instance via dbus using jeepney."""
+
+import asyncio
+import contextlib
+import typing
+
+import jeepney.io.asyncio
+
+
+@contextlib.asynccontextmanager
+async def jeepney_listen_signal(
+ router: jeepney.io.asyncio.DBusRouter, matchrule: jeepney.MatchRule,
+) -> jeepney.io.asyncio.FilterHandle:
+ """Call AddMatch/RemoveMatch on context entry/exit and give filtered queue.
+ """
+ jeepney.wrappers.unwrap_msg(
+ await router.send_and_get_reply(
+ jeepney.bus_messages.message_bus.AddMatch(matchrule)
+ ),
+ )
+ try:
+ with router.filter(matchrule) as queue:
+ yield queue
+ finally:
+ jeepney.wrappers.unwrap_msg(
+ await router.send_and_get_reply(
+ jeepney.bus_messages.message_bus.RemoveMatch(matchrule)
+ ),
+ )
+
+
+async def start_transient_unit(
+ unitname: str, properties: list[tuple[str, tuple[str, typing.Any]]]
+) -> None:
+ """Call the StartTransientUnit dbus method on the user manager. The given
+ properties are tuples of property names and dbus-typed values.
+ """
+ async with (
+ jeepney.io.asyncio.open_dbus_router() as router,
+ jeepney_listen_signal(
+ router,
+ jeepney.MatchRule(
+ type="signal",
+ interface="org.freedesktop.systemd1.Manager",
+ member="JobRemoved",
+ path="/org/freedesktop/systemd1",
+ ),
+ ) as queue,
+ ):
+ (scope_job,) = jeepney.wrappers.unwrap_msg(
+ await router.send_and_get_reply(
+ jeepney.new_method_call(
+ jeepney.DBusAddress(
+ "/org/freedesktop/systemd1",
+ bus_name="org.freedesktop.systemd1",
+ interface="org.freedesktop.systemd1.Manager",
+ ),
+ "StartTransientUnit",
+ "ssa(sv)a(sa(sv))",
+ (unitname, "fail", properties, []),
+ ),
+ ),
+ )
+ loop = asyncio.get_running_loop()
+ deadline = loop.time() + 60
+ while True:
+ message = jeepney.wrappers.unwrap_msg(
+ await asyncio.wait_for(queue.get(), deadline - loop.time())
+ )
+ if message[1] != scope_job:
+ continue
+ if message[3] != "done":
+ raise OSError("StartTransientUnit failed: " + message[3])
+ return