diff options
Diffstat (limited to 'multiarchimport.py')
-rwxr-xr-x | multiarchimport.py | 245 |
1 files changed, 245 insertions, 0 deletions
#!/usr/bin/python
"""Import Debian sid binary packages into an sqlite3 database for
multiarch analysis.

For every architecture listed in the mirror's Release file (plus
"all"), the script downloads the Packages index, records package
metadata (version, source, Multi-Arch field, depends, provides) in the
database, then fetches each new .deb in a worker pool and stores a
hash of every file it ships plus whether it carries maintainer
scripts.  Requires the python-debian package and the dedup helper
modules.
"""

import argparse
from contextlib import closing
import hashlib
import logging
import multiprocessing
# Python 2/3 compatibility: the stdlib queue module was renamed.
try:
    import queue
except ImportError:
    import Queue as queue
import sqlite3
# Python 2/3 compatibility for urlopen.
try:
    from urllib.request import urlopen
except ImportError:
    from urllib2 import urlopen

from debian import deb822
from debian.debian_support import version_compare

from dedup.debpkg import DebExtractor, decodetarname
from dedup.hashing import hash_file
from dedup.utils import fetchiter, open_compressed_mirror_url

def release_architectures(mirror):
    """Return the set of architecture names advertised by the mirror.

    NOTE(review): reads dists/testing/Release while process_list()
    below fetches dists/sid Packages — confirm the suite mismatch is
    intentional (the architecture sets of testing and sid normally
    agree).
    """
    with closing(urlopen("%s/dists/testing/Release" % mirror)) as f:
        return set(deb822.Release(f)["Architectures"].split())

def gather_pkgmeta(pkg):
    """Extract the interesting fields from one Packages paragraph.

    Returns a dict with keys:
      version, filename        -- copied verbatim (required fields)
      multiarch, source        -- only present if the paragraph has them;
                                  "source" is stripped of any version part
      depends                  -- set of (name, archqual-or-None) pairs,
                                  merging Depends and Pre-Depends
      provides                 -- set of provided package names

    Raises ValueError for relations this tool cannot represent
    (alternatives or architecture qualifiers in Provides).
    """
    meta = dict(depends=set(), provides=set())
    for key in ("version", "filename"):
        meta[key] = pkg[key]
    try:
        meta["multiarch"] = pkg["multi-arch"]
    except KeyError:
        pass
    try:
        # Source may carry "(version)"; keep only the package name.
        meta["source"] = pkg["source"].split(None, 1)[0]
    except KeyError:
        pass
    for kind, relations in pkg.relations.items():
        # Pre-Depends are treated the same as Depends here.
        if kind == "pre-depends":
            kind = "depends"
        elif kind not in ("provides", "depends"):
            continue
        for relation in relations:
            if kind == "provides" and len(relation) != 1:
                raise ValueError("provides with alternatives")
            for alternative in relation:
                name = alternative["name"]
                # deb822 currently returns :any dependencies raw. see #670679
                archqual = alternative.get("archqual")
                if ':' in name:
                    name, archqual = name.split(None, 1)[0].split(u':')
                if kind == "provides":
                    if archqual:
                        raise ValueError(
                            "arch qualification not supported on provides")
                    meta[kind].add(name)
                else:
                    meta[kind].add((name, archqual))
    return meta

def process_list(mirror, architecture):
    """Download and parse the sid Packages file for one architecture.

    Returns a dict mapping package name to the metadata dict produced
    by gather_pkgmeta().  When a name occurs more than once only the
    highest version is kept.
    """
    pkgs = dict()
    url = "%s/dists/sid/main/binary-%s/Packages" % (mirror, architecture)
    with closing(open_compressed_mirror_url(url)) as pkglist:
        for pkg in deb822.Packages.iter_paragraphs(pkglist):
            if pkg["architecture"] != architecture:
                continue # e.g. Arch: all
            try:
                otherver = pkgs[pkg["package"]]["version"]
            except KeyError:
                pass
            else:
                # An entry already exists; keep the newer version.
                if version_compare(otherver, pkg["version"]) > 0:
                    continue
            pkgs[pkg["package"]] = gather_pkgmeta(pkg)
    return pkgs

