summaryrefslogtreecommitdiff
path: root/linuxnamespaces
diff options
context:
space:
mode:
Diffstat (limited to 'linuxnamespaces')
-rw-r--r--linuxnamespaces/__init__.py1
-rw-r--r--linuxnamespaces/atlocation.py32
-rw-r--r--linuxnamespaces/filedescriptor.py75
3 files changed, 92 insertions, 16 deletions
diff --git a/linuxnamespaces/__init__.py b/linuxnamespaces/__init__.py
index 3302867..b50f113 100644
--- a/linuxnamespaces/__init__.py
+++ b/linuxnamespaces/__init__.py
@@ -16,6 +16,7 @@ import stat
import subprocess
import typing
+from .filedescriptor import FileDescriptor
from .atlocation import *
from .syscalls import *
diff --git a/linuxnamespaces/atlocation.py b/linuxnamespaces/atlocation.py
index 8da5982..8a38650 100644
--- a/linuxnamespaces/atlocation.py
+++ b/linuxnamespaces/atlocation.py
@@ -9,15 +9,16 @@ code for doing so.
import enum
import errno
-import fcntl
import os
import os.path
import pathlib
import stat
import typing
+from .filedescriptor import FileDescriptor
-AT_FDCWD = -100
+
+AT_FDCWD = FileDescriptor(-100)
PathConvertible = typing.Union[str, os.PathLike]
@@ -51,7 +52,7 @@ class AtLocation:
management.
"""
- fd: int
+ fd: FileDescriptor
location: PathConvertible
flags: AtFlags
@@ -63,10 +64,10 @@ class AtLocation:
) -> "AtLocation":
"""The argument thing can be many different thing. If it is an
AtLocation, it is copied and all other arguments must be unset. If it
- is an integer, it is considered to be a file descriptor and the
- location must be unset if flags contains AT_EMPTY_PATH. flags are used
- as is except that AT_EMPTY_PATH is automatically added when given a
- file descriptor and no location.
+ is an integer (e.g. FileDescriptor), it is considered to be a file
+ descriptor and the location must be unset if flags contains
+ AT_EMPTY_PATH. flags are used as is except that AT_EMPTY_PATH is
+ automatically added when given a file descriptor and no location.
"""
if isinstance(thing, AtLocation):
if location is not None or flags != AtFlags.NONE:
@@ -78,7 +79,10 @@ class AtLocation:
if isinstance(thing, int):
if thing < 0 and thing != AT_FDCWD:
raise ValueError("fd cannot be negative")
- obj.fd = thing
+ if isinstance(thing, FileDescriptor):
+ obj.fd = thing
+ else:
+ obj.fd = FileDescriptor(thing)
if location is None:
obj.location = ""
obj.flags = flags | AtFlags.AT_EMPTY_PATH
@@ -100,7 +104,7 @@ class AtLocation:
def close(self) -> None:
"""Close the underlying file descriptor."""
if self.fd >= 0:
- os.close(self.fd)
+ self.fd.close()
self.fd = AT_FDCWD
def as_emptypath(self, inheritable: bool = True) -> "AtLocation":
@@ -109,11 +113,7 @@ class AtLocation:
all cases, the caller is responsible for closing the result object.
"""
if self.flags & AtFlags.AT_EMPTY_PATH:
- newfd = fcntl.fcntl(
- self.fd,
- fcntl.F_DUPFD if inheritable else fcntl.F_DUPFD_CLOEXEC,
- 0,
- )
+ newfd = self.fd.dup(inheritable=inheritable)
return AtLocation(newfd, flags=self.flags)
return AtLocation(
self.open(flags=os.O_PATH | (0 if inheritable else os.O_CLOEXEC))
@@ -175,7 +175,7 @@ class AtLocation:
def __truediv__(self, name: "AtLocationLike") -> "AtLocation":
return self.joinpath(name)
- def fileno(self) -> int:
+ def fileno(self) -> FileDescriptor:
"""Return the underlying file descriptor if this is an AT_EMPTY_PATH
location and raise a ValueError otherwise.
"""
@@ -186,7 +186,7 @@ class AtLocation:
return self.fd
@property
- def fd_or_none(self) -> int | None:
+ def fd_or_none(self) -> FileDescriptor | None:
"""A variant of the fd attribute that replaces AT_FDCWD with None."""
return None if self.fd == AT_FDCWD else self.fd
diff --git a/linuxnamespaces/filedescriptor.py b/linuxnamespaces/filedescriptor.py
new file mode 100644
index 0000000..4395a54
--- /dev/null
+++ b/linuxnamespaces/filedescriptor.py
@@ -0,0 +1,75 @@
+# Copyright 2024 Helmut Grohne <helmut@subdivi.de>
+# SPDX-License-Identifier: GPL-3
+
+"""A type tag for integers that represent file descriptors."""
+
+import fcntl
+import os
+import typing
+
+
+class FileDescriptor(int):
+ """Type tag for integers that represent file descriptors. It also provides
+ a few very generic file descriptor methods.
+ """
+
+ def __enter__(self) -> "FileDescriptor":
+ """When used as a context manager, close the file descriptor on scope
+ exit.
+ """
+ return self
+
+ def __exit__(self, *args: typing.Any) -> None:
+ """When used as a context manager, close the file descriptor on scope
+ exit.
+ """
+ self.close()
+
+ def close(self) -> None:
+ """Close the file descriptor. Since int is immutable, the caller is
+ responsibe for not closing twice.
+ """
+ os.close(self)
+
+ def dup(self, inheritable: bool = True) -> "FileDescriptor":
+ """Return a duplicate of the file descriptor."""
+ if inheritable:
+ return FileDescriptor(os.dup(self))
+ return FileDescriptor(fcntl.fcntl(self, fcntl.F_DUPFD_CLOEXEC, 0))
+
+ def dup2(self, fd2: int, inheritable: bool = True) -> "FileDescriptor":
+ """Duplicate the file to the given file descriptor number."""
+ return FileDescriptor(os.dup2(self, fd2, inheritable))
+
+ def fileno(self) -> int:
+ """Return self such that it satisfies the HasFileno protocol."""
+ return self
+
+ def get_blocking(self) -> bool:
+ """Get the blocking mode of the file descriptor."""
+ return os.get_blocking(self)
+
+ def get_inheritable(self) -> bool:
+ """Get the close-on-exec flag of the file descriptor."""
+ return os.get_inheritable(self)
+
+ @classmethod
+ def pipe(
+ cls, blocking: bool = True, inheritable: bool = True
+ ) -> tuple["FileDescriptor", "FileDescriptor"]:
+ """Create a pipe with flags set atomically. This actually corresponds
+ to the pipe2 syscall, but skipping flags is equivalent to calling pipe.
+ """
+ rfd, wfd = os.pipe2(
+ (0 if blocking else os.O_NONBLOCK)
+ | (0 if inheritable else os.O_CLOEXEC),
+ )
+ return (cls(rfd), cls(wfd))
+
+ def set_blocking(self, blocking: bool) -> None:
+ """Set the blocking mode of the file descriptor."""
+ os.set_blocking(self, blocking)
+
+ def set_inheritable(self, inheritable: bool) -> None:
+ """Set the close-on-exec flag of the file descriptor."""
+ os.set_inheritable(self, inheritable)