1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
# Copyright 2024 Helmut Grohne <helmut@subdivi.de>
# SPDX-License-Identifier: GPL-3
"""Communicate with a systemd instance via dbus using jeepney."""
import asyncio
import contextlib
import typing
import jeepney.io.asyncio
@contextlib.asynccontextmanager
async def jeepney_listen_signal(
router: jeepney.io.asyncio.DBusRouter, matchrule: jeepney.MatchRule,
) -> jeepney.io.asyncio.FilterHandle:
"""Call AddMatch/RemoveMatch on context entry/exit and give filtered queue.
"""
jeepney.wrappers.unwrap_msg(
await router.send_and_get_reply(
jeepney.bus_messages.message_bus.AddMatch(matchrule)
),
)
try:
with router.filter(matchrule) as queue:
yield queue
finally:
jeepney.wrappers.unwrap_msg(
await router.send_and_get_reply(
jeepney.bus_messages.message_bus.RemoveMatch(matchrule)
),
)
async def start_transient_unit(
unitname: str, properties: list[tuple[str, tuple[str, typing.Any]]]
) -> None:
"""Call the StartTransientUnit dbus method on the user manager. The given
properties are tuples of property names and dbus-typed values.
"""
async with (
jeepney.io.asyncio.open_dbus_router() as router,
jeepney_listen_signal(
router,
jeepney.MatchRule(
type="signal",
interface="org.freedesktop.systemd1.Manager",
member="JobRemoved",
path="/org/freedesktop/systemd1",
),
) as queue,
):
(scope_job,) = jeepney.wrappers.unwrap_msg(
await router.send_and_get_reply(
jeepney.new_method_call(
jeepney.DBusAddress(
"/org/freedesktop/systemd1",
bus_name="org.freedesktop.systemd1",
interface="org.freedesktop.systemd1.Manager",
),
"StartTransientUnit",
"ssa(sv)a(sa(sv))",
(unitname, "fail", properties, []),
),
),
)
loop = asyncio.get_running_loop()
deadline = loop.time() + 60
while True:
message = jeepney.wrappers.unwrap_msg(
await asyncio.wait_for(queue.get(), deadline - loop.time())
)
if message[1] != scope_job:
continue
if message[3] != "done":
raise OSError("StartTransientUnit failed: " + message[3])
return
|