From b0874c6086f19809b1adf7f5e7a755ac6f146c9e Mon Sep 17 00:00:00 2001 From: Helmut Grohne Date: Tue, 23 Apr 2024 07:11:48 +0200 Subject: lift the dbus functionality from the cgroup example --- examples/cgroup.py | 203 ++---------------------------------- linuxnamespaces/systemd/__init__.py | 68 ++++++++++++ linuxnamespaces/systemd/dbussy.py | 81 ++++++++++++++ linuxnamespaces/systemd/jeepney.py | 77 ++++++++++++++ pyproject.toml | 6 +- 5 files changed, 241 insertions(+), 194 deletions(-) create mode 100644 linuxnamespaces/systemd/__init__.py create mode 100644 linuxnamespaces/systemd/dbussy.py create mode 100644 linuxnamespaces/systemd/jeepney.py diff --git a/examples/cgroup.py b/examples/cgroup.py index 90978dc..28a1f7c 100755 --- a/examples/cgroup.py +++ b/examples/cgroup.py @@ -7,26 +7,15 @@ """ import asyncio -import contextlib import os import pathlib import sys -import typing - -try: - import jeepney.io.asyncio -except ImportError: - jeepney = None - -try: - import ravel -except ImportError: - ravel = None if __file__.split("/")[-2:-1] == ["examples"]: sys.path.insert(0, "/".join(__file__.split("/")[:-2])) import linuxnamespaces +import linuxnamespaces.systemd def get_cgroup(pid: int = -1) -> pathlib.PurePath: @@ -40,158 +29,6 @@ def get_cgroup(pid: int = -1) -> pathlib.PurePath: ) -if jeepney is not None: - @contextlib.asynccontextmanager - async def jeepney_listen_signal( - router: jeepney.io.asyncio.DBusRouter, matchrule: jeepney.MatchRule, - ) -> jeepney.io.asyncio.FilterHandle: - """Call AddMatch/RemoveMatch on context entry/exit and give filtered - queue. - """ - jeepney.wrappers.unwrap_msg( - await router.send_and_get_reply( - jeepney.bus_messages.message_bus.AddMatch(matchrule) - ), - ) - try: - with router.filter(matchrule) as queue: - yield queue - finally: - jeepney.wrappers.unwrap_msg( - await router.send_and_get_reply( - jeepney.bus_messages.message_bus.RemoveMatch(matchrule) - ), - ) - - - async def start_transient_unit_with_jeepney(pid: int) -> None: - """Call the StartTransientUnit dbus method on the user manager for the - given pid. - """ - async with ( - jeepney.io.asyncio.open_dbus_router() as router, - jeepney_listen_signal( - router, - jeepney.MatchRule( - type="signal", - interface="org.freedesktop.systemd1.Manager", - member="JobRemoved", - path="/org/freedesktop/systemd1", - ), - ) as queue, - ): - (scope_job,) = jeepney.wrappers.unwrap_msg( - await router.send_and_get_reply( - jeepney.new_method_call( - jeepney.DBusAddress( - "/org/freedesktop/systemd1", - bus_name="org.freedesktop.systemd1", - interface="org.freedesktop.systemd1.Manager", - ), - "StartTransientUnit", - "ssa(sv)a(sa(sv))", - ( - f"cgroup-{pid}.scope", - "fail", - [ - ("PIDs", ("au", [pid])), - ("Delegate", ("b", True)), - ], - [], - ), - ), - ), - ) - loop = asyncio.get_running_loop() - deadline = loop.time() + 60 - while True: - message = jeepney.wrappers.unwrap_msg( - await asyncio.wait_for(queue.get(), deadline - loop.time()) - ) - if message[1] != scope_job: - continue - if message[3] != "done": - raise OSError("StartTransientUnit failed: " + message[3]) - return - - -if ravel is not None: - class RavelSystemdJobWaiter: - """Context manager for waiting for a systemd job to complete. - Typical usage: - - with RavelSystemdJobWaiter(bus) as wait: - job = create_a_job_on(bus) - result = await wait(job) - """ - - systemd_path = "/org/freedesktop/systemd1" - systemd_iface = "org.freedesktop.systemd1.Manager" - - def __init__(self, bus: ravel.Connection): - self.bus = bus - self.jobs_removed: dict[str, str] = {} - self.target_job: str | None = None - self.job_done = asyncio.get_running_loop().create_future() - - @ravel.signal(name="JobRemoved", in_signature="uoss") - def _on_job_removed( - self, _id: int, path: str, _unit: str, result: str - ) -> None: - if self.target_job is None: - self.jobs_removed[path] = result - elif self.target_job == path: - self.job_done.set_result(result) - - def __enter__(self) -> "RavelSystemdJobWaiter": - self.bus.listen_signal( - self.systemd_path, - False, - self.systemd_iface, - "JobRemoved", - self._on_job_removed, - ) - return self - - async def __call__(self, job: str, timeout: int | float = 60) -> str: - assert self.target_job is None - self.target_job = job - try: - return self.jobs_removed[job] - except KeyError: - return await asyncio.wait_for(self.job_done, timeout) - - def __exit__(self, *exc_info: typing.Any) -> None: - self.bus.unlisten_signal( - self.systemd_path, - False, - self.systemd_iface, - "JobRemoved", - self._on_job_removed, - ) - - - async def start_transient_unit_with_ravel(pid: int) -> None: - """Call the StartTransientUnit dbus method on the user manager for the - given pid. - """ - bus = await ravel.session_bus_async() - with RavelSystemdJobWaiter(bus) as wait: - scope_job = ( - bus["org.freedesktop.systemd1"]["/org/freedesktop/systemd1"] - .get_interface("org.freedesktop.systemd1.Manager") - .StartTransientUnit( - f"cgroup-{pid}.scope", - "fail", - [("PIDs", ("au", [pid])), ("Delegate", ("b", True))], - [], - ) - )[0] - result = await wait(scope_job) - if result != "done": - raise OSError("StartTransientUnit failed: " + result) - - def main() -> None: mycgroup = get_cgroup() if not os.access( @@ -201,35 +38,17 @@ def main() -> None: # For some shells - notably from graphical desktop environments, the # hierarchy is immediately writeable. For others, we may create a scope # unit. - if jeepney is not None: - asyncio.run(start_transient_unit_with_jeepney(os.getpid())) - mycgroup = get_cgroup() - elif ravel is not None: - asyncio.run(start_transient_unit_with_ravel(os.getpid())) - mycgroup = get_cgroup() - else: - # Re-execute ourselves via systemd-run. - if ( - mycgroup.name.startswith("run-") - and mycgroup.name.endswith(".scope") - ): - print( - "Error: We're running in a .scope cgroup, but it is not writeable. Giving up." - ) - sys.exit(1) - os.execvp( - "systemd-run", - [ - "systemd-run", - "--user", - "--scope", - "--property", - "Delegate=true", - *sys.argv, - ], + try: + asyncio.run( + linuxnamespaces.systemd.start_transient_unit( + f"cgroup-{os.getpid()}.scope", + properties={"Delegate": True}, + ), + ) + except NotImplementedError: + linuxnamespaces.systemd.reexec_as_transient_unit( + properties={"Delegate": True} ) - print("Error: Failed to re-execute myself inside systemd-run.") - sys.exit(1) linuxnamespaces.unshare_user_idmap( [linuxnamespaces.IDMapping(os.getuid(), os.getuid(), 1)], [linuxnamespaces.IDMapping(os.getgid(), os.getgid(), 1)], diff --git a/linuxnamespaces/systemd/__init__.py b/linuxnamespaces/systemd/__init__.py new file mode 100644 index 0000000..d8e7f86 --- /dev/null +++ b/linuxnamespaces/systemd/__init__.py @@ -0,0 +1,68 @@ +# Copyright 2024 Helmut Grohne +# SPDX-License-Identifier: GPL-3 + +"""Communicate with a systemd instance to create e.g. delegated croups.""" + +import os +import sys +import typing + + +async def start_transient_unit( + unitname: str, + pids: list[int] | None = None, + properties: dict[str, typing.Any] | None = None, + dbusdriver: typing.Literal["auto", "jeepney", "dbussy"] = "auto", +) -> None: + """Call the StartTransientUnit dbus method on the user manager.""" + dbus_properties: list[tuple[str, tuple[str, typing.Any]]] = [] + if pids is None: + pids = [os.getpid()] + dbus_properties.append(("PIDs", ("au", pids))) + for key, value in ({} if properties is None else properties).items(): + if isinstance(value, bool): + dbus_properties.append((key, ("b", value))) + elif isinstance(value, str): + dbus_properties.append((key, ("s", value))) + else: + raise ValueError( + f"cannot infer dbus type for property {key} value" + ) + if dbusdriver in ("auto", "jeepney"): + try: + from .jeepney import start_transient_unit as jeepney_impl + except ImportError: + pass + else: + return await jeepney_impl(unitname, dbus_properties) + if dbusdriver in ("auto", "dbussy"): + try: + from .dbussy import start_transient_unit as dbussy_impl + except ImportError: + pass + else: + return await dbussy_impl(unitname, dbus_properties) + raise NotImplementedError("requested dbusdriver not available") + + +def reexec_as_transient_unit( + unitname: str | None = None, + properties: dict[str, typing.Any] | None = None, + argv: list[str] | None = None, +) -> typing.NoReturn: + """Reexecute the current process via systemd-run thus placing it into a new + .scope unit. If no argv is given, sys.argv is used. + """ + execargs = ["systemd-run", "--user", "--scope"] + if unitname is not None: + execargs.append("--unit=" + unitname) + if properties: + for key, value in properties.items(): + if isinstance(value, int): + value = str(value) + elif not isinstance(value, str): + raise ValueError(f"cannot format property {key} value") + execargs.append(f"--property={key}={value}") + execargs.append("--") + execargs.extend(sys.argv if argv is None else argv) + os.execvp("systemd-run", execargs) diff --git a/linuxnamespaces/systemd/dbussy.py b/linuxnamespaces/systemd/dbussy.py new file mode 100644 index 0000000..77410df --- /dev/null +++ b/linuxnamespaces/systemd/dbussy.py @@ -0,0 +1,81 @@ +# Copyright 2024 Helmut Grohne +# SPDX-License-Identifier: GPL-3 + +"""Communicate with a systemd instance via dbus using dbussy.""" + +import asyncio +import typing + +import ravel + + +class SystemdJobWaiter: + """Context manager for waiting for a systemd job to complete. + Typical usage: + + with SystemdJobWaiter(bus) as wait: + job = create_a_job_on(bus) + result = await wait(job) + """ + + systemd_path = "/org/freedesktop/systemd1" + systemd_iface = "org.freedesktop.systemd1.Manager" + + def __init__(self, bus: ravel.Connection): + self.bus = bus + self.jobs_removed: dict[str, str] = {} + self.target_job: str | None = None + self.job_done = asyncio.get_running_loop().create_future() + + @ravel.signal(name="JobRemoved", in_signature="uoss") + def _on_job_removed( + self, _id: int, path: str, _unit: str, result: str + ) -> None: + if self.target_job is None: + self.jobs_removed[path] = result + elif self.target_job == path: + self.job_done.set_result(result) + + def __enter__(self) -> "SystemdJobWaiter": + self.bus.listen_signal( + self.systemd_path, + False, + self.systemd_iface, + "JobRemoved", + self._on_job_removed, + ) + return self + + async def __call__(self, job: str, timeout: int | float = 60) -> str: + assert self.target_job is None + self.target_job = job + try: + return self.jobs_removed[job] + except KeyError: + return await asyncio.wait_for(self.job_done, timeout) + + def __exit__(self, *exc_info: typing.Any) -> None: + self.bus.unlisten_signal( + self.systemd_path, + False, + self.systemd_iface, + "JobRemoved", + self._on_job_removed, + ) + + +async def start_transient_unit( + unitname: str, properties: list[tuple[str, tuple[str, typing.Any]]] +) -> None: + """Call the StartTransientUnit dbus method on the user manager. The given + properties are tuples of property names and dbus-typed values. + """ + bus = await ravel.session_bus_async() + with SystemdJobWaiter(bus) as wait: + result = await wait( + bus["org.freedesktop.systemd1"]["/org/freedesktop/systemd1"] + .get_interface("org.freedesktop.systemd1.Manager") + .StartTransientUnit(unitname, "fail", properties, [])[0], + ) + if result != "done": + raise OSError("StartTransientUnit failed: " + result) diff --git a/linuxnamespaces/systemd/jeepney.py b/linuxnamespaces/systemd/jeepney.py new file mode 100644 index 0000000..ef276d3 --- /dev/null +++ b/linuxnamespaces/systemd/jeepney.py @@ -0,0 +1,77 @@ +# Copyright 2024 Helmut Grohne +# SPDX-License-Identifier: GPL-3 + +"""Communicate with a systemd instance via dbus using jeepney.""" + +import asyncio +import contextlib +import typing + +import jeepney.io.asyncio + + +@contextlib.asynccontextmanager +async def jeepney_listen_signal( + router: jeepney.io.asyncio.DBusRouter, matchrule: jeepney.MatchRule, +) -> jeepney.io.asyncio.FilterHandle: + """Call AddMatch/RemoveMatch on context entry/exit and give filtered queue. + """ + jeepney.wrappers.unwrap_msg( + await router.send_and_get_reply( + jeepney.bus_messages.message_bus.AddMatch(matchrule) + ), + ) + try: + with router.filter(matchrule) as queue: + yield queue + finally: + jeepney.wrappers.unwrap_msg( + await router.send_and_get_reply( + jeepney.bus_messages.message_bus.RemoveMatch(matchrule) + ), + ) + + +async def start_transient_unit( + unitname: str, properties: list[tuple[str, tuple[str, typing.Any]]] +) -> None: + """Call the StartTransientUnit dbus method on the user manager. The given + properties are tuples of property names and dbus-typed values. + """ + async with ( + jeepney.io.asyncio.open_dbus_router() as router, + jeepney_listen_signal( + router, + jeepney.MatchRule( + type="signal", + interface="org.freedesktop.systemd1.Manager", + member="JobRemoved", + path="/org/freedesktop/systemd1", + ), + ) as queue, + ): + (scope_job,) = jeepney.wrappers.unwrap_msg( + await router.send_and_get_reply( + jeepney.new_method_call( + jeepney.DBusAddress( + "/org/freedesktop/systemd1", + bus_name="org.freedesktop.systemd1", + interface="org.freedesktop.systemd1.Manager", + ), + "StartTransientUnit", + "ssa(sv)a(sa(sv))", + (unitname, "fail", properties, []), + ), + ), + ) + loop = asyncio.get_running_loop() + deadline = loop.time() + 60 + while True: + message = jeepney.wrappers.unwrap_msg( + await asyncio.wait_for(queue.get(), deadline - loop.time()) + ) + if message[1] != scope_job: + continue + if message[3] != "done": + raise OSError("StartTransientUnit failed: " + message[3]) + return diff --git a/pyproject.toml b/pyproject.toml index b226a33..481593a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,9 +12,11 @@ classifiers = [ requires-python = ">=3.9" [project.optional-dependencies] +# linuxnamespaces.systemd needs jeepney or dbussy, not both. +jeepney = ["jeepney"] +dbussy = ["dbussy"] test = ["pytest", "pytest-forked", "pytest-subtests"] -# We want jeepney or dbussy, not both. -examples = ["dbussy", "jeepney", "zstandard"] +examples = ["zstandard"] [tool.black] line-length = 79 -- cgit v1.2.3