#!/usr/bin/python3
# SPDX-License-Identifier: GPL-2.0+
"""Check cross-build-dependency satisfiability for Debian architecture pairs.

Downloads Packages/Sources indices from a Debian mirror, runs
dose-builddebcheck for every outdated (buildarch, hostarch) pair recorded in
the sqlite database ``db``, and writes the per-source results back.
"""

import argparse
import collections
import contextlib
import datetime
import functools
import hashlib
import itertools
import lzma
import multiprocessing
import os.path
import sqlite3
import subprocess
import tempfile

import yaml

import apt_pkg
apt_pkg.init()
version_compare = apt_pkg.version_compare

import requests

from common import decompress_stream, yield_lines

# Build profiles passed to dose-builddebcheck.
PROFILES = frozenset(("cross", "nocheck"))

# Rows of dpkg's cputable / tupletable respectively.
CPUEntry = collections.namedtuple('CPUEntry',
                                  'debcpu gnucpu regex bits endianness')
TupleEntry = collections.namedtuple('TupleEntry',
                                    'abi libc os cpu')


class Architectures:
    """Parsed view of dpkg's architecture tables with wildcard matching."""

    @staticmethod
    def read_table(filename):
        """Yield the whitespace-separated fields of non-comment lines."""
        with open(filename) as f:
            for line in f:
                if not line.startswith("#"):
                    yield line.split()

    def __init__(self, cputable="/usr/share/dpkg/cputable",
                 tupletable="/usr/share/dpkg/tupletable",
                 abitable="/usr/share/dpkg/abitable"):
        self.cputable = {}    # debcpu -> CPUEntry
        self.tupletable = {}  # debarch -> TupleEntry
        self.abitable = {}    # debarch -> bits
        self.read_cputable(cputable)
        self.read_tupletable(tupletable)
        self.read_abitable(abitable)

    def read_cputable(self, cputable):
        self.cputable.clear()
        for values in self.read_table(cputable):
            values[3] = int(values[3])  # bits column
            entry = CPUEntry(*values)
            self.cputable[entry.debcpu] = entry

    def read_tupletable(self, tupletable):
        """Parse tupletable, expanding dpkg's <cpu> wildcard rows.

        Requires read_cputable to have run first, as wildcard rows are
        instantiated once per known cpu.
        """
        self.tupletable.clear()
        for debtuple, debarch in self.read_table(tupletable):
            if "<cpu>" in debtuple:
                for cpu in self.cputable:
                    entry = TupleEntry(*debtuple.replace("<cpu>", cpu)
                                       .split("-"))
                    self.tupletable[debarch.replace("<cpu>", cpu)] = entry
            else:
                self.tupletable[debarch] = TupleEntry(*debtuple.split("-"))

    def read_abitable(self, abitable):
        self.abitable.clear()
        for arch, bits in self.read_table(abitable):
            self.abitable[arch] = int(bits)

    def match(self, arch, pattern):
        """Check whether a concrete architecture matches a wildcard pattern.

        Patterns without "any" components must match exactly; otherwise the
        pattern is padded to a full four-component tuple with "any" and
        compared component-wise against the architecture's tuple.
        """
        parts = pattern.split("-")
        if "any" not in parts:
            return pattern == arch
        while len(parts) < 4:
            parts.insert(0, "any")
        entry = self.tupletable[arch]
        return all(parts[i] in (entry[i], "any") for i in range(4))

    def getendianness(self, arch):
        return self.cputable[self.tupletable[arch].cpu].endianness


architectures = Architectures()
arch_match = architectures.match


def call_dose_builddebcheck(arguments):
    """
    @type arguments: [str]
    @param arguments: command line arguments to dose-builddebcheck
    @returns: an iterable over loaded yaml documents. The first document
              is the header, all other documents are per-package.
    @raises subprocess.CalledProcessError: if dose errors out
    """
    cmd = ["dose-builddebcheck"]
    cmd.extend(arguments)
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    lines = []
    for line in proc.stdout:
        # Dose emits one YAML list of package reports; the items are
        # indented by two spaces and separated by " -" markers.
        if line.startswith(b'  '):
            lines.append(line)
        elif line == b' -\n':
            yield yaml.load(b"".join(lines), Loader=yaml.CSafeLoader)
            lines = []
    proc.stdout.close()
    if lines:
        yield yaml.load(b"".join(lines), Loader=yaml.CSafeLoader)
    # dose exits 0/1 depending on satisfiability; both are success here.
    if proc.wait() not in (0, 1):
        raise subprocess.CalledProcessError(proc.returncode, cmd)


