summaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
Diffstat (limited to 'examples')
-rwxr-xr-xexamples/cgroup.py94
1 files changed, 89 insertions, 5 deletions
diff --git a/examples/cgroup.py b/examples/cgroup.py
index 8fa8457..90978dc 100755
--- a/examples/cgroup.py
+++ b/examples/cgroup.py
@@ -7,12 +7,18 @@
"""
import asyncio
+import contextlib
import os
import pathlib
import sys
import typing
try:
+ import jeepney.io.asyncio
+except ImportError:
+ jeepney = None
+
+try:
import ravel
except ImportError:
ravel = None
@@ -34,12 +40,87 @@ def get_cgroup(pid: int = -1) -> pathlib.PurePath:
)
+if jeepney is not None:
+ @contextlib.asynccontextmanager
+ async def jeepney_listen_signal(
+ router: jeepney.io.asyncio.DBusRouter, matchrule: jeepney.MatchRule,
+ ) -> jeepney.io.asyncio.FilterHandle:
+ """Call AddMatch/RemoveMatch on context entry/exit and give filtered
+ queue.
+ """
+ jeepney.wrappers.unwrap_msg(
+ await router.send_and_get_reply(
+ jeepney.bus_messages.message_bus.AddMatch(matchrule)
+ ),
+ )
+ try:
+ with router.filter(matchrule) as queue:
+ yield queue
+ finally:
+ jeepney.wrappers.unwrap_msg(
+ await router.send_and_get_reply(
+ jeepney.bus_messages.message_bus.RemoveMatch(matchrule)
+ ),
+ )
+
+
+ async def start_transient_unit_with_jeepney(pid: int) -> None:
+ """Call the StartTransientUnit dbus method on the user manager for the
+ given pid.
+ """
+ async with (
+ jeepney.io.asyncio.open_dbus_router() as router,
+ jeepney_listen_signal(
+ router,
+ jeepney.MatchRule(
+ type="signal",
+ interface="org.freedesktop.systemd1.Manager",
+ member="JobRemoved",
+ path="/org/freedesktop/systemd1",
+ ),
+ ) as queue,
+ ):
+ (scope_job,) = jeepney.wrappers.unwrap_msg(
+ await router.send_and_get_reply(
+ jeepney.new_method_call(
+ jeepney.DBusAddress(
+ "/org/freedesktop/systemd1",
+ bus_name="org.freedesktop.systemd1",
+ interface="org.freedesktop.systemd1.Manager",
+ ),
+ "StartTransientUnit",
+ "ssa(sv)a(sa(sv))",
+ (
+ f"cgroup-{pid}.scope",
+ "fail",
+ [
+ ("PIDs", ("au", [pid])),
+ ("Delegate", ("b", True)),
+ ],
+ [],
+ ),
+ ),
+ ),
+ )
+ loop = asyncio.get_running_loop()
+ deadline = loop.time() + 60
+ while True:
+ message = jeepney.wrappers.unwrap_msg(
+ await asyncio.wait_for(queue.get(), deadline - loop.time())
+ )
+ if message[1] != scope_job:
+ continue
+ if message[3] != "done":
+ raise OSError("StartTransientUnit failed: " + message[3])
+ return
+
+
if ravel is not None:
- class SystemdJobWaiter:
+ class RavelSystemdJobWaiter:
"""Context manager for waiting for a systemd job to complete.
Typical usage:
- with SystemdJobWaiter(bus) as wait:
+ with RavelSystemdJobWaiter(bus) as wait:
job = create_a_job_on(bus)
result = await wait(job)
"""
@@ -62,7 +143,7 @@ if ravel is not None:
elif self.target_job == path:
self.job_done.set_result(result)
- def __enter__(self) -> "SystemdJobWaiter":
+ def __enter__(self) -> "RavelSystemdJobWaiter":
self.bus.listen_signal(
self.systemd_path,
False,
@@ -95,7 +176,7 @@ if ravel is not None:
given pid.
"""
bus = await ravel.session_bus_async()
- with SystemdJobWaiter(bus) as wait:
+ with RavelSystemdJobWaiter(bus) as wait:
scope_job = (
bus["org.freedesktop.systemd1"]["/org/freedesktop/systemd1"]
.get_interface("org.freedesktop.systemd1.Manager")
@@ -120,7 +201,10 @@ def main() -> None:
# For some shells - notably from graphical desktop environments, the
# hierarchy is immediately writeable. For others, we may create a scope
# unit.
- if ravel is not None:
+ if jeepney is not None:
+ asyncio.run(start_transient_unit_with_jeepney(os.getpid()))
+ mycgroup = get_cgroup()
+ elif ravel is not None:
asyncio.run(start_transient_unit_with_ravel(os.getpid()))
mycgroup = get_cgroup()
else: