summaryrefslogtreecommitdiff
path: root/linuxnamespaces
diff options
context:
space:
mode:
authorHelmut Grohne <helmut@subdivi.de>2024-04-23 07:11:48 +0200
committerHelmut Grohne <helmut@subdivi.de>2024-04-23 07:11:48 +0200
commitb0874c6086f19809b1adf7f5e7a755ac6f146c9e (patch)
treee778269925050cfa56c1a197e975be10c6415f71 /linuxnamespaces
parent2411b941ea0cc8b95cd3492cefd35436ac94f86f (diff)
downloadpython-linuxnamespaces-b0874c6086f19809b1adf7f5e7a755ac6f146c9e.tar.gz
lift the dbus functionality from the cgroup example
Diffstat (limited to 'linuxnamespaces')
-rw-r--r--linuxnamespaces/systemd/__init__.py68
-rw-r--r--linuxnamespaces/systemd/dbussy.py81
-rw-r--r--linuxnamespaces/systemd/jeepney.py77
3 files changed, 226 insertions, 0 deletions
diff --git a/linuxnamespaces/systemd/__init__.py b/linuxnamespaces/systemd/__init__.py
new file mode 100644
index 0000000..d8e7f86
--- /dev/null
+++ b/linuxnamespaces/systemd/__init__.py
@@ -0,0 +1,68 @@
+# Copyright 2024 Helmut Grohne <helmut@subdivi.de>
+# SPDX-License-Identifier: GPL-3
+
+"""Communicate with a systemd instance to create e.g. delegated croups."""
+
+import os
+import sys
+import typing
+
+
+async def start_transient_unit(
+ unitname: str,
+ pids: list[int] | None = None,
+ properties: dict[str, typing.Any] | None = None,
+ dbusdriver: typing.Literal["auto", "jeepney", "dbussy"] = "auto",
+) -> None:
+ """Call the StartTransientUnit dbus method on the user manager."""
+ dbus_properties: list[tuple[str, tuple[str, typing.Any]]] = []
+ if pids is None:
+ pids = [os.getpid()]
+ dbus_properties.append(("PIDs", ("au", pids)))
+ for key, value in ({} if properties is None else properties).items():
+ if isinstance(value, bool):
+ dbus_properties.append((key, ("b", value)))
+ elif isinstance(value, str):
+ dbus_properties.append((key, ("s", value)))
+ else:
+ raise ValueError(
+ f"cannot infer dbus type for property {key} value"
+ )
+ if dbusdriver in ("auto", "jeepney"):
+ try:
+ from .jeepney import start_transient_unit as jeepney_impl
+ except ImportError:
+ pass
+ else:
+ return await jeepney_impl(unitname, dbus_properties)
+ if dbusdriver in ("auto", "dbussy"):
+ try:
+ from .dbussy import start_transient_unit as dbussy_impl
+ except ImportError:
+ pass
+ else:
+ return await dbussy_impl(unitname, dbus_properties)
+ raise NotImplementedError("requested dbusdriver not available")
+
+
+def reexec_as_transient_unit(
+ unitname: str | None = None,
+ properties: dict[str, typing.Any] | None = None,
+ argv: list[str] | None = None,
+) -> typing.NoReturn:
+ """Reexecute the current process via systemd-run thus placing it into a new
+ .scope unit. If no argv is given, sys.argv is used.
+ """
+ execargs = ["systemd-run", "--user", "--scope"]
+ if unitname is not None:
+ execargs.append("--unit=" + unitname)
+ if properties:
+ for key, value in properties.items():
+ if isinstance(value, int):
+ value = str(value)
+ elif not isinstance(value, str):
+ raise ValueError(f"cannot format property {key} value")
+ execargs.append(f"--property={key}={value}")
+ execargs.append("--")
+ execargs.extend(sys.argv if argv is None else argv)
+ os.execvp("systemd-run", execargs)
diff --git a/linuxnamespaces/systemd/dbussy.py b/linuxnamespaces/systemd/dbussy.py
new file mode 100644
index 0000000..77410df
--- /dev/null
+++ b/linuxnamespaces/systemd/dbussy.py
@@ -0,0 +1,81 @@
+# Copyright 2024 Helmut Grohne <helmut@subdivi.de>
+# SPDX-License-Identifier: GPL-3
+
+"""Communicate with a systemd instance via dbus using dbussy."""
+
+import asyncio
+import typing
+
+import ravel
+
+
+class SystemdJobWaiter:
+ """Context manager for waiting for a systemd job to complete.
+ Typical usage:
+
+ with SystemdJobWaiter(bus) as wait:
+ job = create_a_job_on(bus)
+ result = await wait(job)
+ """
+
+ systemd_path = "/org/freedesktop/systemd1"
+ systemd_iface = "org.freedesktop.systemd1.Manager"
+
+ def __init__(self, bus: ravel.Connection):
+ self.bus = bus
+ self.jobs_removed: dict[str, str] = {}
+ self.target_job: str | None = None
+ self.job_done = asyncio.get_running_loop().create_future()
+
+ @ravel.signal(name="JobRemoved", in_signature="uoss")
+ def _on_job_removed(
+ self, _id: int, path: str, _unit: str, result: str
+ ) -> None:
+ if self.target_job is None:
+ self.jobs_removed[path] = result
+ elif self.target_job == path:
+ self.job_done.set_result(result)
+
+ def __enter__(self) -> "SystemdJobWaiter":
+ self.bus.listen_signal(
+ self.systemd_path,
+ False,
+ self.systemd_iface,
+ "JobRemoved",
+ self._on_job_removed,
+ )
+ return self
+
+ async def __call__(self, job: str, timeout: int | float = 60) -> str:
+ assert self.target_job is None
+ self.target_job = job
+ try:
+ return self.jobs_removed[job]
+ except KeyError:
+ return await asyncio.wait_for(self.job_done, timeout)
+
+ def __exit__(self, *exc_info: typing.Any) -> None:
+ self.bus.unlisten_signal(
+ self.systemd_path,
+ False,
+ self.systemd_iface,
+ "JobRemoved",
+ self._on_job_removed,
+ )
+
+
+async def start_transient_unit(
+ unitname: str, properties: list[tuple[str, tuple[str, typing.Any]]]
+) -> None:
+ """Call the StartTransientUnit dbus method on the user manager. The given
+ properties are tuples of property names and dbus-typed values.
+ """
+ bus = await ravel.session_bus_async()
+ with SystemdJobWaiter(bus) as wait:
+ result = await wait(
+ bus["org.freedesktop.systemd1"]["/org/freedesktop/systemd1"]
+ .get_interface("org.freedesktop.systemd1.Manager")
+ .StartTransientUnit(unitname, "fail", properties, [])[0],
+ )
+ if result != "done":
+ raise OSError("StartTransientUnit failed: " + result)
diff --git a/linuxnamespaces/systemd/jeepney.py b/linuxnamespaces/systemd/jeepney.py
new file mode 100644
index 0000000..ef276d3
--- /dev/null
+++ b/linuxnamespaces/systemd/jeepney.py
@@ -0,0 +1,77 @@
+# Copyright 2024 Helmut Grohne <helmut@subdivi.de>
+# SPDX-License-Identifier: GPL-3
+
+"""Communicate with a systemd instance via dbus using jeepney."""
+
+import asyncio
+import contextlib
+import typing
+
+import jeepney.io.asyncio
+
+
+@contextlib.asynccontextmanager
+async def jeepney_listen_signal(
+ router: jeepney.io.asyncio.DBusRouter, matchrule: jeepney.MatchRule,
+) -> jeepney.io.asyncio.FilterHandle:
+ """Call AddMatch/RemoveMatch on context entry/exit and give filtered queue.
+ """
+ jeepney.wrappers.unwrap_msg(
+ await router.send_and_get_reply(
+ jeepney.bus_messages.message_bus.AddMatch(matchrule)
+ ),
+ )
+ try:
+ with router.filter(matchrule) as queue:
+ yield queue
+ finally:
+ jeepney.wrappers.unwrap_msg(
+ await router.send_and_get_reply(
+ jeepney.bus_messages.message_bus.RemoveMatch(matchrule)
+ ),
+ )
+
+
+async def start_transient_unit(
+ unitname: str, properties: list[tuple[str, tuple[str, typing.Any]]]
+) -> None:
+ """Call the StartTransientUnit dbus method on the user manager. The given
+ properties are tuples of property names and dbus-typed values.
+ """
+ async with (
+ jeepney.io.asyncio.open_dbus_router() as router,
+ jeepney_listen_signal(
+ router,
+ jeepney.MatchRule(
+ type="signal",
+ interface="org.freedesktop.systemd1.Manager",
+ member="JobRemoved",
+ path="/org/freedesktop/systemd1",
+ ),
+ ) as queue,
+ ):
+ (scope_job,) = jeepney.wrappers.unwrap_msg(
+ await router.send_and_get_reply(
+ jeepney.new_method_call(
+ jeepney.DBusAddress(
+ "/org/freedesktop/systemd1",
+ bus_name="org.freedesktop.systemd1",
+ interface="org.freedesktop.systemd1.Manager",
+ ),
+ "StartTransientUnit",
+ "ssa(sv)a(sa(sv))",
+ (unitname, "fail", properties, []),
+ ),
+ ),
+ )
+ loop = asyncio.get_running_loop()
+ deadline = loop.time() + 60
+ while True:
+ message = jeepney.wrappers.unwrap_msg(
+ await asyncio.wait_for(queue.get(), deadline - loop.time())
+ )
+ if message[1] != scope_job:
+ continue
+ if message[3] != "done":
+ raise OSError("StartTransientUnit failed: " + message[3])
+ return