import sys
import tarfile

from debian import deb822

from dedup.arreader import ArReader
from dedup.compression import decompress
from dedup.hashing import hash_file


def process_control(control_contents):
    """Extract the fields relevant to dedup from the contents of a control
    file taken from the control.tar of a Debian package.

    The source name falls back to the package name when the control file
    has no "source" field; otherwise only the first whitespace-separated
    word of that field is used. The dependency set is built from the
    "depends" and "pre-depends" relations, keeping only relations without
    alternatives and cutting each name at its first colon.

    @type control_contents: bytes
    @rtype: {str: object}
    @returns: a dict with the keys package, source, version, architecture
        and depends (a set of package names)
    @raises KeyError: if the package, version or architecture field is
        missing
    """
    control = deb822.Packages(control_contents)
    package = control["package"]
    try:
        source_field = control["source"]
    except KeyError:
        source = package
    else:
        source = source_field.split()[0]
    version = control["version"]
    architecture = control["architecture"]
    # deb822 currently returns :any dependencies raw. see #670679
    relations = (control.relations.get("depends", []) +
                 control.relations.get("pre-depends", []))
    depends = set()
    for alternatives in relations:
        # Relations offering alternatives (a | b) are skipped entirely.
        if len(alternatives) == 1:
            depends.add(alternatives[0]["name"].split(u':', 1)[0])
    return {
        "package": package,
        "source": source,
        "version": version,
        "architecture": architecture,
        "depends": depends,
    }


class MultiHash(object):
    """Fan a stream of data out to several hashlib-like objects.

    The wrapped objects stay accessible through the hashes attribute, in
    the order in which they were passed to the constructor.
    """

    def __init__(self, *hashes):
        self.hashes = hashes

    def update(self, data):
        """Pass data on to the update method of every wrapped object."""
        for wrapped in self.hashes:
            wrapped.update(data)


def get_tar_hashes(tar, hash_functions):
    """Iterate over a TarFile and compute every given hash function on
    each regular file it contains. Members that are not regular files are
    skipped.

    @type tar: tarfile.TarFile
    @param hash_functions: a sequence of parameter-less functions each
        creating a new hashlib-like object
    @rtype: gen((str, int, {str: str}))
    @returns: an iterable of (filename, filesize, hashes) tuples where
        hashes is a dict mapping hash function names to hash values. Hash
        objects whose hexdigest() is false (e.g. None or empty) are left
        out of the dict.
    """
    for member in tar:
        if not member.isreg():  # excludes hard links as well
            continue
        multi = MultiHash(*[create() for create in hash_functions])
        multi = hash_file(multi, tar.extractfile(member))
        digests = {}
        for hashobj in multi.hashes:
            digest = hashobj.hexdigest()
            if digest:
                digests[hashobj.name] = digest
        yield (member.name, member.size, digests)


if sys.version_info.major < 3:
    def opentar(filelike):
        """Open filelike as a tar archive for streaming reads.

        @rtype: tarfile.TarFile
        """
        return tarfile.open(fileobj=filelike, mode="r|")

    def decodetarname(name):
        """Decoded name of a tarinfo.

        @raises UnicodeDecodeError: if name is not valid utf8
        """
        return name.decode("utf8")
else:
    def opentar(filelike):
        """Open filelike as a tar archive for streaming reads. Member
        names are decoded as utf8 with the surrogateescape error handler.

        @rtype: tarfile.TarFile
        """
        return tarfile.open(fileobj=filelike, mode="r|", encoding="utf8",
                            errors="surrogateescape")

    def decodetarname(name):
        """Decoded name of a tarinfo.

        The name is returned unchanged if it can be encoded as strict
        utf8.

        @raises UnicodeDecodeError: if name contains surrogate escapes,
            i.e. the original bytes were not valid utf8
        """
        try:
            name.encode("utf8", "strict")
        except UnicodeEncodeError as err:
            if err.reason == "surrogates not allowed":
                # Recover the original bytes and decode them strictly, so
                # that the failure surfaces as a UnicodeDecodeError.
                raw = name.encode("utf8", "surrogateescape")
                raw.decode("utf8", "strict")
        return name


class DebExtractor(object):
    """Base class for extracting desired features from a Debian package.

    The arstate attribute tracks progress through the ar members and
    moves through "start", "version", "control" and "data" in this order.
    """

    def __init__(self):
        self.arstate = "start"

    def process(self, filelike):
        """Process a Debian package.

        Every ar member is passed to handle_ar_member and handle_ar_end
        is called once the archive is exhausted.

        @param filelike: is a file-like object containing the contents of
            the Debian package and can be read once without seeks.
        """
        archive = ArReader(filelike)
        archive.read_magic()
        while True:
            try:
                member = archive.read_entry()
            except EOFError:
                break
            self.handle_ar_member(member, archive)
        self.handle_ar_end()

    def handle_ar_member(self, name, filelike):
        """Handle an ar archive member of the Debian package.

        If you replace this method, you must also replace handle_ar_end
        and none of the methods handle_debversion, handle_control_tar or
        handle_data_tar are called.

        @type name: bytes
        @param name: is the name of the member
        @param filelike: is a file-like object containing the contents of
            the member and can be read once without seeks.
        @raises ValueError: if the first member is not debian-binary, its
            contents do not start with "2." or an unexpected member is
            found before data.tar
        """
        state = self.arstate

        if state == "start":
            if name != b"debian-binary":
                raise ValueError("debian-binary not found")
            version = filelike.read()
            # The hook sees the version before it is validated.
            self.handle_debversion(version)
            if not version.startswith(b"2."):
                raise ValueError("debian version not recognized")
            self.arstate = "version"
            return

        if state == "version":
            if name.startswith(b"control.tar"):
                # name[11:] is whatever follows "control.tar".
                extension = name[11:].decode("ascii")
                filelike = decompress(filelike, extension)
                self.handle_control_tar(opentar(filelike))
                self.arstate = "control"
            elif not name.startswith(b"_"):
                # Members starting with an underscore are skipped.
                raise ValueError("unexpected ar member %r" % name)
            return

        if state == "control":
            if name.startswith(b"data.tar"):
                # name[8:] is whatever follows "data.tar".
                extension = name[8:].decode("ascii")
                filelike = decompress(filelike, extension)
                self.handle_data_tar(opentar(filelike))
                self.arstate = "data"
            elif not name.startswith(b"_"):
                raise ValueError("unexpected ar member %r" % name)
            return

        # Members following data.tar are not looked at.
        assert state == "data"

    def handle_ar_end(self):
        """Handle the end of the ar archive of the Debian package.

        @raises ValueError: if no data.tar member was processed
        """
        if self.arstate != "data":
            raise ValueError("data.tar not found")

    def handle_debversion(self, version):
        """Handle the debian-binary member of the Debian package. The
        base implementation does nothing.

        @type version: bytes
        @param version: The full contents of the ar member.
        """

    def handle_control_tar(self, tarfileobj):
        """Handle the control.tar member of the Debian package. The base
        implementation does nothing.

        @type tarfileobj: tarfile.TarFile
        @param tarfileobj: is opened for streaming reads
        """

    def handle_data_tar(self, tarfileobj):
        """Handle the data.tar member of the Debian package. The base
        implementation does nothing.

        @type tarfileobj: tarfile.TarFile
        @param tarfileobj: is opened for streaming reads
        """