summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xexamples/cgroup.py145
1 files changed, 73 insertions, 72 deletions
diff --git a/examples/cgroup.py b/examples/cgroup.py
index 8f81a85..da222f9 100755
--- a/examples/cgroup.py
+++ b/examples/cgroup.py
@@ -34,80 +34,81 @@ def get_cgroup(pid: int = -1) -> pathlib.PurePath:
)
-class SystemdJobWaiter:
- """Context manager for waiting for a systemd job to complete.
- Typical usage:
+if ravel is not None:
+ class SystemdJobWaiter:
+ """Context manager for waiting for a systemd job to complete.
+ Typical usage:
+
+ with SystemdJobWaiter(bus) as wait:
+ job = create_a_job_on(bus)
+ result = await wait(job)
+ """
+
+ systemd_path = "/org/freedesktop/systemd1"
+ systemd_iface = "org.freedesktop.systemd1.Manager"
+
+ def __init__(self, bus: ravel.Connection):
+ self.bus = bus
+ self.jobs_removed: dict[str, str] = {}
+ self.target_job: str | None = None
+ self.job_done = asyncio.get_running_loop().create_future()
+
+ @ravel.signal(name="JobRemoved", in_signature="uoss")
+ def _on_job_removed(
+ self, _id: int, path: str, _unit: str, result: str
+ ) -> None:
+ if self.target_job is None:
+ self.jobs_removed[path] = result
+ elif self.target_job == path:
+ self.job_done.set_result(result)
+
+ def __enter__(self) -> "SystemdJobWaiter":
+ self.bus.listen_signal(
+ self.systemd_path,
+ False,
+ self.systemd_iface,
+ "JobRemoved",
+ self._on_job_removed,
+ )
+ return self
+
+ async def __call__(self, job: str, timeout: int | float = 60) -> str:
+ assert self.target_job is None
+ self.target_job = job
+ try:
+ return self.jobs_removed[job]
+ except KeyError:
+ return await asyncio.wait_for(self.job_done, timeout)
+
+ def __exit__(self, *exc_info: typing.Any) -> None:
+ self.bus.unlisten_signal(
+ self.systemd_path,
+ False,
+ self.systemd_iface,
+ "JobRemoved",
+ self._on_job_removed,
+ )
- with SystemdJobWaiter(bus) as wait:
- job = create_a_job_on(bus)
- result = await wait(job)
- """
- systemd_path = "/org/freedesktop/systemd1"
- systemd_iface = "org.freedesktop.systemd1.Manager"
-
- def __init__(self, bus: ravel.Connection):
- self.bus = bus
- self.jobs_removed: dict[str, str] = {}
- self.target_job: str | None = None
- self.job_done = asyncio.get_running_loop().create_future()
-
- @ravel.signal(name="JobRemoved", in_signature="uoss")
- def _on_job_removed(
- self, _id: int, path: str, _unit: str, result: str
- ) -> None:
- if self.target_job is None:
- self.jobs_removed[path] = result
- elif self.target_job == path:
- self.job_done.set_result(result)
-
- def __enter__(self) -> "SystemdJobWaiter":
- self.bus.listen_signal(
- self.systemd_path,
- False,
- self.systemd_iface,
- "JobRemoved",
- self._on_job_removed,
- )
- return self
-
- async def __call__(self, job: str, timeout: int | float = 60) -> str:
- assert self.target_job is None
- self.target_job = job
- try:
- return self.jobs_removed[job]
- except KeyError:
- return await asyncio.wait_for(self.job_done, timeout)
-
- def __exit__(self, _1: typing.Any, _2: typing.Any, _3: typing.Any) -> None:
- self.bus.unlisten_signal(
- self.systemd_path,
- False,
- self.systemd_iface,
- "JobRemoved",
- self._on_job_removed,
- )
-
-
-async def start_transient_unit_with_ravel(pid: int) -> None:
- """Call the StartTransientUnit dbus method on the user manager for the
- given pid.
- """
- bus = await ravel.session_bus_async()
- with SystemdJobWaiter(bus) as wait:
- scope_job = (
- bus["org.freedesktop.systemd1"]["/org/freedesktop/systemd1"]
- .get_interface("org.freedesktop.systemd1.Manager")
- .StartTransientUnit(
- f"cgroup-{pid}.scope",
- "fail",
- [("PIDs", ("au", [pid])), ("Delegate", ("b", True))],
- [],
- )
- )[0]
- result = await wait(scope_job)
- if result != "done":
- raise OSError("StartTransientUnit failed: " + result)
+ async def start_transient_unit_with_ravel(pid: int) -> None:
+ """Call the StartTransientUnit dbus method on the user manager for the
+ given pid.
+ """
+ bus = await ravel.session_bus_async()
+ with SystemdJobWaiter(bus) as wait:
+ scope_job = (
+ bus["org.freedesktop.systemd1"]["/org/freedesktop/systemd1"]
+ .get_interface("org.freedesktop.systemd1.Manager")
+ .StartTransientUnit(
+ f"cgroup-{pid}.scope",
+ "fail",
+ [("PIDs", ("au", [pid])), ("Delegate", ("b", True))],
+ [],
+ )
+ )[0]
+ result = await wait(scope_job)
+ if result != "done":
+ raise OSError("StartTransientUnit failed: " + result)
def main() -> None: