From b0874c6086f19809b1adf7f5e7a755ac6f146c9e Mon Sep 17 00:00:00 2001 From: Helmut Grohne Date: Tue, 23 Apr 2024 07:11:48 +0200 Subject: lift the dbus functionality from the cgroup example --- linuxnamespaces/systemd/jeepney.py | 77 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) create mode 100644 linuxnamespaces/systemd/jeepney.py (limited to 'linuxnamespaces/systemd/jeepney.py') diff --git a/linuxnamespaces/systemd/jeepney.py b/linuxnamespaces/systemd/jeepney.py new file mode 100644 index 0000000..ef276d3 --- /dev/null +++ b/linuxnamespaces/systemd/jeepney.py @@ -0,0 +1,77 @@ +# Copyright 2024 Helmut Grohne +# SPDX-License-Identifier: GPL-3 + +"""Communicate with a systemd instance via dbus using jeepney.""" + +import asyncio +import contextlib +import typing + +import jeepney.io.asyncio + + +@contextlib.asynccontextmanager +async def jeepney_listen_signal( + router: jeepney.io.asyncio.DBusRouter, matchrule: jeepney.MatchRule, +) -> jeepney.io.asyncio.FilterHandle: + """Call AddMatch/RemoveMatch on context entry/exit and give filtered queue. + """ + jeepney.wrappers.unwrap_msg( + await router.send_and_get_reply( + jeepney.bus_messages.message_bus.AddMatch(matchrule) + ), + ) + try: + with router.filter(matchrule) as queue: + yield queue + finally: + jeepney.wrappers.unwrap_msg( + await router.send_and_get_reply( + jeepney.bus_messages.message_bus.RemoveMatch(matchrule) + ), + ) + + +async def start_transient_unit( + unitname: str, properties: list[tuple[str, tuple[str, typing.Any]]] +) -> None: + """Call the StartTransientUnit dbus method on the user manager. The given + properties are tuples of property names and dbus-typed values. + """ + async with ( + jeepney.io.asyncio.open_dbus_router() as router, + jeepney_listen_signal( + router, + jeepney.MatchRule( + type="signal", + interface="org.freedesktop.systemd1.Manager", + member="JobRemoved", + path="/org/freedesktop/systemd1", + ), + ) as queue, + ): + (scope_job,) = jeepney.wrappers.unwrap_msg( + await router.send_and_get_reply( + jeepney.new_method_call( + jeepney.DBusAddress( + "/org/freedesktop/systemd1", + bus_name="org.freedesktop.systemd1", + interface="org.freedesktop.systemd1.Manager", + ), + "StartTransientUnit", + "ssa(sv)a(sa(sv))", + (unitname, "fail", properties, []), + ), + ), + ) + loop = asyncio.get_running_loop() + deadline = loop.time() + 60 + while True: + message = jeepney.wrappers.unwrap_msg( + await asyncio.wait_for(queue.get(), deadline - loop.time()) + ) + if message[1] != scope_job: + continue + if message[3] != "done": + raise OSError("StartTransientUnit failed: " + message[3]) + return -- cgit v1.2.3