summaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
Diffstat (limited to 'examples')
-rwxr-xr-xexamples/cgroup.py203
1 files changed, 11 insertions, 192 deletions
diff --git a/examples/cgroup.py b/examples/cgroup.py
index 90978dc..28a1f7c 100755
--- a/examples/cgroup.py
+++ b/examples/cgroup.py
@@ -7,26 +7,15 @@
"""
import asyncio
-import contextlib
import os
import pathlib
import sys
-import typing
-
-try:
- import jeepney.io.asyncio
-except ImportError:
- jeepney = None
-
-try:
- import ravel
-except ImportError:
- ravel = None
if __file__.split("/")[-2:-1] == ["examples"]:
sys.path.insert(0, "/".join(__file__.split("/")[:-2]))
import linuxnamespaces
+import linuxnamespaces.systemd
def get_cgroup(pid: int = -1) -> pathlib.PurePath:
@@ -40,158 +29,6 @@ def get_cgroup(pid: int = -1) -> pathlib.PurePath:
)
-if jeepney is not None:
- @contextlib.asynccontextmanager
- async def jeepney_listen_signal(
- router: jeepney.io.asyncio.DBusRouter, matchrule: jeepney.MatchRule,
- ) -> jeepney.io.asyncio.FilterHandle:
- """Call AddMatch/RemoveMatch on context entry/exit and give filtered
- queue.
- """
- jeepney.wrappers.unwrap_msg(
- await router.send_and_get_reply(
- jeepney.bus_messages.message_bus.AddMatch(matchrule)
- ),
- )
- try:
- with router.filter(matchrule) as queue:
- yield queue
- finally:
- jeepney.wrappers.unwrap_msg(
- await router.send_and_get_reply(
- jeepney.bus_messages.message_bus.RemoveMatch(matchrule)
- ),
- )
-
-
- async def start_transient_unit_with_jeepney(pid: int) -> None:
- """Call the StartTransientUnit dbus method on the user manager for the
- given pid.
- """
- async with (
- jeepney.io.asyncio.open_dbus_router() as router,
- jeepney_listen_signal(
- router,
- jeepney.MatchRule(
- type="signal",
- interface="org.freedesktop.systemd1.Manager",
- member="JobRemoved",
- path="/org/freedesktop/systemd1",
- ),
- ) as queue,
- ):
- (scope_job,) = jeepney.wrappers.unwrap_msg(
- await router.send_and_get_reply(
- jeepney.new_method_call(
- jeepney.DBusAddress(
- "/org/freedesktop/systemd1",
- bus_name="org.freedesktop.systemd1",
- interface="org.freedesktop.systemd1.Manager",
- ),
- "StartTransientUnit",
- "ssa(sv)a(sa(sv))",
- (
- f"cgroup-{pid}.scope",
- "fail",
- [
- ("PIDs", ("au", [pid])),
- ("Delegate", ("b", True)),
- ],
- [],
- ),
- ),
- ),
- )
- loop = asyncio.get_running_loop()
- deadline = loop.time() + 60
- while True:
- message = jeepney.wrappers.unwrap_msg(
- await asyncio.wait_for(queue.get(), deadline - loop.time())
- )
- if message[1] != scope_job:
- continue
- if message[3] != "done":
- raise OSError("StartTransientUnit failed: " + message[3])
- return
-
-
-if ravel is not None:
- class RavelSystemdJobWaiter:
- """Context manager for waiting for a systemd job to complete.
- Typical usage:
-
- with RavelSystemdJobWaiter(bus) as wait:
- job = create_a_job_on(bus)
- result = await wait(job)
- """
-
- systemd_path = "/org/freedesktop/systemd1"
- systemd_iface = "org.freedesktop.systemd1.Manager"
-
- def __init__(self, bus: ravel.Connection):
- self.bus = bus
- self.jobs_removed: dict[str, str] = {}
- self.target_job: str | None = None
- self.job_done = asyncio.get_running_loop().create_future()
-
- @ravel.signal(name="JobRemoved", in_signature="uoss")
- def _on_job_removed(
- self, _id: int, path: str, _unit: str, result: str
- ) -> None:
- if self.target_job is None:
- self.jobs_removed[path] = result
- elif self.target_job == path:
- self.job_done.set_result(result)
-
- def __enter__(self) -> "RavelSystemdJobWaiter":
- self.bus.listen_signal(
- self.systemd_path,
- False,
- self.systemd_iface,
- "JobRemoved",
- self._on_job_removed,
- )
- return self
-
- async def __call__(self, job: str, timeout: int | float = 60) -> str:
- assert self.target_job is None
- self.target_job = job
- try:
- return self.jobs_removed[job]
- except KeyError:
- return await asyncio.wait_for(self.job_done, timeout)
-
- def __exit__(self, *exc_info: typing.Any) -> None:
- self.bus.unlisten_signal(
- self.systemd_path,
- False,
- self.systemd_iface,
- "JobRemoved",
- self._on_job_removed,
- )
-
-
- async def start_transient_unit_with_ravel(pid: int) -> None:
- """Call the StartTransientUnit dbus method on the user manager for the
- given pid.
- """
- bus = await ravel.session_bus_async()
- with RavelSystemdJobWaiter(bus) as wait:
- scope_job = (
- bus["org.freedesktop.systemd1"]["/org/freedesktop/systemd1"]
- .get_interface("org.freedesktop.systemd1.Manager")
- .StartTransientUnit(
- f"cgroup-{pid}.scope",
- "fail",
- [("PIDs", ("au", [pid])), ("Delegate", ("b", True))],
- [],
- )
- )[0]
- result = await wait(scope_job)
- if result != "done":
- raise OSError("StartTransientUnit failed: " + result)
-
-
def main() -> None:
mycgroup = get_cgroup()
if not os.access(
@@ -201,35 +38,17 @@ def main() -> None:
# For some shells - notably from graphical desktop environments, the
# hierarchy is immediately writeable. For others, we may create a scope
# unit.
- if jeepney is not None:
- asyncio.run(start_transient_unit_with_jeepney(os.getpid()))
- mycgroup = get_cgroup()
- elif ravel is not None:
- asyncio.run(start_transient_unit_with_ravel(os.getpid()))
- mycgroup = get_cgroup()
- else:
- # Re-execute ourselves via systemd-run.
- if (
- mycgroup.name.startswith("run-")
- and mycgroup.name.endswith(".scope")
- ):
- print(
- "Error: We're running in a .scope cgroup, but it is not writeable. Giving up."
- )
- sys.exit(1)
- os.execvp(
- "systemd-run",
- [
- "systemd-run",
- "--user",
- "--scope",
- "--property",
- "Delegate=true",
- *sys.argv,
- ],
+ try:
+ asyncio.run(
+ linuxnamespaces.systemd.start_transient_unit(
+ f"cgroup-{os.getpid()}.scope",
+ properties={"Delegate": True},
+ ),
+ )
+ except NotImplementedError:
+ linuxnamespaces.systemd.reexec_as_transient_unit(
+ properties={"Delegate": True}
)
- print("Error: Failed to re-execute myself inside systemd-run.")
- sys.exit(1)
linuxnamespaces.unshare_user_idmap(
[linuxnamespaces.IDMapping(os.getuid(), os.getuid(), 1)],
[linuxnamespaces.IDMapping(os.getgid(), os.getgid(), 1)],