summaryrefslogtreecommitdiff
path: root/linuxnamespaces/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'linuxnamespaces/__init__.py')
-rw-r--r--linuxnamespaces/__init__.py223
1 files changed, 1 insertions, 222 deletions
diff --git a/linuxnamespaces/__init__.py b/linuxnamespaces/__init__.py
index 0392276..ab06fb7 100644
--- a/linuxnamespaces/__init__.py
+++ b/linuxnamespaces/__init__.py
@@ -6,9 +6,7 @@ Python.
"""
import asyncio
-import bisect
import contextlib
-import dataclasses
import errno
import fcntl
import os
@@ -16,233 +14,14 @@ import pathlib
import socket
import stat
import struct
-import subprocess
import typing
from .filedescriptor import *
+from .idmap import *
from .atlocation import *
from .syscalls import *
-def subidranges(
- kind: typing.Literal["uid", "gid"], login: str | None = None
-) -> typing.Iterator[tuple[int, int]]:
- """Parse a `/etc/sub?id` file for ranges allocated to the given or current
- user. Return all ranges as (start, count) pairs.
- """
- if login is None:
- login = os.getlogin()
- with open(f"/etc/sub{kind}") as filelike:
- for line in filelike:
- parts = line.strip().split(":")
- if parts[0] == login:
- yield (int(parts[1]), int(parts[2]))
-
-
-@dataclasses.dataclass(frozen=True)
-class IDMapping:
- """Represent one range in a user or group id mapping."""
-
- innerstart: int
- outerstart: int
- count: int
-
- def __post_init__(self) -> None:
- if self.outerstart < 0:
- raise ValueError("outerstart must not be negative")
- if self.innerstart < 0:
- raise ValueError("innerstart must not be negative")
- if self.count <= 0:
- raise ValueError("count must be positive")
- if self.outerstart + self.count >= 1 << 64:
- raise ValueError("outerstart + count exceed 64bits")
- if self.innerstart + self.count >= 1 << 64:
- raise ValueError("innerstart + count exceed 64bits")
-
- @classmethod
- def identity(cls, idn: int, count: int = 1) -> typing.Self:
- """Construct an identity mapping for the given identifier."""
- return cls(idn, idn, count)
-
-
-class IDAllocation:
- """This represents a subset of IDs (user or group). It can be used to
- allocate a contiguous range for use with a user namespace.
- """
-
- def __init__(self) -> None:
- self.ranges: list[tuple[int, int]] = []
-
- def add_range(self, start: int, count: int) -> None:
- """Add count ids starting from start to this allocation."""
- if start < 0 or count <= 0:
- raise ValueError("invalid range")
- index = bisect.bisect_right(self.ranges, (start, 0))
- prevrange = None
- if index > 0:
- prevrange = self.ranges[index - 1]
- if prevrange[0] + prevrange[1] > start:
- raise ValueError("attempt to add overlapping range")
- nextrange = None
- if index < len(self.ranges):
- nextrange = self.ranges[index]
- if nextrange[0] < start + count:
- raise ValueError("attempt to add overlapping range")
- if prevrange and prevrange[0] + prevrange[1] == start:
- if nextrange and nextrange[0] == start + count:
- self.ranges[index - 1] = (
- prevrange[0],
- prevrange[1] + count + nextrange[1],
- )
- del self.ranges[index]
- else:
- self.ranges[index - 1] = (prevrange[0], prevrange[1] + count)
- elif nextrange and nextrange[0] == start + count:
- self.ranges[index] = (start, count + nextrange[1])
- else:
- self.ranges.insert(index, (start, count))
-
- @classmethod
- def loadsubid(
- cls, kind: typing.Literal["uid", "gid"], login: str | None = None,
- ) -> "IDAllocation":
- """Load a `/etc/sub?id` file and return ids allocated to the given
- login or current user.
- """
- self = cls()
- for start, count in subidranges(kind, login):
- self.add_range(start, count)
- return self
-
- def find(self, count: int) -> int:
- """Locate count contiguous ids from this allocation. The start of
- the allocation is returned. The allocation object is left unchanged.
- """
- for start, available in self.ranges:
- if available >= count:
- return start
- raise ValueError("could not satisfy allocation request")
-
- def allocate(self, count: int) -> int:
- """Allocate count contiguous ids from this allocation. The start of
- the allocation is returned and the ids are removed from this
- IDAllocation object.
- """
- for index, (start, available) in enumerate(self.ranges):
- if available > count:
- self.ranges[index] = (start + count, available - count)
- return start
- if available == count:
- del self.ranges[index]
- return start
- raise ValueError("could not satisfy allocation request")
-
- def allocatemap(self, count: int, target: int = 0) -> IDMapping:
- """Allocate count contiguous ids from this allocation. An IDMapping
- with its innerstart set to target is returned. The allocation is
- removed from this IDAllocation object.
- """
- return IDMapping(target, self.allocate(count), count)
-
- def reserve(self, start: int, count: int) -> None:
- """Reserve (and remove) the given range from this allocation. If the
- range is not fully contained in this allocation, a ValueError is
- raised.
- """
- if count < 0:
- raise ValueError("negative count")
- index = bisect.bisect_right(self.ranges, (start, float("inf"))) - 1
- if index < 0:
- raise ValueError("range to reserve not found")
- cur_start, cur_count = self.ranges[index]
- assert cur_start <= start
- if cur_start == start:
- # Requested range starts at range boundary
- if cur_count < count:
- raise ValueError("range to reserve not found")
- if cur_count == count:
- # Requested range matches a range exactly
- del self.ranges[index]
- else:
- # Requested range is a head of the matched range
- self.ranges[index] = (start + count, cur_count - count)
- elif cur_start + cur_count >= start + count:
- # Requested range fits into a matched range
- self.ranges[index] = (cur_start, start - cur_start)
- if cur_start + cur_count > start + count:
- # Requested range punches a hole into a matched range
- self.ranges.insert(
- index + 1,
- (start + count, cur_start + cur_count - (start + count)),
- )
- # else: Requested range is a tail of a matched range
- else:
- raise ValueError("range to reserve not found")
-
-
-def newidmap(
- kind: typing.Literal["uid", "gid"],
- pid: int,
- mapping: list[IDMapping],
- helper: bool | None = None,
-) -> None:
- """Apply the given uid or gid mapping to the given process. A positive pid
- identifies a process, other values identify the currently running process.
- Whether setuid binaries newuidmap and newgidmap are used is determined via
- the helper argument. A None value indicate automatic detection of whether
- a helper is required for setting up the given mapping.
- """
-
- assert kind in ("uid", "gid")
- if pid <= 0:
- pid = os.getpid()
- if helper is None:
- # We cannot reliably test whether we have the right EUID and we don't
- # implement checking whether setgroups has been denied either. Please
- # be explicit about the helper choice in such cases.
- helper = len(mapping) > 1 or mapping[0].count > 1
- if helper:
- argv = [f"new{kind}map", str(pid)]
- for idblock in mapping:
- argv.extend(map(str, dataclasses.astuple(idblock)))
- subprocess.check_call(argv)
- else:
- pathlib.Path(f"/proc/{pid}/{kind}_map").write_text(
- "".join(
- "%d %d %d\n" % dataclasses.astuple(idblock)
- for idblock in mapping
- ),
- encoding="ascii",
- )
-
-
-def newuidmap(pid: int, mapping: list[IDMapping], helper: bool = True) -> None:
- """Apply a given uid mapping to the given process. Refer to newidmap for
- details.
- """
- newidmap("uid", pid, mapping, helper)
-
-
-def newgidmap(pid: int, mapping: list[IDMapping], helper: bool = True) -> None:
- """Apply a given gid mapping to the given process. Refer to newidmap for
- details.
- """
- newidmap("gid", pid, mapping, helper)
-
-
-def newidmaps(
- pid: int,
- uidmapping: list[IDMapping],
- gidmapping: list[IDMapping],
- helper: bool = True,
-) -> None:
- """Apply a given uid and gid mapping to the given process. Refer to
- newidmap for details.
- """
- newgidmap(pid, gidmapping, helper)
- newuidmap(pid, uidmapping, helper)
-
-
class run_in_fork:
"""Decorator for running the decorated function once in a separate process.
"""