summaryrefslogtreecommitdiff
path: root/linuxnamespaces/atlocation.py
diff options
context:
space:
mode:
Diffstat (limited to 'linuxnamespaces/atlocation.py')
-rw-r--r--linuxnamespaces/atlocation.py362
1 files changed, 362 insertions, 0 deletions
diff --git a/linuxnamespaces/atlocation.py b/linuxnamespaces/atlocation.py
new file mode 100644
index 0000000..2c827a2
--- /dev/null
+++ b/linuxnamespaces/atlocation.py
@@ -0,0 +1,362 @@
+# Copyright 2024 Helmut Grohne <helmut@subdivi.de>
+# SPDX-License-Identifier: GPL-3
+
+"""Describe a location in the filesystem by a combination of a file descriptor
+and a file name each of which can be optional. Many Linux system calls are able
+to work with a location described in this way and this module provides support
+code for doing so.
+"""
+
+import enum
+import os
+import os.path
+import pathlib
+import typing
+
+
+AT_FDCWD = -100
+
+
+PathConvertible = typing.Union[bytes, str, os.PathLike]
+
+
+class AtFlags(enum.IntFlag):
+ """Linux AT_* flags used with many different syscalls."""
+
+ NONE = 0
+ AT_SYMLINK_NOFOLLOW = 0x100
+ AT_NO_AUTOMOUNT = 0x800
+ AT_EMPTY_PATH = 0x1000
+
+
+class AtLocation:
+ """Represent a location in the filesystem suitable for use with the
+ at-family of syscalls. If flags has the AT_EMPTY_PATH bit set, the
+ location string must be empty and the file descriptor specifies the
+ filesystem object. Otherwise, the location specifies the filesystem object.
+ If it is relative, it the anchor is the file descriptor or the current
+ working directory if the file descriptor is AT_FDCWD.
+ """
+
+ fd: int
+ location: PathConvertible
+ flags: AtFlags
+
+ def __new__(
+ cls,
+ thing: typing.Union["AtLocation", int, PathConvertible],
+ location: PathConvertible | None = None,
+ flags: AtFlags = AtFlags.NONE,
+ ) -> "AtLocation":
+ """The argument thing can be many different thing. If it is an
+ AtLocation, it is copied and all other arguments must be unset. If it
+ is an integer, it is considered to be a file descriptor and the
+ location must be unset if flags contains AT_EMPTY_PATH. flags are used
+ as is except that AT_EMPTY_PATH is automatically added when given a
+ file descriptor and no location.
+ """
+ if isinstance(thing, AtLocation):
+ if location is not None or flags != AtFlags.NONE:
+ raise ValueError(
+ "cannot override location or flags for an AtLocation"
+ )
+ return thing # Don't copy.
+ obj = super(AtLocation, cls).__new__(cls)
+ if isinstance(thing, int):
+ if thing < 0 and thing != AT_FDCWD:
+ raise ValueError("fd cannot be negative")
+ obj.fd = thing
+ if location is None:
+ obj.location = ""
+ obj.flags = flags | AtFlags.AT_EMPTY_PATH
+ elif flags & AtFlags.AT_EMPTY_PATH:
+ raise ValueError(
+ "cannot set AT_EMPTY_PATH with a non-empty location"
+ )
+ else:
+ obj.location = location
+ obj.flags = flags
+ elif location is not None:
+ raise ValueError("location specified twice")
+ else:
+ obj.fd = AT_FDCWD
+ obj.location = thing
+ obj.flags = flags
+ return obj
+
+ def close(self) -> None:
+ """Close the underlying file descriptor."""
+ if self.fd >= 0:
+ os.close(self.fd)
+ self.fd = AT_FDCWD
+
+ def nosymfollow(self) -> "AtLocation":
+ """Return a copy with the AT_SYMLINK_NOFOLLOW set."""
+ return AtLocation(
+ self.fd, self.location, self.flags | AtFlags.AT_SYMLINK_NOFOLLOW
+ )
+
+ def symfollow(self) -> "AtLocation":
+ """Return a copy with AT_SYMLINK_NOFOLLOW cleared."""
+ return AtLocation(
+ self.fd, self.location, self.flags & ~AtFlags.AT_SYMLINK_NOFOLLOW
+ )
+
+ def noautomount(self) -> "AtLocation":
+ """Return a copy with AT_NO_AUTOMOUNT set."""
+ return AtLocation(
+ self.fd, self.location, self.flags | AtFlags.AT_NO_AUTOMOUNT
+ )
+
+ def automount(self) -> "AtLocation":
+ """Return a copy with AT_NO_AUTOMOUNT cleared."""
+ return AtLocation(
+ self.fd, self.location, self.flags & ~AtFlags.AT_NO_AUTOMOUNT
+ )
+
+ def joinpath(self, name: PathConvertible) -> "AtLocation":
+ """Combine an AtLocation and a path by doing the equivalent of joining
+ them with a slash as separator.
+ """
+ if self.flags & AtFlags.AT_EMPTY_PATH:
+ return AtLocation(
+ self.fd, name, self.flags & ~AtFlags.AT_EMPTY_PATH
+ )
+ if not self.location:
+ return AtLocation(self.fd, name, self.flags)
+ if isinstance(self.location, bytes) or isinstance(name, bytes):
+ return AtLocation(
+ self.fd,
+ os.path.join(os.fsencode(self.location), os.fsencode(name)),
+ self.flags,
+ )
+ return AtLocation(
+ self.fd, pathlib.Path(self.location).joinpath(name), self.flags
+ )
+
+ def __truediv__(self, name: PathConvertible) -> "AtLocation":
+ return self.joinpath(name)
+
+ def fileno(self) -> int:
+ """Return the underlying file descriptor if this is an AT_EMPTY_PATH
+ location and raise a ValueError otherwise.
+ """
+ if self.flags != AtFlags.AT_EMPTY_PATH:
+ raise ValueError("AtLocation is not simply a file descriptor")
+ assert self.fd >= 0
+ assert not self.location
+ return self.fd
+
+ @property
+ def fd_or_none(self) -> int | None:
+ """A variant of the fd attribute that replaces AT_FDCWD with None."""
+ return None if self.fd == AT_FDCWD else self.fd
+
+ def access(self, mode: int, *, effective_ids: bool = False) -> bool:
+ """Wrapper for os.access supplying path, dir_fd and follow_symlinks."""
+ if self.flags == AtFlags.AT_SYMLINK_NOFOLLOW:
+ follow_symlinks = False
+ elif self.flags == AtFlags.NONE:
+ follow_symlinks = True
+ else:
+ raise NotImplementedError(
+ "access on AtLocation only supports flag AT_SYMLINK_NOFOLLOW"
+ )
+ assert self.location
+ return os.access(
+ self.location,
+ mode,
+ dir_fd=self.fd_or_none,
+ effective_ids=effective_ids,
+ follow_symlinks=follow_symlinks,
+ )
+
+ def chdir(self) -> None:
+ """Wrapper for os.chdir or os.fchdir."""
+ if self.flags == AtFlags.AT_EMPTY_PATH:
+ return os.fchdir(self.fd)
+ if self.flags != AtFlags.NONE:
+ raise NotImplementedError(
+ "chdir on AtLocation only supports flag AT_EMPTY_PATH"
+ )
+ assert self.location
+ return os.chdir(self.location)
+
+ def chmod(self, mode: int) -> None:
+ """Wrapper for os.chmod or os.fchmod."""
+ if self.flags == AtFlags.AT_EMPTY_PATH:
+ return os.fchmod(self.fd, mode)
+ if self.flags == AtFlags.AT_SYMLINK_NOFOLLOW:
+ follow_symlinks = False
+ elif self.flags == AtFlags.NONE:
+ follow_symlinks = True
+ else:
+ raise NotImplementedError(
+ "chmod on AtLocation with unsupported flags"
+ )
+ assert self.location
+ return os.chmod(
+ self.location,
+ mode,
+ dir_fd=self.fd_or_none,
+ follow_symlinks=follow_symlinks,
+ )
+
+ def chown(self, uid: int, gid: int) -> None:
+ """Wrapper for os.chown or os.chown."""
+ if self.flags == AtFlags.AT_EMPTY_PATH:
+ return os.fchown(self.fd, uid, gid)
+ if self.flags == AtFlags.AT_SYMLINK_NOFOLLOW:
+ follow_symlinks = False
+ elif self.flags == AtFlags.NONE:
+ follow_symlinks = True
+ else:
+ raise NotImplementedError(
+ "chmod on AtLocation with unsupported flags"
+ )
+ assert self.location
+ return os.chown(
+ self.location,
+ uid,
+ gid,
+ dir_fd=self.fd_or_none,
+ follow_symlinks=follow_symlinks,
+ )
+
+ def mkdir(self, mode: int = 0o777) -> None:
+ """Wrapper for os.mkdir supplying path and dir_fd."""
+ if self.flags != AtFlags.NONE:
+ raise NotImplementedError(
+ "mkdir is not supported for an AtLocation with flags"
+ )
+ assert self.location
+ os.mkdir(self.location, mode, dir_fd=self.fd_or_none)
+
+ def mknod(self, mode: int = 0o600, device: int = 0) -> None:
+ """Wrapper for os.mknod supplying path and dir_fd."""
+ if self.flags != AtFlags.NONE:
+ raise NotImplementedError(
+ "mknod is not supported for an AtLocation with flags"
+ )
+ assert self.location
+ os.mknod(self.location, mode, device, dir_fd=self.fd_or_none)
+
+ def open(self, flags: int, mode: int = 0o777) -> int:
+ """Wrapper for os.open supplying path and dir_fd."""
+ if self.flags == AtFlags.AT_SYMLINK_NOFOLLOW:
+ flags |= os.O_NOFOLLOW
+ elif self.flags != AtFlags.NONE:
+ raise NotImplementedError(
+ "opening an AtLocation only supports flag AT_SYMLINK_NOFOLLOW"
+ )
+ assert self.location
+ return os.open(self.location, flags, mode, dir_fd=self.fd_or_none)
+
+ def readlink(self) -> str:
+ """Wrapper for os.readlink supplying path and dir_fd."""
+ if self.flags & ~AtFlags.AT_EMPTY_PATH != AtFlags.NONE:
+ raise NotImplementedError(
+ "readlink on AtLocation only support flag AT_EMPTY_PATH"
+ )
+ return os.fsdecode(
+ os.readlink(os.fspath(self.location), dir_fd=self.fd_or_none)
+ )
+
+ def rmdir(self) -> None:
+ """Wrapper for os.rmdir suppling path and dir_fd."""
+ if self.flags != AtFlags.NONE:
+ raise NotImplementedError(
+ "rmdir is not supported for an AtLocation with flags"
+ )
+ assert self.location
+ return os.rmdir(self.location, dir_fd=self.fd_or_none)
+
+ def symlink(self, linktarget: PathConvertible) -> None:
+ """Create a symlink at self pointing to linktarget. Note that this
+ method has its arguments reversed compared to the usual os.symlink,
+ because the dir_fd is applicable to the second argument there.
+ """
+ if self.flags != AtFlags.NONE:
+ raise NotImplementedError(
+ "symlink is not supported for an AtLocation with flags"
+ )
+ assert self.location
+ os.symlink(linktarget, self.location, dir_fd=self.fd_or_none)
+
+ def unlink(self) -> None:
+ """Wrapper for os.unlink suppling path and dir_fd."""
+ if self.flags != AtFlags.NONE:
+ raise NotImplementedError(
+ "unlink is not supported for an AtLocation with flags"
+ )
+ assert self.location
+ return os.unlink(self.location, dir_fd=self.fd_or_none)
+
+ def walk(
+ self,
+ topdown: bool = True,
+ onerror: typing.Callable[[OSError], typing.Any] | None = None,
+ follow_symlinks: bool = False,
+ ) -> typing.Iterator[
+ tuple[
+ "AtLocation", list["AtLocation"], list["AtLocation"], "AtLocation",
+ ]
+ ]:
+ """Resemble os.fwalk with a few differences. The returned iterator
+ yields the dirpath as an AtLocation that borrows the fd from self. The
+ dirnames and filenames become AtLocations whose location is the entry
+ name and whose fd is temporary. Finally, the dirfd also becomes an
+ AtLocations referencing the same object as the dirpath though as an
+ AT_EMPTY_PATH with temporary fd.
+ """
+ if self.flags != AtFlags.NONE:
+ raise NotImplementedError(
+ "walk is not supported for an AtLocation with flags"
+ )
+ for dirpath, dirnames, filenames, dirfd in os.fwalk(
+ self.location,
+ topdown=topdown,
+ onerror=onerror,
+ follow_symlinks=follow_symlinks,
+ dir_fd=self.fd_or_none,
+ ):
+ yield (
+ AtLocation(self.fd, dirpath),
+ [AtLocation(dirfd, dirname) for dirname in dirnames],
+ [AtLocation(dirfd, filename) for filename in filenames],
+ AtLocation(dirfd),
+ )
+
+ def __enter__(self) -> "AtLocation":
+ """When used as a context manager, the associated fd will be closed on
+ scope exit.
+ """
+ return self
+
+ def __exit__(
+ self,
+ exc_type: typing.Any,
+ exc_value: typing.Any,
+ traceback: typing.Any,
+ ) -> None:
+ """When used as a context manager, the associated fd will be closed on
+ scope exit.
+ """
+ self.close()
+
+ def __fspath__(self) -> str | bytes:
+ """Return the underlying location if it uniquely defines this object.
+ Otherwise raise a ValueError.
+ """
+ if self.fd != AT_FDCWD:
+ raise ValueError(
+ "AtLocation with fd is not convertible to plain path"
+ )
+ if self.flags != AtFlags.NONE:
+ raise ValueError(
+ "AtLocation with flags is not convertible to plain path"
+ )
+ return os.fspath(self.location)
+
+
+AtLocationLike = typing.Union[AtLocation, int, PathConvertible]