1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
|
# SPDX-License-Identifier: MIT
"""Common functions used by multiple backends"""
import argparse
import contextlib
import fnmatch
import hashlib
import importlib.resources
import json
import multiprocessing
import pathlib
import shlex
import tarfile
import tempfile
import typing
import urllib.parse
import debian.deb822
import debian.debian_support
import requests
# jsonschema is an optional dependency: when it is not installed,
# buildjson_validate silently skips schema validation.
try:
    import jsonschema
    HAVE_JSONSCHEMA = True
except ImportError:
    HAVE_JSONSCHEMA = False
# hashlib._Hash only exists for type checkers; at runtime fall back to Any.
if typing.TYPE_CHECKING:
    # pylint: disable=E1101,W0212
    Hash = hashlib._Hash
else:
    Hash = typing.Any
def json_load(filecontextmanager:
              typing.ContextManager[typing.IO[typing.AnyStr]]) -> typing.Any:
    """Parse and return the json content of a file context manager,
    closing the underlying file object afterwards."""
    with filecontextmanager as fp:
        return json.load(fp)
JsonObject = typing.Dict[str, typing.Any]
def buildjson_validate(buildobj: JsonObject) -> None:
    """Validate the given build json object against the schema.

    Validation is best-effort: when the optional jsonschema module is
    unavailable, the object is accepted without checks.
    """
    if not HAVE_JSONSCHEMA:
        return
    schema = json_load(
        importlib.resources.open_text("mdbp", "build_schema.json"))
    jsonschema.validate(buildobj, schema)
def buildjson_patch_relative(buildobj: JsonObject,
                             basedir: pathlib.PurePath) -> None:
    """Resolve relative paths used in the buildobj using the given basedir:
    * .input.source_package_path
    * .output.directory
    The operation is performed in-place and modifes the given buildobj.
    """
    for *parents, leaf in (("input", "source_package_path"),
                           ("output", "directory")):
        container = buildobj
        try:
            for part in parents:
                container = container[part]
            # Missing leaves are skipped just like missing parents.
            container[leaf] = str(basedir / pathlib.Path(container[leaf]))
        except KeyError:
            continue
def buildjson(filename: str) -> JsonObject:
    """Type constructor for argparse: validate the given build json file
    path and return the parsed json object with relative paths resolved."""
    parsed = json_load(argparse.FileType("r")(filename))
    buildjson_validate(parsed)
    buildjson_patch_relative(parsed, pathlib.Path(filename).parent)
    assert isinstance(parsed, dict)
    return parsed
def compute_env(build: JsonObject) -> typing.Dict[str, str]:
    """Compute the process environment from the build object.

    Returns a fresh dict containing a minimal PATH, the .environment
    entries, and DEB_BUILD_OPTIONS assembled from .build_options plus a
    parallel=N option derived from .parallel ("auto" uses the CPU count).
    The given build object is not modified.
    """
    env = dict(PATH="/usr/bin:/bin")
    env.update(build.get("environment", {}))
    parallel = build.get("parallel")
    if parallel == "auto":
        parallel = "%d" % multiprocessing.cpu_count()
    # Copy the list: appending to build.get("build_options", []) directly
    # mutated the caller's build object whenever the key was present.
    options = list(build.get("build_options", []))
    if parallel:
        options.append("parallel=" + str(parallel))
    if options:
        env["DEB_BUILD_OPTIONS"] = " ".join(options)
    return env
class HashSumMismatch(Exception):
    """Raised by `hash_check` when the computed digest does not match the
    expected one."""
def hash_check(iterable: typing.Iterable[bytes], hashobj: Hash,
               expected_digest: str) -> \
        typing.Iterator[bytes]:
    """Wraps an iterable that yields bytes. It doesn't modify the sequence,
    but on the final element it verifies that the concatenation of bytes
    yields an expected digest value. Upon failure, the final next() results in
    a HashSumMismatch rather than StopIteration.
    """
    for chunk in iterable:
        hashobj.update(chunk)
        yield chunk
    actual_digest = hashobj.hexdigest()
    if actual_digest != expected_digest:
        raise HashSumMismatch()
def download(uri: str, checksums: typing.Dict[str, str],
             dest: pathlib.Path) -> None:
    """Download the given uri and save it as the given dest path provided that
    the given checksums match. When checksums do not match, raise a
    HashSumMismatch.
    """
    with requests.get(uri, stream=True) as resp:
        resp.raise_for_status()
        # Stack one hash_check wrapper per requested checksum algorithm.
        stream: typing.Iterable[bytes] = resp.iter_content(None)
        for algorithm, digest in checksums.items():
            stream = hash_check(stream, hashlib.new(algorithm), digest)
        try:
            with dest.open("wb") as sink:
                for block in stream:
                    sink.write(block)
        except HashSumMismatch:
            # Do not leave a corrupt download behind.
            dest.unlink()
            raise
def parse_dsc(dscpath: pathlib.Path) -> debian.deb822.Dsc:
    """Read the given .dsc file and return it parsed as a Dsc object."""
    with dscpath.open("r") as handle:
        return debian.deb822.Dsc(handle)