def parse_deb822(iterable):
    """Parse an iterable of bytes into an iterable of str-dicts."""
    mapping = {}
    key = None
    value = None
    for line in yield_lines(iterable):
        line = line.decode("utf8")
        if line == "\n":
            # Blank line terminates a paragraph.
            if key is not None:
                mapping[key] = value.strip()
                key = None
            yield mapping
            mapping = {}
        elif key and line.startswith((" ", "\t")):
            # Continuation line extends the current field.
            value += line
        else:
            if key is not None:
                mapping[key] = value.strip()
            try:
                key, value = line.split(":", 1)
            except ValueError:
                raise ValueError("invalid input line %r" % line)
    if key is not None:
        mapping[key] = value.strip()
    if mapping:
        yield mapping


def serialize_deb822(dct):
    """Serialize a str-dict into a single deb822 paragraph string."""
    return "".join(map("%s: %s\n".__mod__, dct.items())) + "\n"


class HashSumMismatch(Exception):
    pass


def hash_check(iterable, hashobj, expected_digest):
    """Wraps an iterable that yields bytes. It doesn't modify the sequence,
    but on the final element it verifies that the concatenation of bytes
    yields an expected digest value. Upon failure, the final next() results
    in a HashSumMismatch rather than StopIteration.
    """
    for data in iterable:
        hashobj.update(data)
        yield data
    if hashobj.hexdigest() != expected_digest:
        raise HashSumMismatch()


def parse_date(s):
    """Parse an RFC 2822 style date as used in Release files."""
    return datetime.datetime.strptime(s, "%a, %d %b %Y %H:%M:%S %Z")


