#!/usr/bin/python3
"""This tool reads a Debian package from stdin and emits a yaml stream on
stdout.  It does not access a database. Therefore it can be run in parallel and
on multiple machines. The generated yaml contains multiple documents. The first
document contains package metadata. Then a document is emitted for each file.
And finally a document consisting of the string "commit" is emitted."""

import argparse
import hashlib
import sys
import tarfile
import urllib.request
import zlib

import debian.deb822
import yaml

from dedup.debpkg import DebExtractor, get_tar_hashes
from dedup.hashing import (
    DecompressedHash,
    HashBlacklistContent,
    HashedStream,
    HashlibLike,
    SuppressingHash,
)
from dedup.compression import GzipDecompressor
from dedup.image import GIFHash, PNGHash

# Contents handed to HashBlacklistContent below: the empty file and a file
# consisting of a single newline.
# NOTE(review): presumably HashBlacklistContent withholds the hash for these
# contents -- confirm against dedup.hashing.
boring_content = {b"", b"\n"}


def sha512_nontrivial() -> HashlibLike:
    """Return a SHA-512 hasher wrapped in HashBlacklistContent with
    boring_content as the blacklist."""
    return HashBlacklistContent(hashlib.sha512(), boring_content)


def gziphash() -> HashlibLike:
    """Return a hasher that feeds data through a GzipDecompressor into
    SHA-512 under the name "gzip_sha512".

    The decompressing hasher is wrapped in a SuppressingHash for ValueError
    and zlib.error and then in a HashBlacklistContent using boring_content.
    """
    decompressed = DecompressedHash(
        GzipDecompressor(), hashlib.sha512(), "gzip_sha512"
    )
    suppressed = SuppressingHash(decompressed, (ValueError, zlib.error))
    return HashBlacklistContent(suppressed, boring_content)


def pnghash() -> HashlibLike:
    """Return a SHA-512 based PNGHash wrapped in a SuppressingHash for
    ValueError."""
    return SuppressingHash(PNGHash(hashlib.sha512()), (ValueError,))


def gifhash() -> HashlibLike:
    """Return a SHA-512 based GIFHash wrapped in a SuppressingHash for
    ValueError."""
    return SuppressingHash(GIFHash(hashlib.sha512()), (ValueError,))


class ProcessingFinished(Exception):
    """Raised by ImportpkgExtractor.handle_data_tar once every member of the
    data tar has been reported; main() treats it as the success signal."""


class ImportpkgExtractor(DebExtractor):
    """DebExtractor that reports package metadata and per-file hashes to a
    callback.

    The callback is invoked with one dict for the control information and
    with one dict per file yielded by get_tar_hashes for the data tar.
    """

    # Factories for the hashers passed to get_tar_hashes.
    hash_functions = [sha512_nontrivial, gziphash, pnghash, gifhash]

    def __init__(self, callback) -> None:
        """Store *callback*, a callable that accepts a single dict."""
        super().__init__()
        self.callback = callback

    def handle_control_info(self, info: debian.deb822.Packages) -> None:
        """Report package name, source, version, architecture and simple
        dependencies from *info* to the callback."""
        try:
            # Only the first whitespace-separated token of the Source field
            # is used as the source name.
            source = info["source"].split()[0]
        except KeyError:
            # No Source field: fall back to the binary package name.
            source = info["package"]
        # deb822 currently returns :any dependencies raw. see #670679
        deprelations = (
            info.relations.get("depends", [])
            + info.relations.get("pre-depends", [])
        )
        # Keep only relations with exactly one alternative and strip any
        # ":qualifier" suffix from the package name.
        depends = {
            dep[0]["name"].split(":", 1)[0]
            for dep in deprelations
            if len(dep) == 1
        }
        self.callback({
            "package": info["package"],
            "source": source,
            "version": info["version"],
            "architecture": info["architecture"],
            "depends": depends,
        })

    def handle_data_tar(self, tarfileobj: tarfile.TarFile) -> None:
        """Report name, size and hashes of each entry yielded by
        get_tar_hashes to the callback.

        Raises:
            ProcessingFinished: always, after the last entry was handled.
        """
        for name, size, hashes in get_tar_hashes(
            tarfileobj, self.hash_functions
        ):
            try:
                name.encode("utf8", "strict")
            except UnicodeEncodeError:
                # stdout carries the yaml stream, so the diagnostic must go
                # to stderr to keep the output parseable.
                print(
                    "warning: skipping filename with encoding error",
                    file=sys.stderr,
                )
                continue  # skip files with non-utf8 encoding for now
            self.callback({"name": name, "size": size, "hashes": hashes})
        raise ProcessingFinished()


def main() -> None:
    """Parse the command line, process one package and write the yaml stream
    to stdout.

    Raises:
        RuntimeError: if the extractor returns without raising
            ProcessingFinished.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-H",
        "--hash",
        action="store",
        help="verify that stdin has the given sha256 hash",
    )
    # argparse applies ``type`` to string values only, so the binary stdin
    # default is used as is while a given location is opened with urlopen.
    parser.add_argument(
        "input",
        nargs="?",
        default=sys.stdin.buffer,
        type=urllib.request.urlopen,
        help="read from this location instead of stdin",
    )
    args = parser.parse_args()

    dumper = yaml.CSafeDumper(sys.stdout)
    dumper.open()
    if args.hash:
        # Wrap the input so that it is hashed with SHA-256 while being read.
        args.input = HashedStream(args.input, hashlib.sha256())
    try:
        ImportpkgExtractor(dumper.represent).process(args.input)
    except ProcessingFinished:
        pass
    else:
        # handle_data_tar never ran to completion.
        raise RuntimeError("unexpected termination of extractor")
    if args.hash:
        args.input.validate(args.hash)
    # The trailing "commit" document is only written after validation.
    dumper.represent("commit")
    dumper.close()


if __name__ == "__main__":
    main()