def download_dsc(buildinput: JsonObject,
                 destdir: pathlib.Path) -> pathlib.Path:
    """Download the .input.source_package_url including referenced components
    to the given destination directory and return the path to the contained
    .dsc file.
    """
    dscuri = buildinput["source_package_url"]
    dscpath = destdir / dscuri.split("/")[-1]
    # mypy doesn't grok this:
    assert isinstance(dscpath, pathlib.Path)
    download(dscuri, buildinput.get("checksums", {}), dscpath)
    # Collect per-component checksums from all Checksums-* fields.
    component_sums: typing.Dict[str, typing.Dict[str, str]] = {}
    for field, entries in parse_dsc(dscpath).items():
        if not field.lower().startswith("checksums-"):
            continue
        algo = field[10:].lower()
        for entry in entries:
            component_sums.setdefault(entry["name"], dict())[algo] = \
                entry[algo]
    for name, sums in component_sums.items():
        download(urllib.parse.urljoin(dscuri, name), sums, destdir / name)
    return dscpath
@contextlib.contextmanager
def get_dsc(build: JsonObject) -> typing.Iterator[pathlib.Path]:
    """A context manager that provides a path pointing at the .dsc file for the
    duration of the context. If the .dsc is supplied as a path, it simply is
    returned. If it is supplied as a uri, it and the referred components are
    downloaded to a temporary location.
    """
    if "source_package_path" in build.get("input", {}):
        yield pathlib.Path(build["input"]["source_package_path"])
    else:
        with tempfile.TemporaryDirectory() as workdir:
            yield download_dsc(build["input"], pathlib.Path(workdir))
def get_dsc_files(dscpath: pathlib.Path,
                  dscobj: typing.Optional[debian.deb822.Dsc] = None) -> \
        typing.List[pathlib.Path]:
    """Get the component names referenced by the .dsc, resolved relative to
    the .dsc's own directory. The .dsc is parsed unless dscobj is given."""
    source = dscobj or parse_dsc(dscpath)
    base = dscpath.parent
    return [base / item["name"] for item in source["Files"]]
def build_subdir(source: str, version: str) -> str:
    """Compute the subdirectory that dpkg-source normally extracts to."""
    upstream = debian.debian_support.BaseVersion(version).upstream_version
    return "%s-%s" % (source, upstream)
def make_option(optname: str, value: typing.Optional[str]) -> typing.List[str]:
    """Construct a valued option if a value is given.

    A trailing "=" on optname glues the value onto the option; otherwise
    option and value become separate arguments. Empty/None values yield []."""
    if value:
        if optname.endswith("="):
            return [optname + value]
        return [optname, value]
    return []
def profile_option(build: JsonObject, optname: str) -> typing.List[str]:
    """Construct the option for specifying build profiles if required."""
    profiles = ",".join(build.get("build_profiles", ()))
    return make_option(optname, profiles)
def tar_add(tarobj: tarfile.TarFile, path: pathlib.Path) -> None:
    """Add the given file as its basename to the tarobj retaining its
    modification time, but no mode or ownership information.
    """
    statres = path.stat()
    member = tarfile.TarInfo(path.name)
    member.size = statres.st_size
    member.mtime = int(statres.st_mtime)
    with path.open("rb") as fobj:
        tarobj.addfile(member, fobj)
def clean_dir(directory: pathlib.Path, patterns: typing.List[str]) -> None:
    """Delete all entries of `directory` that match none of the given
    `patterns`."""
    for child in directory.iterdir():
        matched = any(fnmatch.fnmatchcase(child.name, pat)
                      for pat in patterns)
        if not matched:
            child.unlink()
@contextlib.contextmanager
def temporary_static_file(content: str) -> typing.Iterator[pathlib.Path]:
    """Write the given content to a named temporary file and yield its path;
    the file is deleted when the context exits."""
    with tempfile.NamedTemporaryFile("w") as handle:
        handle.write(content)
        # Make the content visible to readers of the path before yielding.
        handle.flush()
        yield pathlib.Path(handle.name)
class AddSpaceSeparatedValues(argparse.Action):
    """argparse action that splits the passed value on whitespace and
    extends the destination list with the resulting parts."""

    def __call__(self, parser: argparse.ArgumentParser,
                 namespace: argparse.Namespace,
                 values: typing.Union[str, typing.Sequence[typing.Any], None],
                 option_string: typing.Optional[str] = None) -> None:
        assert isinstance(values, str)
        destination = getattr(namespace, self.dest)
        destination.extend(values.split())
def hook_commands(hook: typing.Dict[str, str], sourcetreedir: str) -> \
        typing.Iterator[str]:
    """Generate a sequence of shell commands to run the given hook object. The
    hook object is described in build_schema.yaml. The sourcetreedir parameter
    specifies the location of the source tree. Its value is assumed to be
    properly shell quoted such that variables and globs can be used."""
    run_as = hook.get("user", "root")
    workdir = hook.get("cwd", "root")
    # Entering the source tree is only needed to resolve the owning user
    # or when the hook wants to run there.
    if not (run_as == "root" and workdir == "root"):
        yield "cd " + sourcetreedir
    if run_as != "root":
        yield "BUILD_USER=$(stat -c %U .)"
    if workdir == "root":
        yield "cd /"
    if run_as == "root":
        yield hook["command"]
    else:
        yield 'exec runuser -c %s "$BUILD_USER"' % \
            shlex.quote(hook["command"])
|