# Copyright 2024 Helmut Grohne # SPDX-License-Identifier: GPL-3 """Communicate with a systemd instance via dbus using jeepney.""" import asyncio import contextlib import typing import jeepney.io.asyncio @contextlib.asynccontextmanager async def jeepney_listen_signal( router: jeepney.io.asyncio.DBusRouter, matchrule: jeepney.MatchRule, ) -> jeepney.io.asyncio.FilterHandle: """Call AddMatch/RemoveMatch on context entry/exit and give filtered queue. """ jeepney.wrappers.unwrap_msg( await router.send_and_get_reply( jeepney.bus_messages.message_bus.AddMatch(matchrule) ), ) try: with router.filter(matchrule) as queue: yield queue finally: jeepney.wrappers.unwrap_msg( await router.send_and_get_reply( jeepney.bus_messages.message_bus.RemoveMatch(matchrule) ), ) async def start_transient_unit( unitname: str, properties: list[tuple[str, tuple[str, typing.Any]]] ) -> None: """Call the StartTransientUnit dbus method on the user manager. The given properties are tuples of property names and dbus-typed values. """ async with ( jeepney.io.asyncio.open_dbus_router() as router, jeepney_listen_signal( router, jeepney.MatchRule( type="signal", interface="org.freedesktop.systemd1.Manager", member="JobRemoved", path="/org/freedesktop/systemd1", ), ) as queue, ): (scope_job,) = jeepney.wrappers.unwrap_msg( await router.send_and_get_reply( jeepney.new_method_call( jeepney.DBusAddress( "/org/freedesktop/systemd1", bus_name="org.freedesktop.systemd1", interface="org.freedesktop.systemd1.Manager", ), "StartTransientUnit", "ssa(sv)a(sa(sv))", (unitname, "fail", properties, []), ), ), ) loop = asyncio.get_running_loop() deadline = loop.time() + 60 while True: message = jeepney.wrappers.unwrap_msg( await asyncio.wait_for(queue.get(), deadline - loop.time()) ) if message[1] != scope_job: continue if message[3] != "done": raise OSError("StartTransientUnit failed: " + message[3]) return