def process_one_list(item):
    """Worker-pool adapter: unpack (mirror, arch) and tag the result."""
    mirror, arch = item
    return (arch, process_list(mirror, arch))

class ProcessingFinished(Exception):
    """Control-flow exception: raised by MultiarchExtractor to abort
    archive processing early once everything of interest was seen."""
    pass

class MultiarchExtractor(DebExtractor):
    """DebExtractor that records maintainer scripts and file hashes.

    After processing, self.result holds:
      hasscripts -- True if any of preinst/postinst/prerm/postrm exists
      contents   -- (only for non-"all" packages) dict mapping each
                    member path to "R" + sha256 hex for regular files,
                    u"S" + target for symlinks, or the hash of the
                    link target for hard links
    """
    def __init__(self):
        DebExtractor.__init__(self)
        # Architecture from the control file; filled in during processing.
        self.architecture = None
        self.result = dict(hasscripts=False)

    def handle_control_info(self, info):
        self.architecture = info["architecture"]

    def handle_control_member(self, name, content):
        if name in ("preinst", "postinst", "prerm", "postrm"):
            self.result["hasscripts"] = True

    def handle_control_end(self):
        # Arch: all packages are identical on every architecture, so
        # their contents need not be hashed; stop before the data tar.
        if self.architecture == "all":
            raise ProcessingFinished

    def handle_data_tar(self, tarfileobj):
        self.result["contents"] = contents = dict()
        for elem in tarfileobj:
            try:
                name = decodetarname(elem.name)
            except UnicodeDecodeError:
                logging.warning("skipping filename with encoding error %r",
                                elem.name)
                continue # skip files with non-utf8 encoding for now
            if elem.isreg():
                hasher = hashlib.sha256()
                hash_file(hasher, tarfileobj.extractfile(elem))
                contents[name] = "R" + hasher.hexdigest()
            elif elem.issym() or elem.islnk():
                try:
                    linkname = decodetarname(elem.linkname)
                except UnicodeDecodeError:
                    logging.warning("skipping link with encoding error %r",
                                    elem.linkname)
                    continue
                if elem.issym():
                    contents[name] = u"S" + linkname
                else:
                    # Hard link: reuse the hash stored for its target.
                    # Tar hard links point at earlier members, so the
                    # target should already be in contents.
                    try:
                        contents[name] = contents[linkname]
                    except KeyError:
                        logging.warning("hard link to non-existent file %r",
                                        linkname)
        # The data tar is the last member we care about; abort the rest
        # of the archive.
        raise ProcessingFinished

def process_one_package(item):
    """Worker-pool adapter: fetch one .deb and extract its metadata.

    item is (pid, url); returns (pid, extractor.result, url) so the
    caller can match the result back to its database row.
    """
    pid, url = item
    extractor = MultiarchExtractor()
    with closing(urlopen(url)) as pkgfile:
        try:
            extractor.process(pkgfile)
        except ProcessingFinished:
            pass
    return (pid, extractor.result, url)

def consume_items(dct):
    """Yield and remove (key, value) pairs from dct until it is empty.

    Destructive iteration keeps memory bounded while the work queue in
    main() is drained.
    """
    while True:
        try:
            yield dct.popitem()
        except KeyError:
            break

def bounded_imap_unordered(bound, pool, function, iterable):
    """Like Pool.imap_unordered, but with bounded work in flight.

    At most `bound` tasks are outstanding at any time, so `iterable`
    (and the result backlog) need not be materialized.  Results are
    yielded in completion order, not input order.
    """
    iterable = iter(iterable)
    results = queue.Queue()
    outstanding = 0
    # `iterable` is set to None once exhausted; loop until both the
    # input is drained and all submitted tasks have been collected.
    while iterable or outstanding:
        if iterable:
            for elem in iterable:
                pool.apply_async(function, (elem,), callback=results.put)
                outstanding += 1
                # Stop submitting once the bound is hit or results are
                # already waiting to be yielded.
                if outstanding >= bound or not results.empty():
                    break
            else:
                iterable = None
        if outstanding:
            yield results.get()
            outstanding -= 1

