#!/usr/bin/python3
"""Import Debian package metadata and file content hashes into a sqlite3
database for multiarch analysis.

For every architecture listed in the testing Release file (plus "all"),
the newest version of each binary package is recorded in the ``package``
table together with its dependencies, provides, maintainer-script
presence and — for non-"all" packages — a hash of every file it ships.
Downloads and package extraction are fanned out over a multiprocessing
pool with a bounded number of tasks in flight.
"""

import argparse
from contextlib import closing
import hashlib
import logging
import multiprocessing
import queue
import sqlite3
from urllib.request import urlopen

from debian import deb822
from debian.debian_support import version_compare

from dedup.debpkg import DebExtractor
from dedup.hashing import hash_file
from dedup.utils import fetchiter, iterate_packages


def release_architectures(mirror):
    """Return the set of architectures named in the testing Release file."""
    with closing(urlopen("%s/dists/testing/Release" % mirror)) as f:
        return set(deb822.Release(f)["Architectures"].split())


def gather_pkgmeta(pkg):
    """Reduce a Packages stanza to the metadata needed later.

    Returns a dict with keys ``version``, ``filename``, ``depends`` (set of
    (name, archqual) pairs), ``provides`` (set of names) and, when present
    in the stanza, ``multiarch`` and ``source``.

    Raises ValueError for Provides entries that use alternatives or
    architecture qualification, neither of which is representable here.
    """
    meta = dict(depends=set(), provides=set())
    for key in ("version", "filename"):
        meta[key] = pkg[key]
    try:
        meta["multiarch"] = pkg["multi-arch"]
    except KeyError:
        pass
    try:
        # Keep only the source package name, dropping any "(version)" part.
        meta["source"] = pkg["source"].split(None, 1)[0]
    except KeyError:
        pass
    for kind, relations in pkg.relations.items():
        # Pre-Depends count as Depends for this analysis.
        if kind == "pre-depends":
            kind = "depends"
        elif kind not in ("provides", "depends"):
            continue
        for relation in relations:
            if kind == "provides" and len(relation) != 1:
                raise ValueError("provides with alternatives")
            for alternative in relation:
                name = alternative["name"]
                # deb822 currently returns :any dependencies raw. see #670679
                archqual = alternative.get("archqual")
                if ':' in name:
                    name, archqual = name.split(None, 1)[0].split(':')
                if kind == "provides":
                    if archqual:
                        raise ValueError(
                            "arch qualification not supported on provides")
                    meta[kind].add(name)
                else:
                    meta[kind].add((name, archqual))
    return meta


def process_list(mirror, architecture):
    """Download the Packages list for one architecture.

    Returns a dict mapping package name to its gather_pkgmeta() result,
    keeping only the newest version of each package.
    """
    pkgs = dict()
    for pkg in iterate_packages(mirror, architecture):
        if pkg["architecture"] != architecture:
            continue  # e.g. Arch: all
        try:
            otherver = pkgs[pkg["package"]]["version"]
        except KeyError:
            pass
        else:
            # An already-seen newer version wins.
            if version_compare(otherver, pkg["version"]) > 0:
                continue
        pkgs[pkg["package"]] = gather_pkgmeta(pkg)
    return pkgs


def process_one_list(item):
    """Pool worker wrapper: unpack (mirror, arch) and tag the result."""
    mirror, arch = item
    return (arch, process_list(mirror, arch))


class ProcessingFinished(Exception):
    """Raised by MultiarchExtractor to stop .deb processing early."""


class MultiarchExtractor(DebExtractor):
    """Extract script presence and per-file content hashes from a .deb.

    After processing, ``result`` holds ``hasscripts`` and — unless the
    package is Architecture: all — a ``contents`` dict mapping each
    file name to "R" + sha256 for regular files or "S" + target for
    symlinks (hard links reuse the hash of their target).
    """

    def __init__(self):
        DebExtractor.__init__(self)
        self.architecture = None
        self.result = dict(hasscripts=False)

    def handle_control_info(self, info):
        self.architecture = info["architecture"]

    def handle_control_member(self, name, content):
        if name in ("preinst", "postinst", "prerm", "postrm"):
            self.result["hasscripts"] = True

    def handle_control_end(self):
        # Architecture: all packages need no content scan; stop here.
        if self.architecture == "all":
            raise ProcessingFinished

    def handle_data_tar(self, tarfileobj):
        self.result["contents"] = contents = dict()
        for elem in tarfileobj:
            try:
                elem.name.encode("utf8", "strict")
            except UnicodeEncodeError:
                logging.warning("skipping filename with encoding error %r",
                                elem.name)
                continue  # skip files with non-utf8 encoding for now
            if elem.isreg():
                hasher = hashlib.sha256()
                hash_file(hasher, tarfileobj.extractfile(elem))
                contents[elem.name] = "R" + hasher.hexdigest()
            elif elem.issym() or elem.islnk():
                try:
                    elem.linkname.encode("utf8", "strict")
                except UnicodeEncodeError:
                    logging.warning("skipping link with encoding error %r",
                                    elem.linkname)
                    continue
                if elem.issym():
                    contents[elem.name] = "S" + elem.linkname
                else:
                    # Hard link: reuse the already-computed entry of its
                    # target, which must precede it in the archive.
                    try:
                        contents[elem.name] = contents[elem.linkname]
                    except KeyError:
                        logging.warning("hard link to non-existent file %r",
                                        elem.linkname)
        # The data tar is the last member we care about.
        raise ProcessingFinished


