# Copyright 2024 Helmut Grohne
# SPDX-License-Identifier: GPL-3

"""Describe a location in the filesystem by a combination of a file descriptor
and a file name each of which can be optional. Many Linux system calls are able
to work with a location described in this way and this module provides support
code for doing so.
"""

import enum
import errno
import os
import os.path
import pathlib
import stat
import typing

from .filedescriptor import FileDescriptor


# Pseudo file descriptor that anchors relative locations at the current
# working directory.
AT_FDCWD = FileDescriptor(-100)

PathConvertible = typing.Union[str, os.PathLike]


class AtFlags(enum.IntFlag):
    """Linux AT_* flags used with many different syscalls."""

    NONE = 0
    AT_SYMLINK_NOFOLLOW = 0x100
    AT_NO_AUTOMOUNT = 0x800
    AT_EMPTY_PATH = 0x1000


# errno values for which AtLocation.exists and the AtLocation.is_* predicates
# answer False instead of propagating the OSError.
_IGNORED_ERRNOS = frozenset((errno.ENOENT, errno.ENOTDIR, errno.ELOOP))


class AtLocation:
    """Represent a location in the filesystem suitable for use with the
    at-family of syscalls. If flags has the AT_EMPTY_PATH bit set, the
    location string must be empty and the file descriptor specifies the
    filesystem object. Otherwise, the location specifies the filesystem object.
    If it is relative, the anchor is the file descriptor or the current
    working directory if the file descriptor is AT_FDCWD.

    There are two ways of managing the closing of file descriptors. One is to
    treat AtLocation as an object that borrows file descriptors but never
    closes them and taking care of closing them in some other way. The other is
    using AtLocation as a context manager and thinking of it as owning a file
    descriptor. Some methods such as walk dictate a particular lifetime
    management.
    """

    fd: FileDescriptor
    location: PathConvertible
    flags: AtFlags

    def __new__(
        cls,
        thing: typing.Union["AtLocation", int, PathConvertible],
        location: PathConvertible | None = None,
        flags: AtFlags = AtFlags.NONE,
    ) -> "AtLocation":
        """The argument thing can be many different things. If it is an
        AtLocation, that very object is returned (it is not copied) and all
        other arguments must be unset. If it is an integer (e.g.
        FileDescriptor), it is considered to be a file descriptor and the
        location must be unset or empty if flags contains AT_EMPTY_PATH.
        Otherwise thing is the location, anchored at AT_FDCWD, and the
        location argument must be unset. flags are used as is except that
        AT_EMPTY_PATH is automatically added when given a file descriptor and
        no location.

        Raises ValueError for a negative fd other than AT_FDCWD, for a
        non-empty location combined with AT_EMPTY_PATH, for a location given
        twice and for overrides applied to an existing AtLocation.
        """
        if isinstance(thing, AtLocation):
            if location is not None or flags != AtFlags.NONE:
                raise ValueError(
                    "cannot override location or flags for an AtLocation"
                )
            return thing  # Don't copy.
        obj = super(AtLocation, cls).__new__(cls)
        if isinstance(thing, int):
            if thing < 0 and thing != AT_FDCWD:
                raise ValueError("fd cannot be negative")
            if isinstance(thing, FileDescriptor):
                obj.fd = thing
            else:
                obj.fd = FileDescriptor(thing)
            if location is None:
                obj.location = ""
                obj.flags = flags | AtFlags.AT_EMPTY_PATH
            elif flags & AtFlags.AT_EMPTY_PATH and os.fspath(location):
                # Only a non-empty location conflicts with AT_EMPTY_PATH. An
                # explicitly empty one is what the flag-modifying copies
                # (nosymfollow and friends) pass for empty-path locations.
                raise ValueError(
                    "cannot set AT_EMPTY_PATH with a non-empty location"
                )
            else:
                obj.location = location
                obj.flags = flags
        elif location is not None:
            raise ValueError("location specified twice")
        else:
            obj.fd = AT_FDCWD
            obj.location = thing
            obj.flags = flags
        return obj

    def close(self) -> None:
        """Close the underlying file descriptor if there is one and reset it
        to AT_FDCWD.
        """
        if self.fd >= 0:
            self.fd.close()
            self.fd = AT_FDCWD

    def as_emptypath(self, inheritable: bool = True) -> "AtLocation":
        """Return a new AtLocation with flag AT_EMPTY_PATH with a new file
        descriptor. If self already is an empty path, its fd is duplicated.
        Otherwise the location is opened with O_PATH. In all cases, the caller
        is responsible for closing the result object. The inheritable argument
        selects whether the new file descriptor is inherited by child
        processes.
        """
        if self.flags & AtFlags.AT_EMPTY_PATH:
            newfd = self.fd.dup(inheritable=inheritable)
            return AtLocation(newfd, flags=self.flags)
        # os.open always hands out non-inheritable descriptors regardless of
        # O_CLOEXEC being passed, so inheritability has to be enabled
        # explicitly afterwards.
        pathfd = self.open(flags=os.O_PATH | os.O_CLOEXEC)
        if inheritable:
            os.set_inheritable(pathfd, True)
        return AtLocation(pathfd)

    def nosymfollow(self) -> "AtLocation":
        """Return a copy with the AT_SYMLINK_NOFOLLOW set. The copy borrows
        the fd.
        """
        return AtLocation(
            self.fd, self.location, self.flags | AtFlags.AT_SYMLINK_NOFOLLOW
        )

    def symfollow(self) -> "AtLocation":
        """Return a copy with AT_SYMLINK_NOFOLLOW cleared. The copy borrows
        the fd.
        """
        return AtLocation(
            self.fd, self.location, self.flags & ~AtFlags.AT_SYMLINK_NOFOLLOW
        )

    def noautomount(self) -> "AtLocation":
        """Return a copy with AT_NO_AUTOMOUNT set. The copy borrows the fd."""
        return AtLocation(
            self.fd, self.location, self.flags | AtFlags.AT_NO_AUTOMOUNT
        )

    def automount(self) -> "AtLocation":
        """Return a copy with AT_NO_AUTOMOUNT cleared. The copy borrows the
        fd.
        """
        return AtLocation(
            self.fd, self.location, self.flags & ~AtFlags.AT_NO_AUTOMOUNT
        )

    def joinpath(self, other: "AtLocationLike") -> "AtLocation":
        """Combine an AtLocation and a path by doing the equivalent of joining
        them with a slash as separator. The returned AtLocation borrows its fd
        if any. An absolute other (including any fd) replaces self entirely.
        Joining AtLocations with differing flags raises a ValueError.
        """
        if isinstance(other, int):
            # An fd is considered an absolute AT_EMPTY_PATH path.
            return AtLocation(other)
        non_empty_flags = self.flags & ~AtFlags.AT_EMPTY_PATH
        if isinstance(other, AtLocation):
            if other.is_absolute():
                # Absolute path trumps self.
                return other
            if non_empty_flags != other.flags:
                raise ValueError(
                    "cannot join AtLocations with differing flags"
                )
            other = other.location
        if not other:
            return self
        elif pathlib.Path(other).is_absolute():
            return AtLocation(other, flags=non_empty_flags)
        # other now is a PathConvertible that isn't absolute.
        if self.flags & AtFlags.AT_EMPTY_PATH or not self.location:
            return AtLocation(self.fd, other, non_empty_flags)
        return AtLocation(
            self.fd, pathlib.Path(self.location).joinpath(other), self.flags
        )

    def __truediv__(self, name: "AtLocationLike") -> "AtLocation":
        """Operator form of joinpath."""
        return self.joinpath(name)

    def fileno(self) -> FileDescriptor:
        """Return the underlying file descriptor if this is an AT_EMPTY_PATH
        location and raise a ValueError otherwise.
        """
        if self.flags != AtFlags.AT_EMPTY_PATH:
            raise ValueError("AtLocation is not simply a file descriptor")
        assert self.fd >= 0
        assert not self.location
        return self.fd

    @property
    def fd_or_none(self) -> FileDescriptor | None:
        """A variant of the fd attribute that replaces AT_FDCWD with None."""
        return None if self.fd == AT_FDCWD else self.fd

    def access(self, mode: int, *, effective_ids: bool = False) -> bool:
        """Wrapper for os.access supplying path, dir_fd and follow_symlinks.
        Flags other than exactly AT_SYMLINK_NOFOLLOW raise a
        NotImplementedError.
        """
        if self.flags == AtFlags.AT_SYMLINK_NOFOLLOW:
            follow_symlinks = False
        elif self.flags == AtFlags.NONE:
            follow_symlinks = True
        else:
            raise NotImplementedError(
                "access on AtLocation only supports flag AT_SYMLINK_NOFOLLOW"
            )
        assert self.location
        return os.access(
            self.location,
            mode,
            dir_fd=self.fd_or_none,
            effective_ids=effective_ids,
            follow_symlinks=follow_symlinks,
        )

    def chdir(self) -> None:
        """Wrapper for os.chdir or os.fchdir. A relative location is resolved
        against the file descriptor if there is one. Flags other than exactly
        AT_EMPTY_PATH raise a NotImplementedError.
        """
        if self.flags == AtFlags.AT_EMPTY_PATH:
            return os.fchdir(self.fd)
        if self.flags != AtFlags.NONE:
            raise NotImplementedError(
                "chdir on AtLocation only supports flag AT_EMPTY_PATH"
            )
        assert self.location
        anchor = self.fd_or_none
        if anchor is None:
            return os.chdir(self.location)
        # os.chdir has no dir_fd parameter. Resolve the location relative to
        # the anchor via a temporary O_PATH descriptor and fchdir to that.
        targetfd = os.open(
            self.location, os.O_PATH | os.O_DIRECTORY, dir_fd=anchor
        )
        try:
            return os.fchdir(targetfd)
        finally:
            os.close(targetfd)

    def chmod(self, mode: int) -> None:
        """Wrapper for os.chmod or os.fchmod. Combinations of flags raise a
        NotImplementedError.
        """
        if self.flags == AtFlags.AT_EMPTY_PATH:
            return os.fchmod(self.fd, mode)
        if self.flags == AtFlags.AT_SYMLINK_NOFOLLOW:
            follow_symlinks = False
        elif self.flags == AtFlags.NONE:
            follow_symlinks = True
        else:
            raise NotImplementedError(
                "chmod on AtLocation with unsupported flags"
            )
        assert self.location
        return os.chmod(
            self.location,
            mode,
            dir_fd=self.fd_or_none,
            follow_symlinks=follow_symlinks,
        )

    def chown(self, uid: int, gid: int) -> None:
        """Wrapper for os.chown or os.fchown. Combinations of flags raise a
        NotImplementedError.
        """
        if self.flags == AtFlags.AT_EMPTY_PATH:
            return os.fchown(self.fd, uid, gid)
        if self.flags == AtFlags.AT_SYMLINK_NOFOLLOW:
            follow_symlinks = False
        elif self.flags == AtFlags.NONE:
            follow_symlinks = True
        else:
            raise NotImplementedError(
                "chown on AtLocation with unsupported flags"
            )
        assert self.location
        return os.chown(
            self.location,
            uid,
            gid,
            dir_fd=self.fd_or_none,
            follow_symlinks=follow_symlinks,
        )

    def _stat_mode_is(self, predicate: typing.Callable[[int], bool]) -> bool:
        """Apply predicate to the st_mode of this location. A stat failure
        with one of the _IGNORED_ERRNOS yields False. Other errors propagate.
        """
        try:
            return predicate(self.stat().st_mode)
        except OSError as err:
            if err.errno in _IGNORED_ERRNOS:
                return False
            raise

    def exists(self) -> bool:
        """Report whether the location refers to an existing filesystem object.
        Similar to pathlib.Path.exists.
        """
        return self._stat_mode_is(lambda _mode: True)

    def is_absolute(self) -> bool:
        """Report whether the location is absolute or not. Note that any
        location with a valid file descriptor is considered absolute as it is
        not dependent on the working directory.
        """
        return self.fd >= 0 or pathlib.Path(self.location).is_absolute()

    def is_block_device(self) -> bool:
        """Report whether the location refers to a block device. Similar to
        pathlib.Path.is_block_device.
        """
        return self._stat_mode_is(stat.S_ISBLK)

    def is_char_device(self) -> bool:
        """Report whether the location refers to a character device. Similar to
        pathlib.Path.is_char_device.
        """
        return self._stat_mode_is(stat.S_ISCHR)

    def is_dir(self) -> bool:
        """Report whether the location refers to a directory. Similar to
        pathlib.Path.is_dir.
        """
        return self._stat_mode_is(stat.S_ISDIR)

    def is_fifo(self) -> bool:
        """Report whether the location refers to a FIFO. Similar to
        pathlib.Path.is_fifo.
        """
        return self._stat_mode_is(stat.S_ISFIFO)

    def is_file(self) -> bool:
        """Report whether the location refers to a regular file. Similar to
        pathlib.Path.is_file.
        """
        return self._stat_mode_is(stat.S_ISREG)

    def is_socket(self) -> bool:
        """Report whether the location refers to a socket. Similar to
        pathlib.Path.is_socket.
        """
        return self._stat_mode_is(stat.S_ISSOCK)

    def is_symlink(self) -> bool:
        """Report whether the location refers to a symbolic link. Similar to
        pathlib.Path.is_symlink. As this is based on the stat method, symbolic
        links are only seen when the flags are AT_SYMLINK_NOFOLLOW.
        """
        return self._stat_mode_is(stat.S_ISLNK)

    def link(self, dst: "AtLocationLike") -> None:
        """Wrapper for os.link supplying src_dir_fd, dst_dir_fd and
        follow_symlinks. The source is followed unless it carries
        AT_SYMLINK_NOFOLLOW. A source with AT_NO_AUTOMOUNT or a destination
        with any flags raises a NotImplementedError.
        """
        if self.flags & AtFlags.AT_NO_AUTOMOUNT != AtFlags.NONE:
            raise NotImplementedError(
                "link on AtFlags with unsupported source flags"
            )
        dst = AtLocation(dst)
        if dst.flags != AtFlags.NONE:
            raise NotImplementedError(
                "link on AtFlags with unsupported destination flags"
            )
        os.link(
            self.location,
            dst.location,
            src_dir_fd=self.fd_or_none,
            dst_dir_fd=dst.fd_or_none,
            # Follow symlinks exactly when AT_SYMLINK_NOFOLLOW is absent.
            follow_symlinks=(
                (self.flags & AtFlags.AT_SYMLINK_NOFOLLOW) == AtFlags.NONE
            ),
        )

    def mkdir(
        self, mode: int = 0o777, parents: bool = False, exist_ok: bool = False
    ) -> None:
        """Wrapper for os.mkdir supplying path and dir_fd. It also supports
        the parents and exist_ok arguments from pathlib.Path.mkdir. Any flags
        raise a NotImplementedError.
        """
        if self.flags != AtFlags.NONE:
            raise NotImplementedError(
                "mkdir is not supported for an AtLocation with flags"
            )
        assert self.location
        try:
            os.mkdir(self.location, mode, dir_fd=self.fd_or_none)
        except FileNotFoundError:
            if not parents:
                raise
            parentlocation = os.path.dirname(self.location)
            if not parentlocation:
                raise
            # Create the missing ancestors with the default mode and then
            # retry creating the requested directory itself.
            AtLocation(self.fd, parentlocation, self.flags).mkdir(
                parents=True, exist_ok=True
            )
            self.mkdir(mode, False, exist_ok)
        except OSError:
            # Like pathlib, avoid checking EEXISTS as there may be more reasons
            if not exist_ok or not self.is_dir():
                raise

    def mknod(self, mode: int = 0o600, device: int = 0) -> None:
        """Wrapper for os.mknod supplying path and dir_fd. Any flags raise a
        NotImplementedError.
        """
        if self.flags != AtFlags.NONE:
            raise NotImplementedError(
                "mknod is not supported for an AtLocation with flags"
            )
        assert self.location
        os.mknod(self.location, mode, device, dir_fd=self.fd_or_none)

    def open(self, flags: int, mode: int = 0o777) -> int:
        """Wrapper for os.open supplying path and dir_fd. If the location
        carries AT_SYMLINK_NOFOLLOW, O_NOFOLLOW is added to the open flags.
        Any other location flags raise a NotImplementedError.
        """
        if self.flags == AtFlags.AT_SYMLINK_NOFOLLOW:
            flags |= os.O_NOFOLLOW
        elif self.flags != AtFlags.NONE:
            raise NotImplementedError(
                "opening an AtLocation only supports flag AT_SYMLINK_NOFOLLOW"
            )
        assert self.location
        return os.open(self.location, flags, mode, dir_fd=self.fd_or_none)

    def readlink(self) -> str:
        """Wrapper for os.readlink supplying path and dir_fd. The location
        must carry AT_SYMLINK_NOFOLLOW and may additionally carry
        AT_EMPTY_PATH. Anything else raises a NotImplementedError.
        """
        if self.flags & AtFlags.AT_SYMLINK_NOFOLLOW == AtFlags.NONE:
            raise NotImplementedError("cannot read a link after following it")
        if self.flags & ~AtFlags.AT_EMPTY_PATH != AtFlags.AT_SYMLINK_NOFOLLOW:
            raise NotImplementedError(
                "readlink on AtLocation only support flag AT_EMPTY_PATH"
            )
        return os.fsdecode(
            os.readlink(os.fspath(self.location), dir_fd=self.fd_or_none)
        )

    def rename(self, dst: "AtLocationLike") -> None:
        """Wrapper for os.rename supplying src_dir_fd and dst_dir_fd. Both
        self and dst must carry exactly the flag AT_SYMLINK_NOFOLLOW or a
        NotImplementedError is raised.
        """
        if self.flags != AtFlags.AT_SYMLINK_NOFOLLOW:
            raise NotImplementedError(
                "rename on AtLocation only supports source flag AT_SYMLINK_NOFOLLOW"
            )
        dst = AtLocation(dst)
        if dst.flags != AtFlags.AT_SYMLINK_NOFOLLOW:
            raise NotImplementedError(
                "rename on AtLocation only supports destination flag AT_SYMLINK_NOFOLLOW"
            )
        os.rename(
            self.location,
            dst.location,
            src_dir_fd=self.fd_or_none,
            dst_dir_fd=dst.fd_or_none,
        )

    def rmdir(self) -> None:
        """Wrapper for os.rmdir supplying path and dir_fd. Any flags raise a
        NotImplementedError.
        """
        if self.flags != AtFlags.NONE:
            raise NotImplementedError(
                "rmdir is not supported for an AtLocation with flags"
            )
        assert self.location
        return os.rmdir(self.location, dir_fd=self.fd_or_none)

    def stat(self) -> os.stat_result:
        """Wrapper for os.stat supplying dir_fd and follow_symlinks. An
        AT_EMPTY_PATH location is examined via its file descriptor.
        Combinations of flags raise a NotImplementedError.
        """
        if self.flags == AtFlags.AT_EMPTY_PATH:
            return os.stat(self.fd)
        follow_symlinks = True
        if self.flags == AtFlags.AT_SYMLINK_NOFOLLOW:
            follow_symlinks = False
        elif self.flags != AtFlags.NONE:
            raise NotImplementedError(
                "stat is not supported for an AtFlags with given flags"
            )
        return os.stat(
            self.location,
            dir_fd=self.fd_or_none,
            follow_symlinks=follow_symlinks,
        )

    def symlink_to(self, linktarget: PathConvertible) -> None:
        """Create a symlink at self pointing to linktarget. Note that this
        method has its arguments reversed compared to the usual os.symlink,
        because the dir_fd is applicable to the second argument there. Any
        flags raise a NotImplementedError.
        """
        if self.flags != AtFlags.NONE:
            raise NotImplementedError(
                "symlink is not supported for an AtLocation with flags"
            )
        assert self.location
        os.symlink(linktarget, self.location, dir_fd=self.fd_or_none)

    def unlink(self) -> None:
        """Wrapper for os.unlink supplying path and dir_fd. Any flags raise a
        NotImplementedError.
        """
        if self.flags != AtFlags.NONE:
            raise NotImplementedError(
                "unlink is not supported for an AtLocation with flags"
            )
        assert self.location
        return os.unlink(self.location, dir_fd=self.fd_or_none)

    def walk(
        self,
        topdown: bool = True,
        onerror: typing.Callable[[OSError], typing.Any] | None = None,
        follow_symlinks: bool = False,
    ) -> typing.Iterator[
        tuple[
            "AtLocation", list["AtLocation"], list["AtLocation"], "AtLocation"
        ]
    ]:
        """Resemble os.fwalk with a few differences. The returned iterator
        yields the dirpath as an AtLocation that borrows the fd from self. The
        dirnames and filenames become AtLocations whose location is the entry
        name and whose fd is temporary. Finally, the dirfd also becomes an
        AtLocation referencing the same object as the dirpath though as an
        AT_EMPTY_PATH with temporary fd. The yielded entries carry
        AT_SYMLINK_NOFOLLOW unless follow_symlinks is set. Flags on self other
        than AT_EMPTY_PATH raise a NotImplementedError.
        """
        if self.flags & ~AtFlags.AT_EMPTY_PATH != AtFlags.NONE:
            raise NotImplementedError(
                "walk is not supported for an AtLocation with flags"
            )
        flags = AtFlags.AT_SYMLINK_NOFOLLOW
        if follow_symlinks:
            flags = AtFlags.NONE
        for dirpath, dirnames, filenames, dirfd in os.fwalk(
            # An empty-path location is walked as "." relative to its fd.
            "." if self.flags & AtFlags.AT_EMPTY_PATH else self.location,
            topdown=topdown,
            onerror=onerror,
            follow_symlinks=follow_symlinks,
            dir_fd=self.fd_or_none,
        ):
            yield (
                AtLocation(self.fd, dirpath, flags),
                [AtLocation(dirfd, dirname, flags) for dirname in dirnames],
                [AtLocation(dirfd, filename, flags) for filename in filenames],
                AtLocation(dirfd),
            )

    def __enter__(self) -> "AtLocation":
        """When used as a context manager, the associated fd will be closed on
        scope exit.
        """
        return self

    def __exit__(
        self,
        exc_type: typing.Any,
        exc_value: typing.Any,
        traceback: typing.Any,
    ) -> None:
        """When used as a context manager, the associated fd will be closed on
        scope exit.
        """
        self.close()

    def __fspath__(self) -> str:
        """Return the underlying location if it uniquely defines this object.
        Otherwise raise a ValueError.
        """
        if self.fd != AT_FDCWD:
            raise ValueError(
                "AtLocation with fd is not convertible to plain path"
            )
        if self.flags != AtFlags.NONE:
            raise ValueError(
                "AtLocation with flags is not convertible to plain path"
            )
        return os.fspath(self.location)

    def __repr__(self) -> str:
        """Return a textual representation of the AtLocation object."""
        cn = self.__class__.__name__
        if self.fd < 0:
            if self.flags == AtFlags.NONE:
                return f"{cn}({self.location!r})"
            return f"{cn}({self.location!r}, flags={self.flags!r})"
        if self.location:
            if self.flags == AtFlags.NONE:
                return f"{cn}({self.fd}, {self.location!r})"
            return f"{cn}({self.fd}, {self.location!r}, {self.flags!r})"
        if self.flags & ~AtFlags.AT_EMPTY_PATH == AtFlags.NONE:
            return f"{cn}({self.fd})"
        return f"{cn}({self.fd}, flags={self.flags!r})"


AtLocationLike = typing.Union[AtLocation, int, PathConvertible]