1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
|
#!/usr/bin/python
"""This script takes directories, individual .deb files, or HTTP base URLs of
Debian mirrors and imports all packages they contain. It makes rather strong
assumptions about the working directory (for instance, it runs importpkg.py
from there).
"""
import gzip
import errno
import io
import multiprocessing
import optparse
import os
import subprocess
import tempfile
import urllib
import concurrent.futures
from debian import deb822
from debian.debian_support import version_compare
import sqlalchemy
from dedup.utils import configure_database_engine
from readyaml import readyaml
def process_http(pkgs, url):
    """Merge the sid/amd64 package index of an HTTP mirror into *pkgs*.

    Downloads ``dists/sid/main/binary-amd64/Packages.gz`` below *url*,
    decompresses it and records every package stanza in *pkgs*, unless
    *pkgs* already holds a strictly newer version of that package.

    :param pkgs: dict mapping package name to a dict with the keys
        ``version``, ``filename`` (URL of the .deb below *url*) and
        ``sha256hash``; updated in place.
    :param url: base URL of the mirror; presumably given without a trailing
        slash, since ``/`` is appended when building URLs.
    """
    response = urllib.urlopen(url + "/dists/sid/main/binary-amd64/Packages.gz")
    try:
        compressed = response.read()
    finally:
        # Release the HTTP connection explicitly instead of relying on GC.
        response.close()
    index = gzip.GzipFile(fileobj=io.BytesIO(compressed)).read()
    for pkg in deb822.Packages.iter_paragraphs(io.BytesIO(index)):
        name = pkg["Package"]
        version = pkg["Version"]
        # An already-recorded entry wins only if it is strictly newer.
        if name in pkgs and version_compare(pkgs[name]["version"], version) > 0:
            continue
        pkgs[name] = dict(version=version,
                          filename="%s/%s" % (url, pkg["Filename"]),
                          sha256hash=pkg["SHA256"])


def process_file(pkgs, filename):
    """Record a ``name_version_arch.deb`` file in *pkgs*.

    Name and version are taken from the file's base name. The version field
    is URL-unquoted (presumably so that an epoch written as ``%3a`` becomes
    ``:``). The entry is stored unless *pkgs* already holds a strictly newer
    version of the same package.

    :param pkgs: dict mapping package name to ``dict(version=..., filename=...)``;
        updated in place.
    :param filename: path of the .deb file; stored verbatim.
    :raises ValueError: if the base name does not end in ``.deb`` or does not
        consist of exactly three ``_``-separated fields.
    """
    basename = os.path.basename(filename)
    if not basename.endswith(".deb"):
        raise ValueError("filename does not end in .deb")
    fields = basename.split("_")
    if len(fields) != 3:
        raise ValueError("filename not in form name_version_arch.deb")
    name = fields[0]
    version = urllib.unquote(fields[1])
    # Overwrite unless the existing entry is strictly newer.
    if name not in pkgs or version_compare(pkgs[name]["version"], version) <= 0:
        pkgs[name] = dict(version=version, filename=filename)


def process_dir(pkgs, d):
    """Record every entry directly inside directory *d* via process_file.

    The listing is not recursive. Entries for which process_file raises
    ValueError (names that are not ``name_version_arch.deb``) are skipped.
    """
    candidates = [os.path.join(d, entry) for entry in os.listdir(d)]
    for path in candidates:
        try:
            process_file(pkgs, path)
        except ValueError:
            # Not a parsable .deb file name -- ignore it.
            continue


def process_pkg(name, pkgdict, outpath):
    """Run importpkg.py on one package and write its output to *outpath*.

    If ``pkgdict["filename"]`` starts with ``http://``, the package is
    streamed into importpkg.py through ``curl``. Otherwise it is treated as a
    local file path and fed on stdin. A ``sha256hash`` entry, when present,
    is passed to importpkg.py via ``-H``.

    :param name: package name, used only for progress output.
    :param pkgdict: dict with a ``filename`` key and optionally ``sha256hash``.
    :param outpath: file that receives importpkg.py's standard output.
    :raises ValueError: if importpkg.py or curl exits unsuccessfully (HTTP case).
    :raises subprocess.CalledProcessError: if importpkg.py fails (local case).
    """
    filename = pkgdict["filename"]
    print("importing %s" % filename)
    importcmd = ["python", "importpkg.py"]
    if "sha256hash" in pkgdict:
        importcmd.extend(["-H", pkgdict["sha256hash"]])
    if filename.startswith("http://"):
        with open(outpath, "w") as outp:
            dl = subprocess.Popen(["curl", "-s", filename],
                                  stdout=subprocess.PIPE, close_fds=True)
            try:
                imp = subprocess.Popen(importcmd, stdin=dl.stdout,
                                       stdout=outp, close_fds=True)
            finally:
                # Drop our copy of the pipe's read end. Otherwise, if
                # importpkg.py exits early, curl blocks forever on a full
                # pipe instead of receiving SIGPIPE.
                dl.stdout.close()
            imp_status = imp.wait()
            # Always reap curl, even when importpkg.py failed, so no stray
            # child process is left behind.
            dl_status = dl.wait()
            # importpkg.py is checked first: when it fails, curl typically
            # fails too as a consequence of the closed pipe.
            if imp_status:
                raise ValueError("importpkg failed")
            if dl_status:
                raise ValueError("curl failed")
    else:
        # Only the file descriptor is handed to the child; binary mode
        # reflects that a .deb is not text.
        with open(filename, "rb") as inp:
            with open(outpath, "w") as outp:
                subprocess.check_call(importcmd, stdin=inp, stdout=outp,
                                      close_fds=True)
    print("preprocessed %s" % name)


