import contextlib

from debian import deb822

from dedup.hashing import hash_file


def pkgname_from_dict(entry):
    """Given an entry dictionary obtained from deb822, return the package
    name.

    @type entry: dict
    @param entry: a single relation entry; its u"name" value is used
    @rtype: bytes
    """
    name = entry[u"name"]
    # deb822 currently returns :any dependencies raw. see #670679
    name = name.split(u':', 1)[0]
    return name.encode("ascii")


def _relation_names(control, field):
    """Return the set of package names mentioned in one relation field,
    flattening all groups of alternatives. A missing field gives an empty set.

    @rtype: set of bytes
    """
    return set(pkgname_from_dict(ent)
               for group in control.relations.get(field, ())
               for ent in group)


def process_control(control_contents):
    """Parses the contents of a control file from a control.tar.gz of a Debian
    package and returns a dictionary containing the fields relevant to dedup.

    The returned dictionary has the keys package, source, version,
    architecture (all bytes) and depends, conflicts, provides (sets of
    bytes). The conflicts set also contains the packages named in the
    replaces field.

    @type control_contents: bytes
    @rtype: {str: object}
    @raises KeyError: if the package, version or architecture field is missing
    """
    control = deb822.Packages(control_contents)
    package = control["package"].encode("ascii")
    try:
        raw_source = control["source"]
    except KeyError:
        # Without a source field, the binary package name is used instead.
        source = package
    else:
        # Only the first whitespace-separated token is kept (presumably the
        # field may carry a version after the name -- confirm against policy).
        source = raw_source.encode("ascii").split()[0]
    version = control["version"].encode("ascii")
    architecture = control["architecture"].encode("ascii")
    # Only dependencies without alternatives (groups of exactly one entry)
    # are recorded.
    depends = set(pkgname_from_dict(dep[0])
                  for dep in control.relations.get("depends", ())
                  if len(dep) == 1)
    conflicts = _relation_names(control, "conflicts")
    conflicts.update(_relation_names(control, "replaces"))
    provides = _relation_names(control, "provides")
    return dict(package=package, source=source, version=version,
                architecture=architecture, depends=depends,
                conflicts=conflicts, provides=provides)


class MultiHash(object):
    """Fan out update calls to several hashlib-like objects at once.

    The wrapped objects stay accessible through the hashes attribute.
    """

    def __init__(self, *hashes):
        """
        @param hashes: objects providing an update(data) method
        """
        self.hashes = hashes

    def update(self, data):
        """Feed data to every wrapped hash object."""
        for hasher in self.hashes:
            hasher.update(data)


def get_tar_hashes(tar, hash_functions):
    """Given a TarFile read all regular files and compute all of the given hash
    functions on each file.

    @type tar: tarfile.TarFile
    @param hash_functions: a sequence of parameter-less functions each
        creating a new hashlib-like object
    @rtype: gen((str, int, {str: str}))
    @returns: an iterable of (filename, filesize, hashes) tuples where
        hashes is a dict mapping hash function names to hash values
    """
    for elem in tar:
        if not elem.isreg():  # excludes hard links as well
            continue
        hasher = MultiHash(*[func() for func in hash_functions])
        # Close the member's file object as soon as hashing is done rather
        # than leaving one open handle per member to the garbage collector.
        with contextlib.closing(tar.extractfile(elem)) as member:
            # The return value of hash_file is used as the hasher afterwards,
            # exactly as before (presumably it returns its first argument).
            hasher = hash_file(hasher, member)
            hashes = {}
            for hashobj in hasher.hashes:
                hashvalue = hashobj.hexdigest()
                # Hash objects reporting a falsy digest (None or empty) are
                # left out of the result.
                if hashvalue:
                    hashes[hashobj.name] = hashvalue
        yield (elem.name, elem.size, hashes)