import tarfile

import arpy
from debian import deb822

from dedup.compression import decompress
from dedup.hashing import hash_file


class MultiHash:
    """Bundle several hashlib-like objects so that one update() call feeds
    all of them."""

    def __init__(self, *hashes):
        self.hashes = hashes

    def update(self, data):
        """Pass data on to every wrapped hash object, in order."""
        for hashobj in self.hashes:
            hashobj.update(data)


def get_tar_hashes(tar, hash_functions):
    """Walk a TarFile and compute every given hash function over the content
    of each regular file in it.

    @type tar: tarfile.TarFile
    @param hash_functions: a sequence of parameter-less functions each
        creating a new hashlib-like object
    @rtype: gen((str, int, {str: str}))
    @returns: an iterable of (filename, filesize, hashes) tuples where
        hashes is a dict mapping hash function names to hash values.
        Hash objects whose hexdigest() is falsy are left out of the dict.
    """
    for member in tar:
        # isreg() is false for hard links as well, so they are skipped too.
        if member.isreg():
            multi = MultiHash(*(create() for create in hash_functions))
            hash_file(multi, tar.extractfile(member))
            # The generator is consumed lazily, so each object's hexdigest()
            # is evaluated before its name is looked up.
            computed = ((hashobj, hashobj.hexdigest())
                        for hashobj in multi.hashes)
            digests = {hashobj.name: value
                       for hashobj, value in computed if value}
            yield (member.name, member.size, digests)


def opentar(filelike):
    """Open filelike as a tar archive in stream mode ("r|").

    Member names are decoded as utf8 with the surrogateescape error handler.

    @param filelike: a file-like object holding the tar data
    @rtype: tarfile.TarFile
    """
    options = {
        "mode": "r|",
        "encoding": "utf8",
        "errors": "surrogateescape",
    }
    return tarfile.open(fileobj=filelike, **options)


class DebExtractor:
    "Base class for extracting desired features from a Debian package."

    def __init__(self):
        # Tracks progress through the ar members. The values move through
        # "start" -> "version" -> "control" -> "data".
        self.arstate = "start"

    def process(self, filelike):
        """Process a Debian package.

        Every ar member is passed to handle_ar_member and handle_ar_end is
        called once all members have been seen.

        @param filelike: is a file-like object containing the contents of
            the Debian package and can be read once without seeks.
        """
        archive = arpy.Archive(fileobj=filelike)
        for arfiledata in archive:
            self.handle_ar_member(arfiledata)
        self.handle_ar_end()

    def handle_ar_member(self, arfiledata: arpy.ArchiveFileData) -> None:
        """Handle an ar archive member of the Debian package.

        The members are expected in the order debian-binary, control.tar*,
        data.tar*. Members whose name starts with an underscore are skipped
        while waiting for control.tar or data.tar. Anything after data.tar
        is ignored.

        If you replace this method, you must also replace handle_ar_end and
        none of the methods handle_debversion, handle_control_tar or
        handle_data_tar are called.

        @raises ValueError: if an unexpected member is encountered or the
            version in debian-binary does not start with "2."
        """
        name = arfiledata.header.name
        state = self.arstate

        if state == "start":
            if name != b"debian-binary":
                raise ValueError("debian-binary not found")
            version = arfiledata.read()
            # The hook sees the version before it is validated.
            self.handle_debversion(version)
            if not version.startswith(b"2."):
                raise ValueError("debian version not recognized")
            self.arstate = "version"
            return

        if state == "version":
            if name.startswith(b"control.tar"):
                # Whatever follows "control.tar" (e.g. b".gz") is handed to
                # decompress as the extension.
                suffix = name[len(b"control.tar"):].decode("ascii")
                self.handle_control_tar(
                    opentar(decompress(arfiledata, suffix)))
                self.arstate = "control"
            elif not name.startswith(b"_"):
                raise ValueError("unexpected ar member %r" % name)
            return

        if state == "control":
            if name.startswith(b"data.tar"):
                suffix = name[len(b"data.tar"):].decode("ascii")
                self.handle_data_tar(opentar(decompress(arfiledata, suffix)))
                self.arstate = "data"
            elif not name.startswith(b"_"):
                raise ValueError("unexpected ar member %r" % name)
            return

        assert state == "data"

    def handle_ar_end(self):
        """Handle the end of the ar archive of the Debian package.

        @raises ValueError: if no data.tar member was processed
        """
        if self.arstate == "data":
            return
        raise ValueError("data.tar not found")

    def handle_debversion(self, version):
        """Handle the debian-binary member of the Debian package.

        This hook does nothing by default.

        @type version: bytes
        @param version: The full contents of the ar member.
        """

    def handle_control_tar(self, tarfileobj):
        """Handle the control.tar member of the Debian package.

        Each regular file is passed to handle_control_member with a leading
        "./" removed from its name. The file named control is additionally
        parsed and passed to handle_control_info. Finally handle_control_end
        is called.

        If you replace this method, none of handle_control_member,
        handle_control_info or handle_control_end are called.

        @type tarfileobj: tarfile.TarFile
        @param tarfileobj: is opened for streaming reads
        @raises ValueError: if the tar contains a non-file other than the
            "." directory or if it lacks a control file
        """
        found_control = False
        for member in tarfileobj:
            if not member.isreg():
                # The only non-file that is tolerated is the "." directory.
                if member.isdir() and member.name == ".":
                    continue
                raise ValueError("invalid non-file %r found in control.tar" %
                                 member.name)
            raw_name = member.name
            name = raw_name[2:] if raw_name.startswith("./") else raw_name
            content = tarfileobj.extractfile(member).read()
            self.handle_control_member(name, content)
            if name == "control":
                self.handle_control_info(deb822.Packages(content))
                found_control = True
        if not found_control:
            raise ValueError("control missing from control.tar")
        self.handle_control_end()

    def handle_control_member(self, name, content):
        """Handle a file member of the control.tar member of the Debian
        package.

        This hook does nothing by default.

        @type name: str
        @param name: is the plain member name
        @type content: bytes
        @param content: is the full content of the member
        """

    def handle_control_info(self, info):
        """Handle the control member of the control.tar member of the Debian
        package.

        This hook does nothing by default.

        @type info: deb822.Packages
        """

    def handle_control_end(self):
        """Handle the end of the control.tar member of the Debian package.

        This hook does nothing by default.
        """

    def handle_data_tar(self, tarfileobj):
        """Handle the data.tar member of the Debian package.

        This hook does nothing by default.

        @type tarfileobj: tarfile.TarFile
        @param tarfileobj: is opened for streaming reads
        """