def insert_package(cur, name, arch, meta):
    """Insert one package plus its depends/provides rows.

    `meta` is the dict from gather_pkgmeta().  Falls back to the
    package's own name as source and "no" as multiarch when those
    fields are absent.  Returns the new package row id.
    """
    logging.debug("adding %s %s", name, meta["version"])
    cur.execute("INSERT INTO package (name, version, architecture, source, multiarch) VALUES (?, ?, ?, ?, ?);",
                (name, meta["version"], arch, meta.get("source", name),
                 meta.get("multiarch", "no")))
    pid = cur.lastrowid
    logging.debug("added %s as %d", name, pid)
    if meta["depends"]:
        logging.debug("inserting %d dependencies for %s",
                      len(meta["depends"]), name)
        cur.executemany("INSERT INTO depends (pid, dependee, archqual) VALUES (?, ?, ?);",
                        ((pid, dependee, archqual)
                         for dependee, archqual in meta["depends"]))
    if meta["provides"]:
        logging.debug("inserting %d provides for %s",
                      len(meta["provides"]), name)
        cur.executemany("INSERT INTO provides (pid, provided) VALUES (?, ?);",
                        ((pid, provided) for provided in meta["provides"]))
    return pid

def main():
    """Synchronize the database with the mirror.

    Phase 1: fetch Packages lists (a few at a time), drop rows for
    packages that vanished or changed version, and insert new ones.
    Phase 2: download every newly inserted .deb through the worker
    pool and record its scripts flag and file hashes.
    """
    logging.basicConfig(level=logging.INFO)
    parser = argparse.ArgumentParser()
    parser.add_argument("-d", "--database", action="store",
                        default="test.sqlite3",
                        help="path to the sqlite3 database file")
    parser.add_argument("-m", "--mirror", action="store",
                        default="http://httpredir.debian.org/debian",
                        help="Debian mirror to use")
    args = parser.parse_args()
    architectures = release_architectures(args.mirror)
    # "all" packages live in per-arch Packages files but are filtered
    # out there; process them as their own pseudo-architecture.
    architectures.add("all")
    workers = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(workers)
    db = sqlite3.connect(args.database)
    cur = db.cursor()
    # Needed so DELETE FROM package cascades to depends/provides/content.
    cur.execute("PRAGMA foreign_keys = ON;")
    # Maps new package row id -> mirror-relative .deb filename.
    todo = {}
    for arch, pkgs in bounded_imap_unordered(3, pool, process_one_list,
                                             ((args.mirror, arch)
                                              for arch in architectures)):
        logging.info("found %d Packages for architecture %s", len(pkgs), arch)
        cur.execute("SELECT name, id, version FROM package WHERE architecture = ?;",
                    (arch,))
        remove = set()
        for name, pid, version in fetchiter(cur):
            if name in pkgs and version == pkgs[name]["version"]:
                # Unchanged: nothing to re-import.
                del pkgs[name]
            else:
                # Gone from the archive or superseded by a new version.
                remove.add(pid)
        logging.info("%d Packages are new", len(pkgs))
        logging.info("removing %d old packages", len(remove))
        cur.executemany("DELETE FROM package WHERE id = ?;",
                        ((pid,) for pid in remove))
        del remove

        for name, meta in pkgs.items():
            pid = insert_package(cur, name, arch, meta)
            todo[pid] = meta["filename"]
        del pkgs

    logging.info("need to fetch %d packages", len(todo))
    for pid, result, url in bounded_imap_unordered(
            2 * workers, pool, process_one_package,
            ((pid, "%s/%s" % (args.mirror, filename))
             for pid, filename in consume_items(todo))):
        logging.info("fetched %d from %s", pid, url)
        cur.execute("UPDATE package SET hasscripts = ? WHERE id = ?;",
                    (result["hasscripts"], pid))
        # "contents" is absent for Arch: all packages (see
        # MultiarchExtractor.handle_control_end).
        if "contents" in result:
            cur.executemany("INSERT INTO content (pid, filename, hash) VALUES (?, ?, ?);",
                            ((pid, key, value)
                             for key, value in result["contents"].items()))
        # Commit per package so an interrupted run keeps its progress.
        db.commit()

if __name__ == "__main__":
    main()