summaryrefslogtreecommitdiff
path: root/linuxnamespaces
diff options
context:
space:
mode:
Diffstat (limited to 'linuxnamespaces')
-rw-r--r--linuxnamespaces/tarutils.py75
1 files changed, 75 insertions, 0 deletions
diff --git a/linuxnamespaces/tarutils.py b/linuxnamespaces/tarutils.py
index c7a065c..facb537 100644
--- a/linuxnamespaces/tarutils.py
+++ b/linuxnamespaces/tarutils.py
@@ -5,12 +5,17 @@
"""Extensions to the tarfile module.
* ZstdTarFile extends TarFile to deal with zstd-compressed archives.
* get_comptype guesses the compression used for an open TarFile.
+ * XAttrTarFile extends TarFile to map extended attributes to PAX headers.
"""
+import os
import tarfile
import typing
+TarPath = str | bytes | os.PathLike[str] | os.PathLike[bytes]
+
+
class ZstdTarFile(tarfile.TarFile):
"""Subclass of tarfile.TarFile that can read zstd compressed archives."""
@@ -75,3 +80,73 @@ def get_comptype(tarobj: tarfile.TarFile) -> str:
except KeyError:
# pylint: disable=raise-missing-from # no value in chaining
raise ValueError(f"cannot guess comptype for module {compmodule}")
+
+
+class XAttrTarFile(tarfile.TarFile):
+ """A subclass to tarfile.TarFile that adds support for extended attributes
+ via SCHILY.xattr.* PAX headers to extraction and creation of archives. It
+ can be used as a mixin class with others as it does not add any state.
+ """
+
+ def extract(
+ self,
+ member: tarfile.TarInfo | str,
+ path: TarPath = "",
+ set_attrs: bool = True,
+ **kwargs: typing.Any,
+ ) -> None:
+ """Refer to tarfile.TarFile.extract. In addition, SCHILY.xattr.* PAX
+ headers are examined and applied as extended attributes if set_attrs is
+ true-ish.
+ """
+ if not set_attrs:
+ super().extract(member, path, False, **kwargs)
+ return
+
+ # We also need the tarinfo, so mimic the start of the built-in extract.
+ if isinstance(member, str):
+ tarinfo = self.getmember(member)
+ else:
+ tarinfo = member
+
+ super().extract(tarinfo, path, True, **kwargs)
+
+ # mypy is unhappy about the next line, but we have the same code in
+ # TarFile.extract and if it bails here, it also bails there.
+ path = os.path.join(path, tarinfo.name) # type: ignore
+
+ for attr, value in tarinfo.pax_headers.items():
+ if not attr.startswith("SCHILY.xattr."):
+ continue
+ attr = attr.removeprefix("SCHILY.xattr.")
+ os.setxattr(
+ path,
+ attr,
+ value.encode(self.encoding or "utf8", "surrogateescape"),
+ follow_symlinks=False,
+ )
+
+ def gettarinfo(
+ self,
+ name: TarPath | None = None,
+ arcname: str | None = None,
+ fileobj: typing.IO[bytes] | None = None,
+ ) -> tarfile.TarInfo:
+ tarinfo = super().gettarinfo(name, arcname, fileobj)
+ path: int | TarPath
+ if fileobj is not None:
+ path = fileobj.fileno()
+ elif name is not None:
+ path = name
+ else:
+ raise ValueError("gettarinfo requires a name or fileobj")
+ dereference = True if self.dereference is None else self.dereference
+ for attr in os.listxattr(path, follow_symlinks=dereference):
+ key = "SCHILY.xattr." + attr
+ value = os.getxattr(
+ path, attr, follow_symlinks=dereference
+ ).decode(self.encoding or "utf8", "surrogateescape")
+ # TarInfo.pax_headers is designated as (read-only) Mapping, but it
+ # really is a writable dict.
+ tarinfo.pax_headers[key] = value # type: ignore[index]
+ return tarinfo