diff options
author | Helmut Grohne <helmut@subdivi.de> | 2024-01-18 22:13:03 +0100 |
---|---|---|
committer | Helmut Grohne <helmut@subdivi.de> | 2024-01-18 22:13:03 +0100 |
commit | 034f732a1af4ce295d993e6951decc4898967dd3 (patch) | |
tree | 5a49cc7b5f5db586ae67f5276071ab525ef832f3 /linuxnamespaces/atlocation.py | |
download | python-linuxnamespaces-034f732a1af4ce295d993e6951decc4898967dd3.tar.gz |
initial checkin
Diffstat (limited to 'linuxnamespaces/atlocation.py')
-rw-r--r-- | linuxnamespaces/atlocation.py | 362 |
1 files changed, 362 insertions, 0 deletions
diff --git a/linuxnamespaces/atlocation.py b/linuxnamespaces/atlocation.py new file mode 100644 index 0000000..2c827a2 --- /dev/null +++ b/linuxnamespaces/atlocation.py @@ -0,0 +1,362 @@ +# Copyright 2024 Helmut Grohne <helmut@subdivi.de> +# SPDX-License-Identifier: GPL-3 + +"""Describe a location in the filesystem by a combination of a file descriptor +and a file name each of which can be optional. Many Linux system calls are able +to work with a location described in this way and this module provides support +code for doing so. +""" + +import enum +import os +import os.path +import pathlib +import typing + + +AT_FDCWD = -100 + + +PathConvertible = typing.Union[bytes, str, os.PathLike] + + +class AtFlags(enum.IntFlag): + """Linux AT_* flags used with many different syscalls.""" + + NONE = 0 + AT_SYMLINK_NOFOLLOW = 0x100 + AT_NO_AUTOMOUNT = 0x800 + AT_EMPTY_PATH = 0x1000 + + +class AtLocation: + """Represent a location in the filesystem suitable for use with the + at-family of syscalls. If flags has the AT_EMPTY_PATH bit set, the + location string must be empty and the file descriptor specifies the + filesystem object. Otherwise, the location specifies the filesystem object. + If it is relative, it the anchor is the file descriptor or the current + working directory if the file descriptor is AT_FDCWD. + """ + + fd: int + location: PathConvertible + flags: AtFlags + + def __new__( + cls, + thing: typing.Union["AtLocation", int, PathConvertible], + location: PathConvertible | None = None, + flags: AtFlags = AtFlags.NONE, + ) -> "AtLocation": + """The argument thing can be many different thing. If it is an + AtLocation, it is copied and all other arguments must be unset. If it + is an integer, it is considered to be a file descriptor and the + location must be unset if flags contains AT_EMPTY_PATH. flags are used + as is except that AT_EMPTY_PATH is automatically added when given a + file descriptor and no location. + """ + if isinstance(thing, AtLocation): + if location is not None or flags != AtFlags.NONE: + raise ValueError( + "cannot override location or flags for an AtLocation" + ) + return thing # Don't copy. + obj = super(AtLocation, cls).__new__(cls) + if isinstance(thing, int): + if thing < 0 and thing != AT_FDCWD: + raise ValueError("fd cannot be negative") + obj.fd = thing + if location is None: + obj.location = "" + obj.flags = flags | AtFlags.AT_EMPTY_PATH + elif flags & AtFlags.AT_EMPTY_PATH: + raise ValueError( + "cannot set AT_EMPTY_PATH with a non-empty location" + ) + else: + obj.location = location + obj.flags = flags + elif location is not None: + raise ValueError("location specified twice") + else: + obj.fd = AT_FDCWD + obj.location = thing + obj.flags = flags + return obj + + def close(self) -> None: + """Close the underlying file descriptor.""" + if self.fd >= 0: + os.close(self.fd) + self.fd = AT_FDCWD + + def nosymfollow(self) -> "AtLocation": + """Return a copy with the AT_SYMLINK_NOFOLLOW set.""" + return AtLocation( + self.fd, self.location, self.flags | AtFlags.AT_SYMLINK_NOFOLLOW + ) + + def symfollow(self) -> "AtLocation": + """Return a copy with AT_SYMLINK_NOFOLLOW cleared.""" + return AtLocation( + self.fd, self.location, self.flags & ~AtFlags.AT_SYMLINK_NOFOLLOW + ) + + def noautomount(self) -> "AtLocation": + """Return a copy with AT_NO_AUTOMOUNT set.""" + return AtLocation( + self.fd, self.location, self.flags | AtFlags.AT_NO_AUTOMOUNT + ) + + def automount(self) -> "AtLocation": + """Return a copy with AT_NO_AUTOMOUNT cleared.""" + return AtLocation( + self.fd, self.location, self.flags & ~AtFlags.AT_NO_AUTOMOUNT + ) + + def joinpath(self, name: PathConvertible) -> "AtLocation": + """Combine an AtLocation and a path by doing the equivalent of joining + them with a slash as separator. + """ + if self.flags & AtFlags.AT_EMPTY_PATH: + return AtLocation( + self.fd, name, self.flags & ~AtFlags.AT_EMPTY_PATH + ) + if not self.location: + return AtLocation(self.fd, name, self.flags) + if isinstance(self.location, bytes) or isinstance(name, bytes): + return AtLocation( + self.fd, + os.path.join(os.fsencode(self.location), os.fsencode(name)), + self.flags, + ) + return AtLocation( + self.fd, pathlib.Path(self.location).joinpath(name), self.flags + ) + + def __truediv__(self, name: PathConvertible) -> "AtLocation": + return self.joinpath(name) + + def fileno(self) -> int: + """Return the underlying file descriptor if this is an AT_EMPTY_PATH + location and raise a ValueError otherwise. + """ + if self.flags != AtFlags.AT_EMPTY_PATH: + raise ValueError("AtLocation is not simply a file descriptor") + assert self.fd >= 0 + assert not self.location + return self.fd + + @property + def fd_or_none(self) -> int | None: + """A variant of the fd attribute that replaces AT_FDCWD with None.""" + return None if self.fd == AT_FDCWD else self.fd + + def access(self, mode: int, *, effective_ids: bool = False) -> bool: + """Wrapper for os.access supplying path, dir_fd and follow_symlinks.""" + if self.flags == AtFlags.AT_SYMLINK_NOFOLLOW: + follow_symlinks = False + elif self.flags == AtFlags.NONE: + follow_symlinks = True + else: + raise NotImplementedError( + "access on AtLocation only supports flag AT_SYMLINK_NOFOLLOW" + ) + assert self.location + return os.access( + self.location, + mode, + dir_fd=self.fd_or_none, + effective_ids=effective_ids, + follow_symlinks=follow_symlinks, + ) + + def chdir(self) -> None: + """Wrapper for os.chdir or os.fchdir.""" + if self.flags == AtFlags.AT_EMPTY_PATH: + return os.fchdir(self.fd) + if self.flags != AtFlags.NONE: + raise NotImplementedError( + "chdir on AtLocation only supports flag AT_EMPTY_PATH" + ) + assert self.location + return os.chdir(self.location) + + def chmod(self, mode: int) -> None: + """Wrapper for os.chmod or os.fchmod.""" + if self.flags == AtFlags.AT_EMPTY_PATH: + return os.fchmod(self.fd, mode) + if self.flags == AtFlags.AT_SYMLINK_NOFOLLOW: + follow_symlinks = False + elif self.flags == AtFlags.NONE: + follow_symlinks = True + else: + raise NotImplementedError( + "chmod on AtLocation with unsupported flags" + ) + assert self.location + return os.chmod( + self.location, + mode, + dir_fd=self.fd_or_none, + follow_symlinks=follow_symlinks, + ) + + def chown(self, uid: int, gid: int) -> None: + """Wrapper for os.chown or os.chown.""" + if self.flags == AtFlags.AT_EMPTY_PATH: + return os.fchown(self.fd, uid, gid) + if self.flags == AtFlags.AT_SYMLINK_NOFOLLOW: + follow_symlinks = False + elif self.flags == AtFlags.NONE: + follow_symlinks = True + else: + raise NotImplementedError( + "chmod on AtLocation with unsupported flags" + ) + assert self.location + return os.chown( + self.location, + uid, + gid, + dir_fd=self.fd_or_none, + follow_symlinks=follow_symlinks, + ) + + def mkdir(self, mode: int = 0o777) -> None: + """Wrapper for os.mkdir supplying path and dir_fd.""" + if self.flags != AtFlags.NONE: + raise NotImplementedError( + "mkdir is not supported for an AtLocation with flags" + ) + assert self.location + os.mkdir(self.location, mode, dir_fd=self.fd_or_none) + + def mknod(self, mode: int = 0o600, device: int = 0) -> None: + """Wrapper for os.mknod supplying path and dir_fd.""" + if self.flags != AtFlags.NONE: + raise NotImplementedError( + "mknod is not supported for an AtLocation with flags" + ) + assert self.location + os.mknod(self.location, mode, device, dir_fd=self.fd_or_none) + + def open(self, flags: int, mode: int = 0o777) -> int: + """Wrapper for os.open supplying path and dir_fd.""" + if self.flags == AtFlags.AT_SYMLINK_NOFOLLOW: + flags |= os.O_NOFOLLOW + elif self.flags != AtFlags.NONE: + raise NotImplementedError( + "opening an AtLocation only supports flag AT_SYMLINK_NOFOLLOW" + ) + assert self.location + return os.open(self.location, flags, mode, dir_fd=self.fd_or_none) + + def readlink(self) -> str: + """Wrapper for os.readlink supplying path and dir_fd.""" + if self.flags & ~AtFlags.AT_EMPTY_PATH != AtFlags.NONE: + raise NotImplementedError( + "readlink on AtLocation only support flag AT_EMPTY_PATH" + ) + return os.fsdecode( + os.readlink(os.fspath(self.location), dir_fd=self.fd_or_none) + ) + + def rmdir(self) -> None: + """Wrapper for os.rmdir suppling path and dir_fd.""" + if self.flags != AtFlags.NONE: + raise NotImplementedError( + "rmdir is not supported for an AtLocation with flags" + ) + assert self.location + return os.rmdir(self.location, dir_fd=self.fd_or_none) + + def symlink(self, linktarget: PathConvertible) -> None: + """Create a symlink at self pointing to linktarget. Note that this + method has its arguments reversed compared to the usual os.symlink, + because the dir_fd is applicable to the second argument there. + """ + if self.flags != AtFlags.NONE: + raise NotImplementedError( + "symlink is not supported for an AtLocation with flags" + ) + assert self.location + os.symlink(linktarget, self.location, dir_fd=self.fd_or_none) + + def unlink(self) -> None: + """Wrapper for os.unlink suppling path and dir_fd.""" + if self.flags != AtFlags.NONE: + raise NotImplementedError( + "unlink is not supported for an AtLocation with flags" + ) + assert self.location + return os.unlink(self.location, dir_fd=self.fd_or_none) + + def walk( + self, + topdown: bool = True, + onerror: typing.Callable[[OSError], typing.Any] | None = None, + follow_symlinks: bool = False, + ) -> typing.Iterator[ + tuple[ + "AtLocation", list["AtLocation"], list["AtLocation"], "AtLocation", + ] + ]: + """Resemble os.fwalk with a few differences. The returned iterator + yields the dirpath as an AtLocation that borrows the fd from self. The + dirnames and filenames become AtLocations whose location is the entry + name and whose fd is temporary. Finally, the dirfd also becomes an + AtLocations referencing the same object as the dirpath though as an + AT_EMPTY_PATH with temporary fd. + """ + if self.flags != AtFlags.NONE: + raise NotImplementedError( + "walk is not supported for an AtLocation with flags" + ) + for dirpath, dirnames, filenames, dirfd in os.fwalk( + self.location, + topdown=topdown, + onerror=onerror, + follow_symlinks=follow_symlinks, + dir_fd=self.fd_or_none, + ): + yield ( + AtLocation(self.fd, dirpath), + [AtLocation(dirfd, dirname) for dirname in dirnames], + [AtLocation(dirfd, filename) for filename in filenames], + AtLocation(dirfd), + ) + + def __enter__(self) -> "AtLocation": + """When used as a context manager, the associated fd will be closed on + scope exit. + """ + return self + + def __exit__( + self, + exc_type: typing.Any, + exc_value: typing.Any, + traceback: typing.Any, + ) -> None: + """When used as a context manager, the associated fd will be closed on + scope exit. + """ + self.close() + + def __fspath__(self) -> str | bytes: + """Return the underlying location if it uniquely defines this object. + Otherwise raise a ValueError. + """ + if self.fd != AT_FDCWD: + raise ValueError( + "AtLocation with fd is not convertible to plain path" + ) + if self.flags != AtFlags.NONE: + raise ValueError( + "AtLocation with flags is not convertible to plain path" + ) + return os.fspath(self.location) + + +AtLocationLike = typing.Union[AtLocation, int, PathConvertible] |