summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xexamples/cgroup.py203
-rw-r--r--linuxnamespaces/systemd/__init__.py68
-rw-r--r--linuxnamespaces/systemd/dbussy.py81
-rw-r--r--linuxnamespaces/systemd/jeepney.py77
-rw-r--r--pyproject.toml6
5 files changed, 241 insertions, 194 deletions
diff --git a/examples/cgroup.py b/examples/cgroup.py
index 90978dc..28a1f7c 100755
--- a/examples/cgroup.py
+++ b/examples/cgroup.py
@@ -7,26 +7,15 @@
"""
import asyncio
-import contextlib
import os
import pathlib
import sys
-import typing
-
-try:
- import jeepney.io.asyncio
-except ImportError:
- jeepney = None
-
-try:
- import ravel
-except ImportError:
- ravel = None
if __file__.split("/")[-2:-1] == ["examples"]:
sys.path.insert(0, "/".join(__file__.split("/")[:-2]))
import linuxnamespaces
+import linuxnamespaces.systemd
def get_cgroup(pid: int = -1) -> pathlib.PurePath:
@@ -40,158 +29,6 @@ def get_cgroup(pid: int = -1) -> pathlib.PurePath:
)
-if jeepney is not None:
- @contextlib.asynccontextmanager
- async def jeepney_listen_signal(
- router: jeepney.io.asyncio.DBusRouter, matchrule: jeepney.MatchRule,
- ) -> jeepney.io.asyncio.FilterHandle:
- """Call AddMatch/RemoveMatch on context entry/exit and give filtered
- queue.
- """
- jeepney.wrappers.unwrap_msg(
- await router.send_and_get_reply(
- jeepney.bus_messages.message_bus.AddMatch(matchrule)
- ),
- )
- try:
- with router.filter(matchrule) as queue:
- yield queue
- finally:
- jeepney.wrappers.unwrap_msg(
- await router.send_and_get_reply(
- jeepney.bus_messages.message_bus.RemoveMatch(matchrule)
- ),
- )
-
-
- async def start_transient_unit_with_jeepney(pid: int) -> None:
- """Call the StartTransientUnit dbus method on the user manager for the
- given pid.
- """
- async with (
- jeepney.io.asyncio.open_dbus_router() as router,
- jeepney_listen_signal(
- router,
- jeepney.MatchRule(
- type="signal",
- interface="org.freedesktop.systemd1.Manager",
- member="JobRemoved",
- path="/org/freedesktop/systemd1",
- ),
- ) as queue,
- ):
- (scope_job,) = jeepney.wrappers.unwrap_msg(
- await router.send_and_get_reply(
- jeepney.new_method_call(
- jeepney.DBusAddress(
- "/org/freedesktop/systemd1",
- bus_name="org.freedesktop.systemd1",
- interface="org.freedesktop.systemd1.Manager",
- ),
- "StartTransientUnit",
- "ssa(sv)a(sa(sv))",
- (
- f"cgroup-{pid}.scope",
- "fail",
- [
- ("PIDs", ("au", [pid])),
- ("Delegate", ("b", True)),
- ],
- [],
- ),
- ),
- ),
- )
- loop = asyncio.get_running_loop()
- deadline = loop.time() + 60
- while True:
- message = jeepney.wrappers.unwrap_msg(
- await asyncio.wait_for(queue.get(), deadline - loop.time())
- )
- if message[1] != scope_job:
- continue
- if message[3] != "done":
- raise OSError("StartTransientUnit failed: " + message[3])
- return
-
-
-if ravel is not None:
- class RavelSystemdJobWaiter:
- """Context manager for waiting for a systemd job to complete.
- Typical usage:
-
- with RavelSystemdJobWaiter(bus) as wait:
- job = create_a_job_on(bus)
- result = await wait(job)
- """
-
- systemd_path = "/org/freedesktop/systemd1"
- systemd_iface = "org.freedesktop.systemd1.Manager"
-
- def __init__(self, bus: ravel.Connection):
- self.bus = bus
- self.jobs_removed: dict[str, str] = {}
- self.target_job: str | None = None
- self.job_done = asyncio.get_running_loop().create_future()
-
- @ravel.signal(name="JobRemoved", in_signature="uoss")
- def _on_job_removed(
- self, _id: int, path: str, _unit: str, result: str
- ) -> None:
- if self.target_job is None:
- self.jobs_removed[path] = result
- elif self.target_job == path:
- self.job_done.set_result(result)
-
- def __enter__(self) -> "RavelSystemdJobWaiter":
- self.bus.listen_signal(
- self.systemd_path,
- False,
- self.systemd_iface,
- "JobRemoved",
- self._on_job_removed,
- )
- return self
-
- async def __call__(self, job: str, timeout: int | float = 60) -> str:
- assert self.target_job is None
- self.target_job = job
- try:
- return self.jobs_removed[job]
- except KeyError:
- return await asyncio.wait_for(self.job_done, timeout)
-
- def __exit__(self, *exc_info: typing.Any) -> None:
- self.bus.unlisten_signal(
- self.systemd_path,
- False,
- self.systemd_iface,
- "JobRemoved",
- self._on_job_removed,
- )
-
-
- async def start_transient_unit_with_ravel(pid: int) -> None:
- """Call the StartTransientUnit dbus method on the user manager for the
- given pid.
- """
- bus = await ravel.session_bus_async()
- with RavelSystemdJobWaiter(bus) as wait:
- scope_job = (
- bus["org.freedesktop.systemd1"]["/org/freedesktop/systemd1"]
- .get_interface("org.freedesktop.systemd1.Manager")
- .StartTransientUnit(
- f"cgroup-{pid}.scope",
- "fail",
- [("PIDs", ("au", [pid])), ("Delegate", ("b", True))],
- [],
- )
- )[0]
- result = await wait(scope_job)
- if result != "done":
- raise OSError("StartTransientUnit failed: " + result)
-
-
def main() -> None:
mycgroup = get_cgroup()
if not os.access(
@@ -201,35 +38,17 @@ def main() -> None:
# For some shells - notably from graphical desktop environments, the
# hierarchy is immediately writeable. For others, we may create a scope
# unit.
- if jeepney is not None:
- asyncio.run(start_transient_unit_with_jeepney(os.getpid()))
- mycgroup = get_cgroup()
- elif ravel is not None:
- asyncio.run(start_transient_unit_with_ravel(os.getpid()))
- mycgroup = get_cgroup()
- else:
- # Re-execute ourselves via systemd-run.
- if (
- mycgroup.name.startswith("run-")
- and mycgroup.name.endswith(".scope")
- ):
- print(
- "Error: We're running in a .scope cgroup, but it is not writeable. Giving up."
- )
- sys.exit(1)
- os.execvp(
- "systemd-run",
- [
- "systemd-run",
- "--user",
- "--scope",
- "--property",
- "Delegate=true",
- *sys.argv,
- ],
+ try:
+ asyncio.run(
+ linuxnamespaces.systemd.start_transient_unit(
+ f"cgroup-{os.getpid()}.scope",
+ properties={"Delegate": True},
+ ),
+ )
+ except NotImplementedError:
+ linuxnamespaces.systemd.reexec_as_transient_unit(
+ properties={"Delegate": True}
)
- print("Error: Failed to re-execute myself inside systemd-run.")
- sys.exit(1)
linuxnamespaces.unshare_user_idmap(
[linuxnamespaces.IDMapping(os.getuid(), os.getuid(), 1)],
[linuxnamespaces.IDMapping(os.getgid(), os.getgid(), 1)],
diff --git a/linuxnamespaces/systemd/__init__.py b/linuxnamespaces/systemd/__init__.py
new file mode 100644
index 0000000..d8e7f86
--- /dev/null
+++ b/linuxnamespaces/systemd/__init__.py
@@ -0,0 +1,68 @@
+# Copyright 2024 Helmut Grohne <helmut@subdivi.de>
+# SPDX-License-Identifier: GPL-3
+
+"""Communicate with a systemd instance to create e.g. delegated croups."""
+
+import os
+import sys
+import typing
+
+
+async def start_transient_unit(
+ unitname: str,
+ pids: list[int] | None = None,
+ properties: dict[str, typing.Any] | None = None,
+ dbusdriver: typing.Literal["auto", "jeepney", "dbussy"] = "auto",
+) -> None:
+ """Call the StartTransientUnit dbus method on the user manager."""
+ dbus_properties: list[tuple[str, tuple[str, typing.Any]]] = []
+ if pids is None:
+ pids = [os.getpid()]
+ dbus_properties.append(("PIDs", ("au", pids)))
+ for key, value in ({} if properties is None else properties).items():
+ if isinstance(value, bool):
+ dbus_properties.append((key, ("b", value)))
+ elif isinstance(value, str):
+ dbus_properties.append((key, ("s", value)))
+ else:
+ raise ValueError(
+ f"cannot infer dbus type for property {key} value"
+ )
+ if dbusdriver in ("auto", "jeepney"):
+ try:
+ from .jeepney import start_transient_unit as jeepney_impl
+ except ImportError:
+ pass
+ else:
+ return await jeepney_impl(unitname, dbus_properties)
+ if dbusdriver in ("auto", "dbussy"):
+ try:
+ from .dbussy import start_transient_unit as dbussy_impl
+ except ImportError:
+ pass
+ else:
+ return await dbussy_impl(unitname, dbus_properties)
+ raise NotImplementedError("requested dbusdriver not available")
+
+
+def reexec_as_transient_unit(
+ unitname: str | None = None,
+ properties: dict[str, typing.Any] | None = None,
+ argv: list[str] | None = None,
+) -> typing.NoReturn:
+ """Reexecute the current process via systemd-run thus placing it into a new
+ .scope unit. If no argv is given, sys.argv is used.
+ """
+ execargs = ["systemd-run", "--user", "--scope"]
+ if unitname is not None:
+ execargs.append("--unit=" + unitname)
+ if properties:
+ for key, value in properties.items():
+ if isinstance(value, int):
+ value = str(value)
+ elif not isinstance(value, str):
+ raise ValueError(f"cannot format property {key} value")
+ execargs.append(f"--property={key}={value}")
+ execargs.append("--")
+ execargs.extend(sys.argv if argv is None else argv)
+ os.execvp("systemd-run", execargs)
diff --git a/linuxnamespaces/systemd/dbussy.py b/linuxnamespaces/systemd/dbussy.py
new file mode 100644
index 0000000..77410df
--- /dev/null
+++ b/linuxnamespaces/systemd/dbussy.py
@@ -0,0 +1,81 @@
+# Copyright 2024 Helmut Grohne <helmut@subdivi.de>
+# SPDX-License-Identifier: GPL-3
+
+"""Communicate with a systemd instance via dbus using dbussy."""
+
+import asyncio
+import typing
+
+import ravel
+
+
+class SystemdJobWaiter:
+ """Context manager for waiting for a systemd job to complete.
+ Typical usage:
+
+ with SystemdJobWaiter(bus) as wait:
+ job = create_a_job_on(bus)
+ result = await wait(job)
+ """
+
+ systemd_path = "/org/freedesktop/systemd1"
+ systemd_iface = "org.freedesktop.systemd1.Manager"
+
+ def __init__(self, bus: ravel.Connection):
+ self.bus = bus
+ self.jobs_removed: dict[str, str] = {}
+ self.target_job: str | None = None
+ self.job_done = asyncio.get_running_loop().create_future()
+
+ @ravel.signal(name="JobRemoved", in_signature="uoss")
+ def _on_job_removed(
+ self, _id: int, path: str, _unit: str, result: str
+ ) -> None:
+ if self.target_job is None:
+ self.jobs_removed[path] = result
+ elif self.target_job == path:
+ self.job_done.set_result(result)
+
+ def __enter__(self) -> "SystemdJobWaiter":
+ self.bus.listen_signal(
+ self.systemd_path,
+ False,
+ self.systemd_iface,
+ "JobRemoved",
+ self._on_job_removed,
+ )
+ return self
+
+ async def __call__(self, job: str, timeout: int | float = 60) -> str:
+ assert self.target_job is None
+ self.target_job = job
+ try:
+ return self.jobs_removed[job]
+ except KeyError:
+ return await asyncio.wait_for(self.job_done, timeout)
+
+ def __exit__(self, *exc_info: typing.Any) -> None:
+ self.bus.unlisten_signal(
+ self.systemd_path,
+ False,
+ self.systemd_iface,
+ "JobRemoved",
+ self._on_job_removed,
+ )
+
+
+async def start_transient_unit(
+ unitname: str, properties: list[tuple[str, tuple[str, typing.Any]]]
+) -> None:
+ """Call the StartTransientUnit dbus method on the user manager. The given
+ properties are tuples of property names and dbus-typed values.
+ """
+ bus = await ravel.session_bus_async()
+ with SystemdJobWaiter(bus) as wait:
+ result = await wait(
+ bus["org.freedesktop.systemd1"]["/org/freedesktop/systemd1"]
+ .get_interface("org.freedesktop.systemd1.Manager")
+ .StartTransientUnit(unitname, "fail", properties, [])[0],
+ )
+ if result != "done":
+ raise OSError("StartTransientUnit failed: " + result)
diff --git a/linuxnamespaces/systemd/jeepney.py b/linuxnamespaces/systemd/jeepney.py
new file mode 100644
index 0000000..ef276d3
--- /dev/null
+++ b/linuxnamespaces/systemd/jeepney.py
@@ -0,0 +1,77 @@
+# Copyright 2024 Helmut Grohne <helmut@subdivi.de>
+# SPDX-License-Identifier: GPL-3
+
+"""Communicate with a systemd instance via dbus using jeepney."""
+
+import asyncio
+import contextlib
+import typing
+
+import jeepney.io.asyncio
+
+
+@contextlib.asynccontextmanager
+async def jeepney_listen_signal(
+ router: jeepney.io.asyncio.DBusRouter, matchrule: jeepney.MatchRule,
+) -> jeepney.io.asyncio.FilterHandle:
+ """Call AddMatch/RemoveMatch on context entry/exit and give filtered queue.
+ """
+ jeepney.wrappers.unwrap_msg(
+ await router.send_and_get_reply(
+ jeepney.bus_messages.message_bus.AddMatch(matchrule)
+ ),
+ )
+ try:
+ with router.filter(matchrule) as queue:
+ yield queue
+ finally:
+ jeepney.wrappers.unwrap_msg(
+ await router.send_and_get_reply(
+ jeepney.bus_messages.message_bus.RemoveMatch(matchrule)
+ ),
+ )
+
+
+async def start_transient_unit(
+ unitname: str, properties: list[tuple[str, tuple[str, typing.Any]]]
+) -> None:
+ """Call the StartTransientUnit dbus method on the user manager. The given
+ properties are tuples of property names and dbus-typed values.
+ """
+ async with (
+ jeepney.io.asyncio.open_dbus_router() as router,
+ jeepney_listen_signal(
+ router,
+ jeepney.MatchRule(
+ type="signal",
+ interface="org.freedesktop.systemd1.Manager",
+ member="JobRemoved",
+ path="/org/freedesktop/systemd1",
+ ),
+ ) as queue,
+ ):
+ (scope_job,) = jeepney.wrappers.unwrap_msg(
+ await router.send_and_get_reply(
+ jeepney.new_method_call(
+ jeepney.DBusAddress(
+ "/org/freedesktop/systemd1",
+ bus_name="org.freedesktop.systemd1",
+ interface="org.freedesktop.systemd1.Manager",
+ ),
+ "StartTransientUnit",
+ "ssa(sv)a(sa(sv))",
+ (unitname, "fail", properties, []),
+ ),
+ ),
+ )
+ loop = asyncio.get_running_loop()
+ deadline = loop.time() + 60
+ while True:
+ message = jeepney.wrappers.unwrap_msg(
+ await asyncio.wait_for(queue.get(), deadline - loop.time())
+ )
+ if message[1] != scope_job:
+ continue
+ if message[3] != "done":
+ raise OSError("StartTransientUnit failed: " + message[3])
+ return
diff --git a/pyproject.toml b/pyproject.toml
index b226a33..481593a 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -12,9 +12,11 @@ classifiers = [
requires-python = ">=3.9"
[project.optional-dependencies]
+# linuxnamespaces.systemd needs jeepney or dbussy, not both.
+jeepney = ["jeepney"]
+dbussy = ["dbussy"]
test = ["pytest", "pytest-forked", "pytest-subtests"]
-# We want jeepney or dbussy, not both.
-examples = ["dbussy", "jeepney", "zstandard"]
+examples = ["zstandard"]
[tool.black]
line-length = 79