diff options
Diffstat (limited to 'dedup/debpkg.py')
-rw-r--r-- | dedup/debpkg.py | 61 |
1 files changed, 15 insertions, 46 deletions
diff --git a/dedup/debpkg.py b/dedup/debpkg.py index 3a30b3e..de00e60 100644 --- a/dedup/debpkg.py +++ b/dedup/debpkg.py @@ -1,13 +1,12 @@ -import sys import tarfile +import arpy from debian import deb822 -from dedup.arreader import ArReader from dedup.compression import decompress from dedup.hashing import hash_file -class MultiHash(object): +class MultiHash: def __init__(self, *hashes): self.hashes = hashes @@ -30,7 +29,7 @@ def get_tar_hashes(tar, hash_functions): if not elem.isreg(): # excludes hard links as well continue hasher = MultiHash(*[func() for func in hash_functions]) - hasher = hash_file(hasher, tar.extractfile(elem)) + hash_file(hasher, tar.extractfile(elem)) hashes = {} for hashobj in hasher.hashes: hashvalue = hashobj.hexdigest() @@ -38,32 +37,11 @@ def get_tar_hashes(tar, hash_functions): hashes[hashobj.name] = hashvalue yield (elem.name, elem.size, hashes) -if sys.version_info.major >= 3: - def opentar(filelike): - return tarfile.open(fileobj=filelike, mode="r|", encoding="utf8", - errors="surrogateescape") +def opentar(filelike): + return tarfile.open(fileobj=filelike, mode="r|", encoding="utf8", + errors="surrogateescape") - def decodetarname(name): - """Decoded name of a tarinfo. - @raises UnicodeDecodeError: - """ - try: - name.encode("utf8", "strict") - except UnicodeEncodeError as e: - if e.reason == "surrogates not allowed": - name.encode("utf8", "surrogateescape").decode("utf8", "strict") - return name -else: - def opentar(filelike): - return tarfile.open(fileobj=filelike, mode="r|") - - def decodetarname(name): - """Decoded name of a tarinfo. - @raises UnicodeDecodeError: - """ - return name.decode("utf8") - -class DebExtractor(object): +class DebExtractor: "Base class for extracting desired features from a Debian package." def __init__(self): @@ -74,45 +52,36 @@ class DebExtractor(object): @param filelike: is a file-like object containing the contents of the Debian packge and can be read once without seeks. """ - af = ArReader(filelike) - af.read_magic() - while True: - try: - name = af.read_entry() - except EOFError: - break - else: - self.handle_ar_member(name, af) + af = arpy.Archive(fileobj=filelike) + for member in af: + self.handle_ar_member(member) self.handle_ar_end() - def handle_ar_member(self, name, filelike): + def handle_ar_member(self, arfiledata: arpy.ArchiveFileData) -> None: """Handle an ar archive member of the Debian package. If you replace this method, you must also replace handle_ar_end and none of the methods handle_debversion, handle_control_tar or handle_data_tar are called. - @type name: bytes - @param name: is the name of the member - @param filelike: is a file-like object containing the contents of the - member and can be read once without seeks. """ + name = arfiledata.header.name if self.arstate == "start": if name != b"debian-binary": raise ValueError("debian-binary not found") - version = filelike.read() + version = arfiledata.read() self.handle_debversion(version) if not version.startswith(b"2."): raise ValueError("debian version not recognized") self.arstate = "version" elif self.arstate == "version": if name.startswith(b"control.tar"): - filelike = decompress(filelike, name[11:].decode("ascii")) + filelike = decompress(arfiledata, name[11:].decode("ascii")) self.handle_control_tar(opentar(filelike)) self.arstate = "control" elif not name.startswith(b"_"): raise ValueError("unexpected ar member %r" % name) elif self.arstate == "control": if name.startswith(b"data.tar"): - filelike = decompress(filelike, name[8:].decode("ascii")) + filelike = decompress(arfiledata, name[8:].decode("ascii")) self.handle_data_tar(opentar(filelike)) self.arstate = "data" elif not name.startswith(b"_"): |