def _collect_packages(sources):
    """Return a dict mapping package name to package info for all *sources*.

    Each source is an ``http://`` mirror base URL, a directory of .deb
    files, or a single .deb file.
    """
    pkgs = {}
    for source in sources:
        print("processing %s" % source)
        if source.startswith("http://"):
            process_http(pkgs, source)
        elif os.path.isdir(source):
            process_dir(pkgs, source)
        else:
            process_file(pkgs, source)
    return pkgs


def _read_known_packages(db):
    """Return a dict mapping each package name in *db* to its stored version."""
    with db.begin() as conn:
        cur = conn.execute(
            sqlalchemy.text("SELECT name, version FROM package;"))
        known = dict((row[0], row[1]) for row in cur.fetchall())
    return known


def _import_packages(db, pkgs, tmpdir):
    """Preprocess *pkgs* in parallel and load each result into *db*.

    Every package is run through process_pkg on a thread pool sized to the
    CPU count. Each package writes to a file named after it in *tmpdir*, and
    that file is then fed to readyaml in its own transaction. Failures are
    reported and skipped. Only successfully loaded files are deleted.
    """
    workers = multiprocessing.cpu_count()
    with concurrent.futures.ThreadPoolExecutor(workers) as executor:
        futures = {}
        for name, pkg in pkgs.items():
            outpath = os.path.join(tmpdir, name)
            futures[executor.submit(process_pkg, name, pkg, outpath)] = name
        for future in concurrent.futures.as_completed(futures):
            name = futures[future]
            error = future.exception()
            if error is not None:
                print("%s failed to import: %r" % (name, error))
                continue
            inf = os.path.join(tmpdir, name)
            print("sqlimporting %s" % name)
            loaded = False
            with open(inf) as inp:
                try:
                    with db.begin() as conn:
                        readyaml(conn, inp)
                except Exception as exc:
                    # Per-package boundary: report and keep going.
                    print("%s failed sql with exception %r" % (name, exc))
                else:
                    loaded = True
            # Unlink only after the file is closed.
            if loaded:
                os.unlink(inf)


def _prune_packages(db, delpkgs):
    """Delete the packages named in *delpkgs* from the package table."""
    with db.begin() as conn:
        conn.execute(sqlalchemy.text("DELETE FROM package WHERE name = :name;"),
                     [dict(name=pkg) for pkg in delpkgs])
    # Tables content, dependency and sharing will also be pruned
    # due to ON DELETE CASCADE clauses.


def _remove_tmpdir(tmpdir):
    """Remove *tmpdir*. If files remain (failed packages), report them instead."""
    try:
        os.rmdir(tmpdir)
    except OSError as err:
        # POSIX allows either errno for rmdir on a non-empty directory.
        if err.errno not in (errno.ENOTEMPTY, errno.EEXIST):
            raise
        print("keeping temporary directory %s due to failed packages %s" %
              (tmpdir, " ".join(os.listdir(tmpdir))))


def main():
    """Command-line entry point: collect packages, import them, optionally prune.

    Positional arguments are package sources (see _collect_packages).
    ``--new`` skips packages whose version is not newer than the stored one.
    ``--prune`` deletes stored packages absent from all given sources.
    """
    parser = optparse.OptionParser()
    parser.add_option("-n", "--new", action="store_true",
                      help="avoid reimporting same versions")
    parser.add_option("-p", "--prune", action="store_true",
                      help="prune old packages")
    parser.add_option("-d", "--database", action="store",
                      default="sqlite:///test.sqlite3",
                      help="location of the database")
    options, args = parser.parse_args()
    db = sqlalchemy.create_engine(options.database)
    configure_database_engine(db)
    pkgs = _collect_packages(args)
    print("reading database")
    knownpkgs = _read_known_packages(db)
    distpkgs = set(pkgs)
    if options.new:
        # Drop packages whose version is not newer than the stored one.
        for name in distpkgs:
            if name in knownpkgs and \
                    version_compare(pkgs[name]["version"], knownpkgs[name]) <= 0:
                del pkgs[name]
    # Created only now so that errors while collecting sources or reading
    # the database do not leave an empty directory behind. A str prefix
    # keeps tmpdir a text path that joins cleanly with package names.
    tmpdir = tempfile.mkdtemp(prefix="debian-dedup")
    _import_packages(db, pkgs, tmpdir)
    if options.prune:
        delpkgs = set(knownpkgs) - distpkgs
        if delpkgs:
            print("clearing packages %s" % " ".join(delpkgs))
            _prune_packages(db, delpkgs)
    _remove_tmpdir(tmpdir)


if __name__ == "__main__":
    main()
|