From a75ebb7c6b558feb9a3b93ace6cbca54bccb067a Mon Sep 17 00:00:00 2001 From: Helmut Grohne Date: Fri, 19 Apr 2024 05:29:50 +0200 Subject: examples/cgroup.py: do not fail when ravel is unavailable --- examples/cgroup.py | 145 +++++++++++++++++++++++++++-------------------------- 1 file changed, 73 insertions(+), 72 deletions(-) (limited to 'examples/cgroup.py') diff --git a/examples/cgroup.py b/examples/cgroup.py index 8f81a85..da222f9 100755 --- a/examples/cgroup.py +++ b/examples/cgroup.py @@ -34,80 +34,81 @@ def get_cgroup(pid: int = -1) -> pathlib.PurePath: ) -class SystemdJobWaiter: - """Context manager for waiting for a systemd job to complete. - Typical usage: +if ravel is not None: + class SystemdJobWaiter: + """Context manager for waiting for a systemd job to complete. + Typical usage: + + with SystemdJobWaiter(bus) as wait: + job = create_a_job_on(bus) + result = await wait(job) + """ + + systemd_path = "/org/freedesktop/systemd1" + systemd_iface = "org.freedesktop.systemd1.Manager" + + def __init__(self, bus: ravel.Connection): + self.bus = bus + self.jobs_removed: dict[str, str] = {} + self.target_job: str | None = None + self.job_done = asyncio.get_running_loop().create_future() + + @ravel.signal(name="JobRemoved", in_signature="uoss") + def _on_job_removed( + self, _id: int, path: str, _unit: str, result: str + ) -> None: + if self.target_job is None: + self.jobs_removed[path] = result + elif self.target_job == path: + self.job_done.set_result(result) + + def __enter__(self) -> "SystemdJobWaiter": + self.bus.listen_signal( + self.systemd_path, + False, + self.systemd_iface, + "JobRemoved", + self._on_job_removed, + ) + return self + + async def __call__(self, job: str, timeout: int | float = 60) -> str: + assert self.target_job is None + self.target_job = job + try: + return self.jobs_removed[job] + except KeyError: + return await asyncio.wait_for(self.job_done, timeout) + + def __exit__(self, *exc_info: typing.Any) -> None: + self.bus.unlisten_signal( + self.systemd_path, + False, + self.systemd_iface, + "JobRemoved", + self._on_job_removed, + ) - with SystemdJobWaiter(bus) as wait: - job = create_a_job_on(bus) - result = await wait(job) - """ - systemd_path = "/org/freedesktop/systemd1" - systemd_iface = "org.freedesktop.systemd1.Manager" - - def __init__(self, bus: ravel.Connection): - self.bus = bus - self.jobs_removed: dict[str, str] = {} - self.target_job: str | None = None - self.job_done = asyncio.get_running_loop().create_future() - - @ravel.signal(name="JobRemoved", in_signature="uoss") - def _on_job_removed( - self, _id: int, path: str, _unit: str, result: str - ) -> None: - if self.target_job is None: - self.jobs_removed[path] = result - elif self.target_job == path: - self.job_done.set_result(result) - - def __enter__(self) -> "SystemdJobWaiter": - self.bus.listen_signal( - self.systemd_path, - False, - self.systemd_iface, - "JobRemoved", - self._on_job_removed, - ) - return self - - async def __call__(self, job: str, timeout: int | float = 60) -> str: - assert self.target_job is None - self.target_job = job - try: - return self.jobs_removed[job] - except KeyError: - return await asyncio.wait_for(self.job_done, timeout) - - def __exit__(self, _1: typing.Any, _2: typing.Any, _3: typing.Any) -> None: - self.bus.unlisten_signal( - self.systemd_path, - False, - self.systemd_iface, - "JobRemoved", - self._on_job_removed, - ) - - -async def start_transient_unit_with_ravel(pid: int) -> None: - """Call the StartTransientUnit dbus method on the user manager for the - given pid. - """ - bus = await ravel.session_bus_async() - with SystemdJobWaiter(bus) as wait: - scope_job = ( - bus["org.freedesktop.systemd1"]["/org/freedesktop/systemd1"] - .get_interface("org.freedesktop.systemd1.Manager") - .StartTransientUnit( - f"cgroup-{pid}.scope", - "fail", - [("PIDs", ("au", [pid])), ("Delegate", ("b", True))], - [], - ) - )[0] - result = await wait(scope_job) - if result != "done": - raise OSError("StartTransientUnit failed: " + result) + async def start_transient_unit_with_ravel(pid: int) -> None: + """Call the StartTransientUnit dbus method on the user manager for the + given pid. + """ + bus = await ravel.session_bus_async() + with SystemdJobWaiter(bus) as wait: + scope_job = ( + bus["org.freedesktop.systemd1"]["/org/freedesktop/systemd1"] + .get_interface("org.freedesktop.systemd1.Manager") + .StartTransientUnit( + f"cgroup-{pid}.scope", + "fail", + [("PIDs", ("au", [pid])), ("Delegate", ("b", True))], + [], + ) + )[0] + result = await wait(scope_job) + if result != "done": + raise OSError("StartTransientUnit failed: " + result) def main() -> None: -- cgit v1.2.3