summaryrefslogtreecommitdiff
path: root/mdbp/common.py
diff options
context:
space:
mode:
authorHelmut Grohne <helmut@subdivi.de>2021-04-18 14:42:27 +0200
committerHelmut Grohne <helmut@subdivi.de>2021-04-18 14:42:27 +0200
commitcf999acb17c8123ddee407d0e486ca3b275a5d7c (patch)
treebfe9307dc9d2dd49fd46111bab0e3fbe324d6687 /mdbp/common.py
downloadmdbp-cf999acb17c8123ddee407d0e486ca3b275a5d7c.tar.gz
initial checkin of mdbp
Proof-of-concept status. Some things work.
Diffstat (limited to 'mdbp/common.py')
-rw-r--r--mdbp/common.py135
1 files changed, 135 insertions, 0 deletions
diff --git a/mdbp/common.py b/mdbp/common.py
new file mode 100644
index 0000000..d0d2f2d
--- /dev/null
+++ b/mdbp/common.py
@@ -0,0 +1,135 @@
+# SPDX-License-Identifier: MIT
+"""Common functions used by multiple backends"""
+
+from __future__ import annotations
+import argparse
+import contextlib
+import hashlib
+import importlib.resources
+import json
+import pathlib
+import tempfile
+import typing
+import urllib.parse
+
+import debian.deb822
+import requests
+
+try:
+ import jsonschema
+except ImportError:
+ jsonschema = None
+
+def json_load(filecontextmanager:
+ typing.ContextManager[typing.IO[typing.AnyStr]]) -> typing.Any:
+ """Load the json context from a file context manager."""
+ with filecontextmanager as fileobj:
+ return json.load(fileobj)
+
+JsonObject = typing.Dict[str, typing.Any]
+
+def buildjson(filename: str) -> JsonObject:
+ """Type constructor for argparse validating a build json file path and
+ returning the parsed json object."""
+ buildobj = json_load(argparse.FileType("r")(filename))
+ if jsonschema:
+ jsonschema.validate(
+ buildobj,
+ json_load(
+ importlib.resources.open_text("mdbp", "build_schema.json")))
+ assert isinstance(buildobj, dict)
+ return buildobj
+
+def compute_env(build: JsonObject) -> typing.Dict[str, str]:
+ """Compute the process environment from the build object."""
+ env = dict(PATH="/usr/bin:/bin")
+ env.update(build.get("environment", {}))
+ if build.get("options"):
+ env["DEB_BUILD_OPTIONS"] = " ".join(build["options"])
+ return env
+
+class HashSumMismatch(Exception):
+ """Raised from `hash_check` when validation fails."""
+
+# pylint does not grok from __future__ import annotations yet
+# pylint: disable=E1101,W0212
+def hash_check(iterable: typing.Iterable[bytes], hashobj: hashlib._Hash,
+ expected_digest: str) -> \
+ typing.Iterator[bytes]:
+ """Wraps an iterable that yields bytes. It doesn't modify the sequence,
+ but on the final element it verifies that the concatenation of bytes
+ yields an expected digest value. Upon failure, the final next() results in
+ a HashSumMismatch rather than StopIteration.
+ """
+ for data in iterable:
+ hashobj.update(data)
+ yield data
+ if hashobj.hexdigest() != expected_digest:
+ raise HashSumMismatch()
+
+def download(uri: str, checksums: typing.Dict[str, str],
+ dest: pathlib.Path) -> None:
+ """Download the given uri and save it as the given dest path provided that
+ the given checksums match. When checksums do not match, raise a
+ HashSumMismatch.
+ """
+ with requests.get(uri, stream=True) as resp:
+ resp.raise_for_status()
+ iterable = resp.iter_content(None)
+ for algo, csum in checksums.items():
+ iterable = hash_check(iterable, hashlib.new(algo), csum)
+ try:
+ with dest.open("wb") as out:
+ for chunk in iterable:
+ out.write(chunk)
+ except HashSumMismatch:
+ dest.unlink()
+ raise
+
+@contextlib.contextmanager
+def get_dsc(build: JsonObject) -> typing.Iterator[pathlib.Path]:
+ """A context manager that provides a path pointing at the .dsc file for the
+ duration of the context. If the .dsc is supplied as a path, it simply is
+ returned. If it is supplied as a uri, it and the referred components are
+ downloaded to a temporary location.
+ """
+ try:
+ dscpath = build["input"]["dscpath"]
+ except KeyError:
+ dscuri = build["input"]["dscuri"]
+ with tempfile.TemporaryDirectory() as tdirname:
+ tdir = pathlib.Path(tdirname)
+ dscpath = tdir / dscuri.split("/")[-1]
+ download(dscuri, build["input"].get("checksums", {}), dscpath)
+ files: typing.Dict[str, typing.Dict[str, str]] = {}
+ with dscpath.open("r") as dscf:
+ for key, value in debian.deb822.Dsc(dscf).items():
+ if key.lower().startswith("checksums-"):
+ for entry in value:
+ algo = key[10:].lower()
+ files.setdefault(entry["name"], dict())[algo] = \
+ entry[algo]
+ for name, checksums in files.items():
+ download(urllib.parse.urljoin(dscuri, name), checksums,
+ tdir / name)
+ yield dscpath
+ else:
+ yield pathlib.Path(dscpath)
+
+def get_dsc_files(dscpath: pathlib.Path) -> typing.List[pathlib.Path]:
+ """Get the component names referenced by the .dsc file."""
+ with dscpath.open("r") as dscf:
+ dsc = debian.deb822.Dsc(dscf)
+ return [dscpath.parent / item["name"] for item in dsc["Files"]]
+
+def make_option(optname: str, value: typing.Optional[str]) -> typing.List[str]:
+ """Construct a valued option if a value is given."""
+ if not value:
+ return []
+ if optname.endswith("="):
+ return [optname + value]
+ return [optname, value]
+
+def profile_option(build: JsonObject, optname: str) -> typing.List[str]:
+ """Construct the option for specifying build profiles if required."""
+ return make_option(optname, ",".join(build.get("profiles", ())))