diff options
author | Helmut Grohne <helmut@subdivi.de> | 2021-04-18 14:42:27 +0200 |
---|---|---|
committer | Helmut Grohne <helmut@subdivi.de> | 2021-04-18 14:42:27 +0200 |
commit | cf999acb17c8123ddee407d0e486ca3b275a5d7c (patch) | |
tree | bfe9307dc9d2dd49fd46111bab0e3fbe324d6687 /mdbp/common.py | |
download | mdbp-cf999acb17c8123ddee407d0e486ca3b275a5d7c.tar.gz |
initial checkin of mdbp
Proof-of-concept status. Some things work.
Diffstat (limited to 'mdbp/common.py')
-rw-r--r-- | mdbp/common.py | 135 |
1 files changed, 135 insertions, 0 deletions
diff --git a/mdbp/common.py b/mdbp/common.py new file mode 100644 index 0000000..d0d2f2d --- /dev/null +++ b/mdbp/common.py @@ -0,0 +1,135 @@ +# SPDX-License-Identifier: MIT +"""Common functions used by multiple backends""" + +from __future__ import annotations +import argparse +import contextlib +import hashlib +import importlib.resources +import json +import pathlib +import tempfile +import typing +import urllib.parse + +import debian.deb822 +import requests + +try: + import jsonschema +except ImportError: + jsonschema = None + +def json_load(filecontextmanager: + typing.ContextManager[typing.IO[typing.AnyStr]]) -> typing.Any: + """Load the json context from a file context manager.""" + with filecontextmanager as fileobj: + return json.load(fileobj) + +JsonObject = typing.Dict[str, typing.Any] + +def buildjson(filename: str) -> JsonObject: + """Type constructor for argparse validating a build json file path and + returning the parsed json object.""" + buildobj = json_load(argparse.FileType("r")(filename)) + if jsonschema: + jsonschema.validate( + buildobj, + json_load( + importlib.resources.open_text("mdbp", "build_schema.json"))) + assert isinstance(buildobj, dict) + return buildobj + +def compute_env(build: JsonObject) -> typing.Dict[str, str]: + """Compute the process environment from the build object.""" + env = dict(PATH="/usr/bin:/bin") + env.update(build.get("environment", {})) + if build.get("options"): + env["DEB_BUILD_OPTIONS"] = " ".join(build["options"]) + return env + +class HashSumMismatch(Exception): + """Raised from `hash_check` when validation fails.""" + +# pylint does not grok from __future__ import annotations yet +# pylint: disable=E1101,W0212 +def hash_check(iterable: typing.Iterable[bytes], hashobj: hashlib._Hash, + expected_digest: str) -> \ + typing.Iterator[bytes]: + """Wraps an iterable that yields bytes. It doesn't modify the sequence, + but on the final element it verifies that the concatenation of bytes + yields an expected digest value. Upon failure, the final next() results in + a HashSumMismatch rather than StopIteration. + """ + for data in iterable: + hashobj.update(data) + yield data + if hashobj.hexdigest() != expected_digest: + raise HashSumMismatch() + +def download(uri: str, checksums: typing.Dict[str, str], + dest: pathlib.Path) -> None: + """Download the given uri and save it as the given dest path provided that + the given checksums match. When checksums do not match, raise a + HashSumMismatch. + """ + with requests.get(uri, stream=True) as resp: + resp.raise_for_status() + iterable = resp.iter_content(None) + for algo, csum in checksums.items(): + iterable = hash_check(iterable, hashlib.new(algo), csum) + try: + with dest.open("wb") as out: + for chunk in iterable: + out.write(chunk) + except HashSumMismatch: + dest.unlink() + raise + +@contextlib.contextmanager +def get_dsc(build: JsonObject) -> typing.Iterator[pathlib.Path]: + """A context manager that provides a path pointing at the .dsc file for the + duration of the context. If the .dsc is supplied as a path, it simply is + returned. If it is supplied as a uri, it and the referred components are + downloaded to a temporary location. + """ + try: + dscpath = build["input"]["dscpath"] + except KeyError: + dscuri = build["input"]["dscuri"] + with tempfile.TemporaryDirectory() as tdirname: + tdir = pathlib.Path(tdirname) + dscpath = tdir / dscuri.split("/")[-1] + download(dscuri, build["input"].get("checksums", {}), dscpath) + files: typing.Dict[str, typing.Dict[str, str]] = {} + with dscpath.open("r") as dscf: + for key, value in debian.deb822.Dsc(dscf).items(): + if key.lower().startswith("checksums-"): + for entry in value: + algo = key[10:].lower() + files.setdefault(entry["name"], dict())[algo] = \ + entry[algo] + for name, checksums in files.items(): + download(urllib.parse.urljoin(dscuri, name), checksums, + tdir / name) + yield dscpath + else: + yield pathlib.Path(dscpath) + +def get_dsc_files(dscpath: pathlib.Path) -> typing.List[pathlib.Path]: + """Get the component names referenced by the .dsc file.""" + with dscpath.open("r") as dscf: + dsc = debian.deb822.Dsc(dscf) + return [dscpath.parent / item["name"] for item in dsc["Files"]] + +def make_option(optname: str, value: typing.Optional[str]) -> typing.List[str]: + """Construct a valued option if a value is given.""" + if not value: + return [] + if optname.endswith("="): + return [optname + value] + return [optname, value] + +def profile_option(build: JsonObject, optname: str) -> typing.List[str]: + """Construct the option for specifying build profiles if required.""" + return make_option(optname, ",".join(build.get("profiles", ()))) |