class GPGV:
    """Verify OpenPGP signatures using the system apt keyrings."""

    def __init__(self, files=("/etc/apt/trusted.gpg",),
                 partsdir="/etc/apt/trusted.gpg.d"):
        candidates = list(files)
        candidates.extend(os.path.join(partsdir, e)
                          for e in os.listdir(partsdir))
        # Keep only keyrings that are actually readable.
        self.keyrings = list(filter(lambda f: os.access(f, os.R_OK),
                                    candidates))

    def verify(self, content):
        """Return the signed payload of content or raise ValueError."""
        cmdline = ["gpgv", "--quiet", "--weak-digest", "SHA1", "--output", "-"]
        for keyring in self.keyrings:
            cmdline.extend(("--keyring", keyring))
        proc = subprocess.Popen(cmdline, stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        stdout, _ = proc.communicate(content)
        if proc.wait() != 0:
            raise ValueError("signature verification failed")
        return stdout


class DebianMirror:
    """Access to release metadata and index files of a Debian mirror."""

    hashfunc = "SHA256"

    def __init__(self, uri, dist="sid"):
        self.uri = uri
        self.dist = dist
        self.releasetime = None  # datetime from the Release "Date" field
        self.byhash = None       # whether Acquire-By-Hash is enabled
        self.files = {}          # index filename -> hex digest

    def get_uri(self, filename):
        return "%s/dists/%s/%s" % (self.uri, self.dist, filename)

    def fetch_release(self):
        """Download InRelease and return its signature-verified payload."""
        resp = requests.get(self.get_uri("InRelease"))
        resp.raise_for_status()
        return GPGV().verify(resp.content)

    def parse_release(self, content):
        """Extract timestamps and index digests from a Release payload.

        @raises ValueError: on implausible timestamps or malformed
                            checksum lines
        """
        info, = list(parse_deb822([content]))
        self.releasetime = parse_date(info["Date"])
        valid_until = parse_date(info["Valid-Until"])
        now = datetime.datetime.utcnow()
        if self.releasetime > now:
            raise ValueError("release file generated in future")
        if valid_until < now:
            raise ValueError("release signature expired")
        self.byhash = info.pop("Acquire-By-Hash", "no") == "yes"
        self.files = {}
        for line in info[self.hashfunc].splitlines():
            parts = line.split()
            if not parts:
                continue
            if len(parts) != 3:
                raise ValueError("invalid %s line %r" %
                                 (self.hashfunc, line))
            self.files[parts[2]] = parts[0]

    def update_release(self):
        self.parse_release(self.fetch_release())

    def fetch_list(self, listname):
        """Yield the decompressed bytes of one index file, hash-checked.

        Prefers the .xz variant when the Release file advertises one and
        rewrites the path to by-hash form when the mirror supports it.
        """
        if listname + ".xz" in self.files:
            listname += ".xz"
            wrapper = lambda i: decompress_stream(i, lzma.LZMADecompressor())
        else:
            wrapper = lambda i: i
        hashvalue = self.files[listname]
        if self.byhash:
            listname = "%s/by-hash/%s/%s" % (os.path.dirname(listname),
                                             self.hashfunc, hashvalue)
        with contextlib.closing(requests.get(self.get_uri(listname),
                                             stream=True)) as resp:
            resp.raise_for_status()
            it = resp.iter_content(65536)
            it = hash_check(it, hashlib.new(self.hashfunc), hashvalue)
            yield from wrapper(it)

    def fetch_sources(self, component="main"):
        return self.fetch_list("%s/source/Sources" % component)

    def fetch_binaries(self, architecture, component="main"):
        return self.fetch_list("%s/binary-%s/Packages" %
                               (component, architecture))


# Fields retained from Packages entries for dependency checking.
binfields = frozenset((
    "Architecture",
    "Breaks",
    "Conflicts",
    "Depends",
    "Essential",
    "Multi-Arch",
    "Package",
    "Pre-Depends",
    "Provides",
    "Version",
))

# Build-dependency fields of Sources entries.
srcdepfields = frozenset((
    "Build-Conflicts",
    "Build-Conflicts-Arch",
    "Build-Depends",
    "Build-Depends-Arch",
))

# Fields retained from Sources entries.
srcfields = srcdepfields.union((
    "Architecture",
    "Package",
    "Version",
))

# Packages that must not be installed for the host architecture.
bad_foreign_packages = frozenset((
    "flex-old",           # cannot execute /usr/bin/flex
    "icmake",             # cannot execute /usr/bin/icmake, build system
    "jam",                # cannot execute /usr/bin/jam, build system
    "libtool-bin",        # #836123
    "python2.7-minimal",  # fails postinst
    "python3.6-minimal",  # fails postinst
    "python3.7-minimal",  # fails postinst
    "python3.8-minimal",  # fails postinst
    "python3.9-minimal",  # fails postinst
    "python3.10-minimal", # fails postinst
    "python3.11-minimal", # fails postinst
    "python3.12-minimal", # fails postinst
    "swi-prolog-nox",     # fails postinst
    "xrdp",               # fails postinst
    "libgvc6",            # fails postinst
))


def strip_dict(dct, keepfields):
    """Remove all keys of dct that are not in keepfields, in place."""
    keys = set(dct.keys())
    keys.difference_update(keepfields)
    for k in keys:
        del dct[k]


def latest_versions(pkgs):
    """Keep only the highest version of each package.

    Negative entries (see make_source_list) shadow lower versions of the
    same package and are dropped from the result.
    """
    packages = {}
    for p in pkgs:
        name = p["Package"]
        try:
            if version_compare(packages[name]["Version"], p["Version"]) > 0:
                continue
        except KeyError:
            pass  # first occurrence of this package
        packages[name] = p
    return (p for p in packages.values() if "Negative-Entry" not in p)


def make_binary_list_build(mirror, buildarch, hostarch):
    """Yield stripped Packages entries for the build architecture."""
    for p in parse_deb822(mirror.fetch_binaries(buildarch)):
        if p["Package"].startswith("crossbuild-essential-"):
            if p["Package"] != "crossbuild-essential-" + hostarch:
                continue
            # Extend the matching crossbuild-essential package with
            # host-architecture development libraries.
            p["Depends"] += ", libc-dev:%s, libstdc++-dev:%s" % \
                (hostarch, hostarch)
        strip_dict(p, binfields)
        yield p


def make_binary_list_host(mirror, hostarch):
    """Yield host-architecture Packages entries usable during cross builds."""
    for p in parse_deb822(mirror.fetch_binaries(hostarch)):
        if p["Architecture"] == "all":
            continue
        if p.get("Multi-Arch") == "foreign":
            continue
        if p.get("Essential") == "yes":
            continue
        if p["Package"] in bad_foreign_packages:
            continue
        strip_dict(p, binfields)
        yield p


def make_binary_list(mirror, buildarch, hostarch):
    return itertools.chain(make_binary_list_build(mirror, buildarch,
                                                  hostarch),
                           make_binary_list_host(mirror, hostarch))


def make_source_list(mirror, hostarch):
    """Yield stripped Sources entries buildable on hostarch.

    Non-matching entries are emitted as negative dummies so that an older,
    architecture-matching version cannot win in latest_versions.
    """
    for p in parse_deb822(mirror.fetch_sources()):
        if p.get("Extra-Source-Only") == "yes":
            continue
        if any(arch_match(hostarch, pattern)
               for pattern in p["Architecture"].split()):
            strip_dict(p, srcfields)
            yield p
        else:
            # dummy entry preventing older matching versions
            yield {"Package": p["Package"],
                   "Version": p["Version"],
                   "Negative-Entry": "yes"}


def check_bdsat(mirror, buildarch, hostarch):
    """Run dose-builddebcheck for one architecture pair.

    @returns: a dict mapping source package names to (version, reason)
              pairs; reason is None when build dependencies are
              satisfiable and a short explanation string otherwise
    """
    cmd = [
        "--deb-native-arch=" + buildarch,
        "--deb-host-arch=" + hostarch,
        "--deb-drop-b-d-indep",
        "--deb-profiles=" + ",".join(PROFILES),
        "--successes",
        "--failures",
        "--explain",
        "--explain-minimal",
        "--deb-emulate-sbuild",
    ]

    result = {}
    with tempfile.NamedTemporaryFile("w", encoding="utf8") as bintmp, \
            tempfile.NamedTemporaryFile("w", encoding="utf8") as srctmp:
        for p in make_binary_list(mirror, buildarch, hostarch):
            bintmp.write(serialize_deb822(p))
        bintmp.flush()
        cmd.append(bintmp.name)

        for p in latest_versions(make_source_list(mirror, hostarch)):
            srctmp.write(serialize_deb822(p))
        srctmp.flush()
        cmd.append(srctmp.name)

        dose_result = call_dose_builddebcheck(cmd)
        next(dose_result)  # skip header
        for d in dose_result:
            reason = None
            if d["status"] != "ok":
                r = d["reasons"][0]
                if "missing" in r:
                    # Unsatisfiable dependency: record the bare package name.
                    reason = "missing %s" % \
                        r["missing"]["pkg"]["unsat-dependency"] \
                        .split()[0].split(":", 1)[0]
                elif "conflict" in r:
                    r = r["conflict"]["pkg1"]["unsat-conflict"]
                    # A versioned != conflict indicates a version skew.
                    reason = "skew " if ' (!= ' in r else "conflict "
                    reason += r.split()[0].split(':', 1)[0]
                else:
                    assert False
            result[d["package"]] = (d["version"], reason)
    return result


def update_depcheck(mirror, db, updatetime, buildarch, hostarch, state):
    """Store the depcheck state of one architecture pair in the database.

    Entries already present with unchanged results are left alone; stale
    rows are deleted and the remaining state inserted.
    """
    with contextlib.closing(db.cursor()) as cur:
        cur.execute("BEGIN;")
        cur.execute("""
            SELECT source, version, satisfiable, reason FROM depstate
                WHERE buildarch = ? AND hostarch = ?;""",
                    (buildarch, hostarch,))
        for source, version, satisfiable, reason in list(cur.fetchall()):
            if satisfiable == (reason is None) and \
                    state.get(source) == (version, reason):
                del state[source]  # unchanged -> nothing to write
            else:
                cur.execute("""
                    DELETE FROM depstate
                        WHERE source = ? AND version = ?
                            AND buildarch = ? AND hostarch = ?;""",
                            (source, version, buildarch, hostarch))
        cur.executemany("""
            INSERT INTO depstate
                (source, buildarch, hostarch, version, satisfiable, reason)
                VALUES (?, ?, ?, ?, ?, ?);""",
                        ((source, buildarch, hostarch, version,
                          reason is None, reason)
                         for source, (version, reason) in state.items()))
        cur.execute("""
            UPDATE depcheck
                SET releasetime = ?, updatetime = ?, giveback = 0
                WHERE buildarch = ? AND hostarch = ?""",
                    (mirror.releasetime, updatetime, buildarch, hostarch))
    db.commit()


def main_docheck(mirror, archpair):
    """Pool-friendly wrapper: return (buildarch, hostarch, state)."""
    return (*archpair, check_bdsat(mirror, *archpair))


class SequentialPool:
    """Sequential variant of multiprocessing.Pool for debugging."""

    def __enter__(self):
        return self

    def __exit__(self, *args):
        pass

    def close(self):
        pass

    def join(self):
        pass

    imap_unordered = map


def main():
    argp = argparse.ArgumentParser()
    argp.add_argument('-m', '--mirror',
                      default='http://deb.debian.org/debian',
                      help="debian mirror to use")
    argp.add_argument('-p', '--parallel', action="store_true",
                      help="enable parallel checking")
    args = argp.parse_args()
    mirror = DebianMirror(args.mirror)
    mirror.update_release()
    db = sqlite3.connect("db")
    cur = db.cursor()
    # Only recheck pairs whose stored state predates the current release.
    cur.execute("""
        SELECT buildarch, hostarch FROM depcheck
            WHERE releasetime < ?;""", (mirror.releasetime,))
    archpairs = set(cur.fetchall())
    if not archpairs:
        return
    print("checking %s" %
          ", ".join(sorted(map("%s -> %s".__mod__, archpairs))))
    now = datetime.datetime.utcnow().replace(microsecond=0)
    with multiprocessing.Pool() if args.parallel else SequentialPool() \
            as pool:
        docheck = functools.partial(main_docheck, mirror)
        try:
            for buildarch, hostarch, state in \
                    pool.imap_unordered(docheck, archpairs):
                print("update %s -> %s" % (buildarch, hostarch))
                update_depcheck(mirror, db, now, buildarch, hostarch, state)
        finally:
            pool.close()
            pool.join()


if __name__ == "__main__":
    main()