# Copyright 2024 Helmut Grohne # SPDX-License-Identifier: GPL-3 """Communicate with a systemd instance to create e.g. delegated croups.""" import os import sys import typing _DBUS_INTEGER_BOUNDS = ( ("q", 0, 1 << 16), ("n", -(1 << 15), 1 << 15), ("u", 0, 1 << 32), ("i", -(1 << 31), 1 << 31), ("t", 0, 1 << 64), ("x", -(1 << 63), 1 << 63), ) def _guess_dbus_type(value: typing.Any) -> typing.Iterator[str]: """Guess the type of a Python value in dbus. May yield multiple candidates. """ if isinstance(value, bool): yield "b" elif isinstance(value, str): yield "s" elif isinstance(value, int): found = False for guess, low, high in _DBUS_INTEGER_BOUNDS: if low <= value < high: found = True yield guess if not found: raise ValueError("integer out of bounds for dbus") elif isinstance(value, float): yield "d" elif isinstance(value, list): if not value: raise ValueError("cannot guess dbus type for empty list") types = [list(_guess_dbus_type(v)) for v in value] found = False for guess in types[0]: if all(guess in guesses for guesses in types): found = True yield "a" + guess if not found: raise ValueError("could not determine homogeneous type of list") else: raise ValueError("failed to guess dbus type") async def start_transient_unit( unitname: str, pids: list[int] | None = None, properties: dict[str, typing.Any] | None = None, dbusdriver: typing.Literal["auto", "jeepney", "dbussy"] = "auto", ) -> None: """Call the StartTransientUnit dbus method on the user manager.""" dbus_properties: list[tuple[str, tuple[str, typing.Any]]] = [] if pids is None: pids = [os.getpid()] dbus_properties.append(("PIDs", ("au", pids))) for key, value in ({} if properties is None else properties).items(): try: guess = next(_guess_dbus_type(value)) except ValueError as err: raise ValueError( f"cannot infer dbus type for property {key} value" ) from err dbus_properties.append((key, (guess, value))) if dbusdriver in ("auto", "jeepney"): try: from .jeepney import start_transient_unit as jeepney_impl except ImportError: pass else: return await jeepney_impl(unitname, dbus_properties) if dbusdriver in ("auto", "dbussy"): try: from .dbussy import start_transient_unit as dbussy_impl except ImportError: pass else: return await dbussy_impl(unitname, dbus_properties) raise NotImplementedError("requested dbusdriver not available") def reexec_as_transient_unit( unitname: str | None = None, properties: dict[str, typing.Any] | None = None, argv: list[str] | None = None, ) -> typing.NoReturn: """Reexecute the current process via systemd-run thus placing it into a new .scope unit. If no argv is given, sys.argv is used. """ execargs = ["systemd-run", "--user", "--scope"] if unitname is not None: execargs.append("--unit=" + unitname) if properties: for key, value in properties.items(): if isinstance(value, int): value = str(value) elif not isinstance(value, str): raise ValueError(f"cannot format property {key} value") execargs.append(f"--property={key}={value}") execargs.append("--") execargs.extend(sys.argv if argv is None else argv) os.execvp("systemd-run", execargs)