From b0874c6086f19809b1adf7f5e7a755ac6f146c9e Mon Sep 17 00:00:00 2001 From: Helmut Grohne Date: Tue, 23 Apr 2024 07:11:48 +0200 Subject: lift the dbus functionality from the cgroup example --- linuxnamespaces/systemd/__init__.py | 68 +++++++++++++++++++++++++++++++ linuxnamespaces/systemd/dbussy.py | 81 +++++++++++++++++++++++++++++++++++++ linuxnamespaces/systemd/jeepney.py | 77 +++++++++++++++++++++++++++++++++++ 3 files changed, 226 insertions(+) create mode 100644 linuxnamespaces/systemd/__init__.py create mode 100644 linuxnamespaces/systemd/dbussy.py create mode 100644 linuxnamespaces/systemd/jeepney.py (limited to 'linuxnamespaces/systemd') diff --git a/linuxnamespaces/systemd/__init__.py b/linuxnamespaces/systemd/__init__.py new file mode 100644 index 0000000..d8e7f86 --- /dev/null +++ b/linuxnamespaces/systemd/__init__.py @@ -0,0 +1,68 @@ +# Copyright 2024 Helmut Grohne +# SPDX-License-Identifier: GPL-3 + +"""Communicate with a systemd instance to create e.g. delegated croups.""" + +import os +import sys +import typing + + +async def start_transient_unit( + unitname: str, + pids: list[int] | None = None, + properties: dict[str, typing.Any] | None = None, + dbusdriver: typing.Literal["auto", "jeepney", "dbussy"] = "auto", +) -> None: + """Call the StartTransientUnit dbus method on the user manager.""" + dbus_properties: list[tuple[str, tuple[str, typing.Any]]] = [] + if pids is None: + pids = [os.getpid()] + dbus_properties.append(("PIDs", ("au", pids))) + for key, value in ({} if properties is None else properties).items(): + if isinstance(value, bool): + dbus_properties.append((key, ("b", value))) + elif isinstance(value, str): + dbus_properties.append((key, ("s", value))) + else: + raise ValueError( + f"cannot infer dbus type for property {key} value" + ) + if dbusdriver in ("auto", "jeepney"): + try: + from .jeepney import start_transient_unit as jeepney_impl + except ImportError: + pass + else: + return await jeepney_impl(unitname, dbus_properties) + if dbusdriver in ("auto", "dbussy"): + try: + from .dbussy import start_transient_unit as dbussy_impl + except ImportError: + pass + else: + return await dbussy_impl(unitname, dbus_properties) + raise NotImplementedError("requested dbusdriver not available") + + +def reexec_as_transient_unit( + unitname: str | None = None, + properties: dict[str, typing.Any] | None = None, + argv: list[str] | None = None, +) -> typing.NoReturn: + """Reexecute the current process via systemd-run thus placing it into a new + .scope unit. If no argv is given, sys.argv is used. + """ + execargs = ["systemd-run", "--user", "--scope"] + if unitname is not None: + execargs.append("--unit=" + unitname) + if properties: + for key, value in properties.items(): + if isinstance(value, int): + value = str(value) + elif not isinstance(value, str): + raise ValueError(f"cannot format property {key} value") + execargs.append(f"--property={key}={value}") + execargs.append("--") + execargs.extend(sys.argv if argv is None else argv) + os.execvp("systemd-run", execargs) diff --git a/linuxnamespaces/systemd/dbussy.py b/linuxnamespaces/systemd/dbussy.py new file mode 100644 index 0000000..77410df --- /dev/null +++ b/linuxnamespaces/systemd/dbussy.py @@ -0,0 +1,81 @@ +# Copyright 2024 Helmut Grohne +# SPDX-License-Identifier: GPL-3 + +"""Communicate with a systemd instance via dbus using dbussy.""" + +import asyncio +import typing + +import ravel + + +class SystemdJobWaiter: + """Context manager for waiting for a systemd job to complete. + Typical usage: + + with SystemdJobWaiter(bus) as wait: + job = create_a_job_on(bus) + result = await wait(job) + """ + + systemd_path = "/org/freedesktop/systemd1" + systemd_iface = "org.freedesktop.systemd1.Manager" + + def __init__(self, bus: ravel.Connection): + self.bus = bus + self.jobs_removed: dict[str, str] = {} + self.target_job: str | None = None + self.job_done = asyncio.get_running_loop().create_future() + + @ravel.signal(name="JobRemoved", in_signature="uoss") + def _on_job_removed( + self, _id: int, path: str, _unit: str, result: str + ) -> None: + if self.target_job is None: + self.jobs_removed[path] = result + elif self.target_job == path: + self.job_done.set_result(result) + + def __enter__(self) -> "SystemdJobWaiter": + self.bus.listen_signal( + self.systemd_path, + False, + self.systemd_iface, + "JobRemoved", + self._on_job_removed, + ) + return self + + async def __call__(self, job: str, timeout: int | float = 60) -> str: + assert self.target_job is None + self.target_job = job + try: + return self.jobs_removed[job] + except KeyError: + return await asyncio.wait_for(self.job_done, timeout) + + def __exit__(self, *exc_info: typing.Any) -> None: + self.bus.unlisten_signal( + self.systemd_path, + False, + self.systemd_iface, + "JobRemoved", + self._on_job_removed, + ) + + +async def start_transient_unit( + unitname: str, properties: list[tuple[str, tuple[str, typing.Any]]] +) -> None: + """Call the StartTransientUnit dbus method on the user manager. The given + properties are tuples of property names and dbus-typed values. + """ + bus = await ravel.session_bus_async() + with SystemdJobWaiter(bus) as wait: + result = await wait( + bus["org.freedesktop.systemd1"]["/org/freedesktop/systemd1"] + .get_interface("org.freedesktop.systemd1.Manager") + .StartTransientUnit(unitname, "fail", properties, [])[0], + ) + if result != "done": + raise OSError("StartTransientUnit failed: " + result) diff --git a/linuxnamespaces/systemd/jeepney.py b/linuxnamespaces/systemd/jeepney.py new file mode 100644 index 0000000..ef276d3 --- /dev/null +++ b/linuxnamespaces/systemd/jeepney.py @@ -0,0 +1,77 @@ +# Copyright 2024 Helmut Grohne +# SPDX-License-Identifier: GPL-3 + +"""Communicate with a systemd instance via dbus using jeepney.""" + +import asyncio +import contextlib +import typing + +import jeepney.io.asyncio + + +@contextlib.asynccontextmanager +async def jeepney_listen_signal( + router: jeepney.io.asyncio.DBusRouter, matchrule: jeepney.MatchRule, +) -> jeepney.io.asyncio.FilterHandle: + """Call AddMatch/RemoveMatch on context entry/exit and give filtered queue. + """ + jeepney.wrappers.unwrap_msg( + await router.send_and_get_reply( + jeepney.bus_messages.message_bus.AddMatch(matchrule) + ), + ) + try: + with router.filter(matchrule) as queue: + yield queue + finally: + jeepney.wrappers.unwrap_msg( + await router.send_and_get_reply( + jeepney.bus_messages.message_bus.RemoveMatch(matchrule) + ), + ) + + +async def start_transient_unit( + unitname: str, properties: list[tuple[str, tuple[str, typing.Any]]] +) -> None: + """Call the StartTransientUnit dbus method on the user manager. The given + properties are tuples of property names and dbus-typed values. + """ + async with ( + jeepney.io.asyncio.open_dbus_router() as router, + jeepney_listen_signal( + router, + jeepney.MatchRule( + type="signal", + interface="org.freedesktop.systemd1.Manager", + member="JobRemoved", + path="/org/freedesktop/systemd1", + ), + ) as queue, + ): + (scope_job,) = jeepney.wrappers.unwrap_msg( + await router.send_and_get_reply( + jeepney.new_method_call( + jeepney.DBusAddress( + "/org/freedesktop/systemd1", + bus_name="org.freedesktop.systemd1", + interface="org.freedesktop.systemd1.Manager", + ), + "StartTransientUnit", + "ssa(sv)a(sa(sv))", + (unitname, "fail", properties, []), + ), + ), + ) + loop = asyncio.get_running_loop() + deadline = loop.time() + 60 + while True: + message = jeepney.wrappers.unwrap_msg( + await asyncio.wait_for(queue.get(), deadline - loop.time()) + ) + if message[1] != scope_job: + continue + if message[3] != "done": + raise OSError("StartTransientUnit failed: " + message[3]) + return -- cgit v1.2.3