def process_one_package(item):
    """Pool worker: download url, run MultiarchExtractor, tag with pid."""
    pid, url = item
    extractor = MultiarchExtractor()
    with closing(urlopen(url)) as pkgfile:
        try:
            extractor.process(pkgfile)
        except ProcessingFinished:
            pass
    return (pid, extractor.result, url)


def consume_items(dct):
    """Destructively yield all (key, value) pairs from dct."""
    while True:
        try:
            yield dct.popitem()
        except KeyError:
            break


def bounded_imap_unordered(bound, pool, function, iterable):
    """Apply function to iterable on pool, yielding results as they finish.

    At most ``bound`` tasks are in flight at any time. A failure in a
    worker is re-raised here; previously it was only logged, which left
    ``outstanding`` permanently non-zero and deadlocked the consumer in
    ``results.get()``.
    """
    class _Failure:
        # Marker wrapper so errors can travel through the result queue.
        def __init__(self, exception):
            self.exception = exception

    iterable = iter(iterable)
    results = queue.Queue()
    outstanding = 0

    def error_callback(exception):
        logging.exception("worker exception", exc_info=exception)
        # Forward the failure so the consumer does not wait forever for
        # a result that will never arrive.
        results.put(_Failure(exception))

    while iterable or outstanding:
        if iterable:
            for elem in iterable:
                pool.apply_async(function, (elem,), callback=results.put,
                                 error_callback=error_callback)
                outstanding += 1
                if outstanding >= bound or not results.empty():
                    break
            else:
                iterable = None
        if outstanding:
            result = results.get()
            outstanding -= 1
            if isinstance(result, _Failure):
                raise result.exception
            yield result


def insert_package(cur, name, arch, meta):
    """Insert one package row plus its depends/provides rows.

    ``meta`` is a gather_pkgmeta() result. Returns the new row id.
    """
    logging.debug("adding %s %s", name, meta["version"])
    cur.execute("INSERT INTO package (name, version, architecture, source, multiarch) VALUES (?, ?, ?, ?, ?);",
                (name, meta["version"], arch, meta.get("source", name),
                 meta.get("multiarch", "no")))
    pid = cur.lastrowid
    logging.debug("added %s as %d", name, pid)
    if meta["depends"]:
        logging.debug("inserting %d dependencies for %s",
                      len(meta["depends"]), name)
        cur.executemany("INSERT INTO depends (pid, dependee, archqual) VALUES (?, ?, ?);",
                        ((pid, dependee, archqual)
                         for dependee, archqual in meta["depends"]))
    if meta["provides"]:
        logging.debug("inserting %d provides for %s",
                      len(meta["provides"]), name)
        cur.executemany("INSERT INTO provides (pid, provided) VALUES (?, ?);",
                        ((pid, provided) for provided in meta["provides"]))
    return pid


def main():
    logging.basicConfig(level=logging.INFO)
    parser = argparse.ArgumentParser()
    parser.add_argument("-d", "--database", action="store",
                        default="test.sqlite3",
                        help="path to the sqlite3 database file")
    parser.add_argument("-m", "--mirror", action="store",
                        default="http://httpredir.debian.org/debian",
                        help="Debian mirror to use")
    args = parser.parse_args()
    architectures = release_architectures(args.mirror)
    architectures.add("all")
    workers = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(workers)
    # closing() guarantees the connection is released even on error;
    # the single commit at the end keeps the import transactional.
    with closing(sqlite3.connect(args.database)) as db:
        cur = db.cursor()
        cur.execute("PRAGMA foreign_keys = ON;")
        todo = {}
        for arch, pkgs in bounded_imap_unordered(
                3, pool, process_one_list,
                ((args.mirror, arch) for arch in architectures)):
            logging.info("found %d Packages for architecture %s",
                         len(pkgs), arch)
            cur.execute("SELECT name, id, version FROM package WHERE architecture = ?;",
                        (arch,))
            remove = set()
            for name, pid, version in fetchiter(cur):
                # Unchanged packages need no work; anything else is
                # stale and gets removed (then possibly re-inserted).
                if name in pkgs and version == pkgs[name]["version"]:
                    del pkgs[name]
                else:
                    remove.add(pid)
            logging.info("%d Packages are new", len(pkgs))
            logging.info("removing %d old packages", len(remove))
            cur.executemany("DELETE FROM package WHERE id = ?;",
                            ((pid,) for pid in remove))
            del remove
            for name, meta in pkgs.items():
                pid = insert_package(cur, name, arch, meta)
                todo[pid] = meta["filename"]
            del pkgs
        logging.info("need to fetch %d packages", len(todo))
        for pid, result, url in bounded_imap_unordered(
                2 * workers, pool, process_one_package,
                ((pid, "%s/%s" % (args.mirror, filename))
                 for pid, filename in consume_items(todo))):
            logging.info("fetched %d from %s", pid, url)
            cur.execute("UPDATE package SET hasscripts = ? WHERE id = ?;",
                        (result["hasscripts"], pid))
            if "contents" in result:
                cur.executemany("INSERT INTO content (pid, filename, hash) VALUES (?, ?, ?);",
                                ((pid, key, value)
                                 for key, value in result["contents"].items()))
        db.commit()


if __name__ == "__main__":
    main()