diff options
-rw-r--r-- | linuxnamespaces/__init__.py | 223 | ||||
-rw-r--r-- | linuxnamespaces/idmap.py | 232 |
2 files changed, 233 insertions, 222 deletions
diff --git a/linuxnamespaces/__init__.py b/linuxnamespaces/__init__.py index 0392276..ab06fb7 100644 --- a/linuxnamespaces/__init__.py +++ b/linuxnamespaces/__init__.py @@ -6,9 +6,7 @@ Python. """ import asyncio -import bisect import contextlib -import dataclasses import errno import fcntl import os @@ -16,233 +14,14 @@ import pathlib import socket import stat import struct -import subprocess import typing from .filedescriptor import * +from .idmap import * from .atlocation import * from .syscalls import * -def subidranges( - kind: typing.Literal["uid", "gid"], login: str | None = None -) -> typing.Iterator[tuple[int, int]]: - """Parse a `/etc/sub?id` file for ranges allocated to the given or current - user. Return all ranges as (start, count) pairs. - """ - if login is None: - login = os.getlogin() - with open(f"/etc/sub{kind}") as filelike: - for line in filelike: - parts = line.strip().split(":") - if parts[0] == login: - yield (int(parts[1]), int(parts[2])) - - -@dataclasses.dataclass(frozen=True) -class IDMapping: - """Represent one range in a user or group id mapping.""" - - innerstart: int - outerstart: int - count: int - - def __post_init__(self) -> None: - if self.outerstart < 0: - raise ValueError("outerstart must not be negative") - if self.innerstart < 0: - raise ValueError("innerstart must not be negative") - if self.count <= 0: - raise ValueError("count must be positive") - if self.outerstart + self.count >= 1 << 64: - raise ValueError("outerstart + count exceed 64bits") - if self.innerstart + self.count >= 1 << 64: - raise ValueError("innerstart + count exceed 64bits") - - @classmethod - def identity(cls, idn: int, count: int = 1) -> typing.Self: - """Construct an identity mapping for the given identifier.""" - return cls(idn, idn, count) - - -class IDAllocation: - """This represents a subset of IDs (user or group). It can be used to - allocate a contiguous range for use with a user namespace. - """ - - def __init__(self) -> None: - self.ranges: list[tuple[int, int]] = [] - - def add_range(self, start: int, count: int) -> None: - """Add count ids starting from start to this allocation.""" - if start < 0 or count <= 0: - raise ValueError("invalid range") - index = bisect.bisect_right(self.ranges, (start, 0)) - prevrange = None - if index > 0: - prevrange = self.ranges[index - 1] - if prevrange[0] + prevrange[1] > start: - raise ValueError("attempt to add overlapping range") - nextrange = None - if index < len(self.ranges): - nextrange = self.ranges[index] - if nextrange[0] < start + count: - raise ValueError("attempt to add overlapping range") - if prevrange and prevrange[0] + prevrange[1] == start: - if nextrange and nextrange[0] == start + count: - self.ranges[index - 1] = ( - prevrange[0], - prevrange[1] + count + nextrange[1], - ) - del self.ranges[index] - else: - self.ranges[index - 1] = (prevrange[0], prevrange[1] + count) - elif nextrange and nextrange[0] == start + count: - self.ranges[index] = (start, count + nextrange[1]) - else: - self.ranges.insert(index, (start, count)) - - @classmethod - def loadsubid( - cls, kind: typing.Literal["uid", "gid"], login: str | None = None, - ) -> "IDAllocation": - """Load a `/etc/sub?id` file and return ids allocated to the given - login or current user. - """ - self = cls() - for start, count in subidranges(kind, login): - self.add_range(start, count) - return self - - def find(self, count: int) -> int: - """Locate count contiguous ids from this allocation. The start of - the allocation is returned. The allocation object is left unchanged. - """ - for start, available in self.ranges: - if available >= count: - return start - raise ValueError("could not satisfy allocation request") - - def allocate(self, count: int) -> int: - """Allocate count contiguous ids from this allocation. The start of - the allocation is returned and the ids are removed from this - IDAllocation object. - """ - for index, (start, available) in enumerate(self.ranges): - if available > count: - self.ranges[index] = (start + count, available - count) - return start - if available == count: - del self.ranges[index] - return start - raise ValueError("could not satisfy allocation request") - - def allocatemap(self, count: int, target: int = 0) -> IDMapping: - """Allocate count contiguous ids from this allocation. An IDMapping - with its innerstart set to target is returned. The allocation is - removed from this IDAllocation object. - """ - return IDMapping(target, self.allocate(count), count) - - def reserve(self, start: int, count: int) -> None: - """Reserve (and remove) the given range from this allocation. If the - range is not fully contained in this allocation, a ValueError is - raised. - """ - if count < 0: - raise ValueError("negative count") - index = bisect.bisect_right(self.ranges, (start, float("inf"))) - 1 - if index < 0: - raise ValueError("range to reserve not found") - cur_start, cur_count = self.ranges[index] - assert cur_start <= start - if cur_start == start: - # Requested range starts at range boundary - if cur_count < count: - raise ValueError("range to reserve not found") - if cur_count == count: - # Requested range matches a range exactly - del self.ranges[index] - else: - # Requested range is a head of the matched range - self.ranges[index] = (start + count, cur_count - count) - elif cur_start + cur_count >= start + count: - # Requested range fits into a matched range - self.ranges[index] = (cur_start, start - cur_start) - if cur_start + cur_count > start + count: - # Requested range punches a hole into a matched range - self.ranges.insert( - index + 1, - (start + count, cur_start + cur_count - (start + count)), - ) - # else: Requested range is a tail of a matched range - else: - raise ValueError("range to reserve not found") - - -def newidmap( - kind: typing.Literal["uid", "gid"], - pid: int, - mapping: list[IDMapping], - helper: bool | None = None, -) -> None: - """Apply the given uid or gid mapping to the given process. A positive pid - identifies a process, other values identify the currently running process. - Whether setuid binaries newuidmap and newgidmap are used is determined via - the helper argument. A None value indicate automatic detection of whether - a helper is required for setting up the given mapping. - """ - - assert kind in ("uid", "gid") - if pid <= 0: - pid = os.getpid() - if helper is None: - # We cannot reliably test whether we have the right EUID and we don't - # implement checking whether setgroups has been denied either. Please - # be explicit about the helper choice in such cases. - helper = len(mapping) > 1 or mapping[0].count > 1 - if helper: - argv = [f"new{kind}map", str(pid)] - for idblock in mapping: - argv.extend(map(str, dataclasses.astuple(idblock))) - subprocess.check_call(argv) - else: - pathlib.Path(f"/proc/{pid}/{kind}_map").write_text( - "".join( - "%d %d %d\n" % dataclasses.astuple(idblock) - for idblock in mapping - ), - encoding="ascii", - ) - - -def newuidmap(pid: int, mapping: list[IDMapping], helper: bool = True) -> None: - """Apply a given uid mapping to the given process. Refer to newidmap for - details. - """ - newidmap("uid", pid, mapping, helper) - - -def newgidmap(pid: int, mapping: list[IDMapping], helper: bool = True) -> None: - """Apply a given gid mapping to the given process. Refer to newidmap for - details. - """ - newidmap("gid", pid, mapping, helper) - - -def newidmaps( - pid: int, - uidmapping: list[IDMapping], - gidmapping: list[IDMapping], - helper: bool = True, -) -> None: - """Apply a given uid and gid mapping to the given process. Refer to - newidmap for details. - """ - newgidmap(pid, gidmapping, helper) - newuidmap(pid, uidmapping, helper) - - class run_in_fork: """Decorator for running the decorated function once in a separate process. """ diff --git a/linuxnamespaces/idmap.py b/linuxnamespaces/idmap.py new file mode 100644 index 0000000..d75f1a9 --- /dev/null +++ b/linuxnamespaces/idmap.py @@ -0,0 +1,232 @@ +# Copyright 2024-2025 Helmut Grohne <helmut@subdivi.de> +# SPDX-License-Identifier: GPL-3 + +"""Provide functionalit related to mapping user and group ids in a user +namespace. +""" + +import bisect +import dataclasses +import os +import pathlib +import subprocess +import typing + + +def subidranges( + kind: typing.Literal["uid", "gid"], login: str | None = None +) -> typing.Iterator[tuple[int, int]]: + """Parse a `/etc/sub?id` file for ranges allocated to the given or current + user. Return all ranges as (start, count) pairs. + """ + if login is None: + login = os.getlogin() + with open(f"/etc/sub{kind}") as filelike: + for line in filelike: + parts = line.strip().split(":") + if parts[0] == login: + yield (int(parts[1]), int(parts[2])) + + +@dataclasses.dataclass(frozen=True) +class IDMapping: + """Represent one range in a user or group id mapping.""" + + innerstart: int + outerstart: int + count: int + + def __post_init__(self) -> None: + if self.outerstart < 0: + raise ValueError("outerstart must not be negative") + if self.innerstart < 0: + raise ValueError("innerstart must not be negative") + if self.count <= 0: + raise ValueError("count must be positive") + if self.outerstart + self.count >= 1 << 64: + raise ValueError("outerstart + count exceed 64bits") + if self.innerstart + self.count >= 1 << 64: + raise ValueError("innerstart + count exceed 64bits") + + @classmethod + def identity(cls, idn: int, count: int = 1) -> typing.Self: + """Construct an identity mapping for the given identifier.""" + return cls(idn, idn, count) + + +class IDAllocation: + """This represents a subset of IDs (user or group). It can be used to + allocate a contiguous range for use with a user namespace. + """ + + def __init__(self) -> None: + self.ranges: list[tuple[int, int]] = [] + + def add_range(self, start: int, count: int) -> None: + """Add count ids starting from start to this allocation.""" + if start < 0 or count <= 0: + raise ValueError("invalid range") + index = bisect.bisect_right(self.ranges, (start, 0)) + prevrange = None + if index > 0: + prevrange = self.ranges[index - 1] + if prevrange[0] + prevrange[1] > start: + raise ValueError("attempt to add overlapping range") + nextrange = None + if index < len(self.ranges): + nextrange = self.ranges[index] + if nextrange[0] < start + count: + raise ValueError("attempt to add overlapping range") + if prevrange and prevrange[0] + prevrange[1] == start: + if nextrange and nextrange[0] == start + count: + self.ranges[index - 1] = ( + prevrange[0], + prevrange[1] + count + nextrange[1], + ) + del self.ranges[index] + else: + self.ranges[index - 1] = (prevrange[0], prevrange[1] + count) + elif nextrange and nextrange[0] == start + count: + self.ranges[index] = (start, count + nextrange[1]) + else: + self.ranges.insert(index, (start, count)) + + @classmethod + def loadsubid( + cls, kind: typing.Literal["uid", "gid"], login: str | None = None, + ) -> "IDAllocation": + """Load a `/etc/sub?id` file and return ids allocated to the given + login or current user. + """ + self = cls() + for start, count in subidranges(kind, login): + self.add_range(start, count) + return self + + def find(self, count: int) -> int: + """Locate count contiguous ids from this allocation. The start of + the allocation is returned. The allocation object is left unchanged. + """ + for start, available in self.ranges: + if available >= count: + return start + raise ValueError("could not satisfy allocation request") + + def allocate(self, count: int) -> int: + """Allocate count contiguous ids from this allocation. The start of + the allocation is returned and the ids are removed from this + IDAllocation object. + """ + for index, (start, available) in enumerate(self.ranges): + if available > count: + self.ranges[index] = (start + count, available - count) + return start + if available == count: + del self.ranges[index] + return start + raise ValueError("could not satisfy allocation request") + + def allocatemap(self, count: int, target: int = 0) -> IDMapping: + """Allocate count contiguous ids from this allocation. An IDMapping + with its innerstart set to target is returned. The allocation is + removed from this IDAllocation object. + """ + return IDMapping(target, self.allocate(count), count) + + def reserve(self, start: int, count: int) -> None: + """Reserve (and remove) the given range from this allocation. If the + range is not fully contained in this allocation, a ValueError is + raised. + """ + if count < 0: + raise ValueError("negative count") + index = bisect.bisect_right(self.ranges, (start, float("inf"))) - 1 + if index < 0: + raise ValueError("range to reserve not found") + cur_start, cur_count = self.ranges[index] + assert cur_start <= start + if cur_start == start: + # Requested range starts at range boundary + if cur_count < count: + raise ValueError("range to reserve not found") + if cur_count == count: + # Requested range matches a range exactly + del self.ranges[index] + else: + # Requested range is a head of the matched range + self.ranges[index] = (start + count, cur_count - count) + elif cur_start + cur_count >= start + count: + # Requested range fits into a matched range + self.ranges[index] = (cur_start, start - cur_start) + if cur_start + cur_count > start + count: + # Requested range punches a hole into a matched range + self.ranges.insert( + index + 1, + (start + count, cur_start + cur_count - (start + count)), + ) + # else: Requested range is a tail of a matched range + else: + raise ValueError("range to reserve not found") + + +def newidmap( + kind: typing.Literal["uid", "gid"], + pid: int, + mapping: list[IDMapping], + helper: bool | None = None, +) -> None: + """Apply the given uid or gid mapping to the given process. A positive pid + identifies a process, other values identify the currently running process. + Whether setuid binaries newuidmap and newgidmap are used is determined via + the helper argument. A None value indicate automatic detection of whether + a helper is required for setting up the given mapping. + """ + + assert kind in ("uid", "gid") + if pid <= 0: + pid = os.getpid() + if helper is None: + # We cannot reliably test whether we have the right EUID and we don't + # implement checking whether setgroups has been denied either. Please + # be explicit about the helper choice in such cases. + helper = len(mapping) > 1 or mapping[0].count > 1 + if helper: + argv = [f"new{kind}map", str(pid)] + for idblock in mapping: + argv.extend(map(str, dataclasses.astuple(idblock))) + subprocess.check_call(argv) + else: + pathlib.Path(f"/proc/{pid}/{kind}_map").write_text( + "".join( + "%d %d %d\n" % dataclasses.astuple(idblock) + for idblock in mapping + ), + encoding="ascii", + ) + + +def newuidmap(pid: int, mapping: list[IDMapping], helper: bool = True) -> None: + """Apply a given uid mapping to the given process. Refer to newidmap for + details. + """ + newidmap("uid", pid, mapping, helper) + + +def newgidmap(pid: int, mapping: list[IDMapping], helper: bool = True) -> None: + """Apply a given gid mapping to the given process. Refer to newidmap for + details. + """ + newidmap("gid", pid, mapping, helper) + + +def newidmaps( + pid: int, + uidmapping: list[IDMapping], + gidmapping: list[IDMapping], + helper: bool = True, +) -> None: + """Apply a given uid and gid mapping to the given process. Refer to + newidmap for details. + """ + newgidmap(pid, gidmapping, helper) + newuidmap(pid, uidmapping, helper) |