1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
#!/usr/bin/python3
"""This tool reads a yaml file as generated by importpkg.py on stdin and
updates the database with the contents."""
import argparse
import sqlite3
import sys
import typing
from debian.debian_support import version_compare
import yaml
def readyaml(db: sqlite3.Connection, stream: typing.TextIO) -> None:
    """Import one package description from a YAML stream into the database.

    The stream is parsed as a sequence of YAML documents:

    * The first document is a mapping with the keys "package", "version",
      "architecture", "source" and "depends" (an iterable of dependency
      names).
    * Every following document is a mapping with the keys "name", "size"
      and "hashes" (a mapping from hash function name to hash value) and
      describes one content entry of the package.
    * A document equal to the string "commit" terminates the import and
      commits the transaction. Documents after it are not read.

    If the database already holds the package in a version that compares
    strictly greater than the incoming one (according to version_compare),
    the function returns without modifying anything. Otherwise the package
    row is inserted or updated and its content and dependency rows are
    replaced.

    All modifications happen in a single transaction. If anything fails
    after the transaction was started, it is rolled back before the
    exception propagates, so the connection is never left with a partial
    import pending.

    Raises:
        ValueError: the stream ended without a "commit" document.
        KeyError: a required key is missing or a hash function name is
            not present in the function table.
        StopIteration: the stream contains no document at all.
    """
    cur = db.cursor()
    # Issued before BEGIN on purpose: SQLite treats this pragma as a no-op
    # while a transaction is open.
    cur.execute("PRAGMA foreign_keys = ON;")
    gen = yaml.load_all(stream, yaml.CSafeLoader)
    metadata = next(gen)
    package = metadata["package"]
    cur.execute("SELECT id, version FROM package WHERE name = ?;",
                (package,))
    row = cur.fetchone()
    if row:
        pid, version = row
        # Never replace a newer version with an older one.
        if version_compare(version, metadata["version"]) > 0:
            return
    else:
        pid = None

    # BEGIN stays outside the try block: if it fails, no transaction of
    # ours exists and a rollback could discard work owned by the caller.
    cur.execute("BEGIN;")
    try:
        # Map hash function names to their ids once, up front.
        cur.execute("SELECT name, id FROM function;")
        funcmapping = dict(cur.fetchall())
        if pid is not None:
            # Known package: drop its old contents and dependencies.
            # NOTE(review): hash rows are not deleted explicitly here;
            # presumably the schema cascades from content - confirm.
            cur.execute("DELETE FROM content WHERE pid = ?;", (pid,))
            cur.execute("DELETE FROM dependency WHERE pid = ?;", (pid,))
            cur.execute("UPDATE package SET version = ?, architecture = ?, source = ? WHERE id = ?;",
                        (metadata["version"], metadata["architecture"], metadata["source"], pid))
        else:
            cur.execute("INSERT INTO package (name, version, architecture, source) VALUES (?, ?, ?, ?);",
                        (package, metadata["version"], metadata["architecture"],
                         metadata["source"]))
            pid = cur.lastrowid
        cur.executemany("INSERT INTO dependency (pid, required) VALUES (?, ?);",
                        ((pid, dep) for dep in metadata["depends"]))
        for entry in gen:
            if entry == "commit":
                db.commit()
                return

            cur.execute("INSERT INTO content (pid, filename, size) VALUES (?, ?, ?);",
                        (pid, entry["name"], entry["size"]))
            cid = cur.lastrowid
            cur.executemany("INSERT INTO hash (cid, fid, hash) VALUES (?, ?, ?);",
                            ((cid, funcmapping[func], hexhash)
                             for func, hexhash in entry["hashes"].items()))
        # The stream ended without the terminating "commit" document, so
        # the input is considered truncated.
        raise ValueError("missing commit block")
    except BaseException:
        # Undo the partial import (also on KeyboardInterrupt) and let the
        # original exception propagate unchanged.
        db.rollback()
        raise
def main() -> None:
    """Parse the command line and import the YAML document stream on stdin.

    The database file is selected with -d/--database and defaults to
    "test.sqlite3". The connection is closed when the import finishes,
    whether it succeeded or raised.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-d", "--database", action="store",
                        default="test.sqlite3",
                        help="path to the sqlite3 database file")
    args = parser.parse_args()
    db = sqlite3.connect(args.database)
    try:
        readyaml(db, sys.stdin)
    finally:
        # Release the database handle even when the import fails.
        db.close()


# Run the importer only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|