from debian import deb822

from dedup.arreader import ArReader
from dedup.hashing import hash_file


def process_control(control_contents):
    """Parses the contents of a control file from a control.tar of a Debian
    package and returns a dictionary containing the fields relevant to dedup.

    The returned dictionary has the keys "package", "source", "version",
    "architecture" and "depends". When the control file has no Source field,
    "source" falls back to the package name. "depends" is a set of package
    names.

    @type control_contents: bytes
    @rtype: {str: object}
    @raises KeyError: if the Package, Version or Architecture field is missing
    """
    control = deb822.Packages(control_contents)
    package = control["package"]
    try:
        # Only the first whitespace-separated token of the Source field is
        # kept; anything following it is discarded.
        source = control["source"].split()[0]
    except KeyError:
        source = package
    version = control["version"]
    architecture = control["architecture"]
    # deb822 currently returns :any dependencies raw. see #670679
    # Only relation entries with exactly one element are considered, and any
    # ":qualifier" suffix is stripped from the name.
    depends = set(dep[0]["name"].split(u':', 1)[0]
                  for dep in control.relations.get("depends", ())
                  if len(dep) == 1)
    return dict(package=package, source=source, version=version,
                architecture=architecture, depends=depends)


class MultiHash(object):
    """Forwards every update to each of the wrapped hash objects, so that
    several hashes can be computed in a single pass over the data.
    """

    def __init__(self, *hashes):
        """
        @param hashes: hashlib-like objects providing an update method
        """
        self.hashes = hashes

    def update(self, data):
        """Feed data to all wrapped hash objects, in the order given to the
        constructor.
        """
        for hasher in self.hashes:
            hasher.update(data)


def get_tar_hashes(tar, hash_functions):
    """Given a TarFile read all regular files and compute all of the given
    hash functions on each file.

    Hash objects whose hexdigest() returns a false value (such as None or
    an empty string) are left out of the resulting dict.

    @type tar: tarfile.TarFile
    @param hash_functions: a sequence of parameter-less functions each
        creating a new hashlib-like object
    @rtype: gen((str, int, {str: str}))
    @returns: an iterable of (filename, filesize, hashes) tuples where
        hashes is a dict mapping hash function names to hash values
    """
    for elem in tar:
        if not elem.isreg():  # excludes hard links as well
            continue
        hasher = MultiHash(*[func() for func in hash_functions])
        member = tar.extractfile(elem)
        try:
            hasher = hash_file(hasher, member)
        finally:
            # Release the member file object right away instead of leaving
            # one open handle per archive member to the garbage collector.
            member.close()
        hashes = {}
        for hashobj in hasher.hashes:
            hashvalue = hashobj.hexdigest()
            if hashvalue:
                hashes[hashobj.name] = hashvalue
        yield (elem.name, elem.size, hashes)


class DebExtractor(object):
    "Base class for extracting desired features from a Debian package."

    def process(self, filelike):
        """Process a Debian package.

        Calls handle_ar_member once for each ar member, in archive order,
        and handle_ar_end once after the last member.

        @param filelike: is a file-like object containing the contents of
            the Debian package and can be read once without seeks.
        """
        af = ArReader(filelike)
        af.read_magic()
        while True:
            try:
                name = af.read_entry()
            except EOFError:
                # read_entry signals the end of the archive with EOFError.
                break
            else:
                # Kept outside the try body so that an EOFError raised by
                # the handler itself is not mistaken for the archive end.
                self.handle_ar_member(name, af)
        self.handle_ar_end()

    def handle_ar_member(self, name, filelike):
        """Handle an ar archive member of the Debian package.

        The base implementation does nothing.

        @type name: bytes
        @param name: is the name of the member
        @param filelike: is a file-like object containing the contents of
            the member and can be read once without seeks.
        """

    def handle_ar_end(self):
        """Handle the end of the ar archive of the Debian package.

        The base implementation does nothing.
        """