summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHelmut Grohne <helmut@subdivi.de>2013-06-10 18:22:29 +0200
committerHelmut Grohne <helmut@subdivi.de>2013-06-10 18:22:29 +0200
commit1359895781ec1f7887121984abc46aefc61c6717 (patch)
tree92810d3399c93b793b9bf9ad97ae467b07bdb18d
parent3667c29f675599b9f3e098dd9b708c1d099d2b8a (diff)
downloaddebian-dedup-1359895781ec1f7887121984abc46aefc61c6717.tar.gz
split the import phase to a yaml stream
importpkg.py now emits a yaml stream instead of updating the database. The actual updating now happens in readyaml.py. In this process autoimport.py was significantly reworked to import packages in parallel.
-rw-r--r--README14
-rwxr-xr-xautoimport.py131
-rwxr-xr-ximportpkg.py47
-rwxr-xr-xreadyaml.py48
4 files changed, 177 insertions, 63 deletions
diff --git a/README b/README
index 2d362f9..a5ce9d7 100644
--- a/README
+++ b/README
@@ -1,7 +1,7 @@
Required packages
-----------------
- aptitude install python python-debian python-lzma curl python-jinja2 python-werkzeug sqlite3 python-imaging
+ aptitude install python python-debian python-lzma curl python-jinja2 python-werkzeug sqlite3 python-imaging python-yaml python-concurrent.futures
Create a database
-----------------
@@ -15,13 +15,17 @@ permanent.
Import packages
---------------
-Import individual packages by feeding them to importpkg.py:
+Import individual packages by feeding them to importpkg.py and readyaml.py:
- ls -t /var/cache/apt/archives/*.deb | while read f; do echo $f; ./importpkg.py < $f || break; done
+ ./importpkg.py < somepkg.deb | ./readyaml.py
-Import a full mirror::
+You can import your local apt cache:
- ./autoimport.py http://your.mirror.example/debian
+ ./autoimport.py /var/cache/apt/archives
+
+Import a full mirror (only http supported):
+
+ ./autoimport.py -n -p http://your.mirror.example/debian
Viewing the results
-------------------
diff --git a/autoimport.py b/autoimport.py
index 453a839..f30b36a 100755
--- a/autoimport.py
+++ b/autoimport.py
@@ -1,48 +1,125 @@
#!/usr/bin/python
+"""This script takes a directory or an http base url to a mirror and imports all
+packages contained. It has rather strong assumptions on the working directory.
+"""
import gzip
import io
+import multiprocessing
+import optparse
+import os
import sqlite3
import subprocess
-import sys
import urllib
+import concurrent.futures
from debian import deb822
from debian.debian_support import version_compare
+def process_http(pkgs, url):
+ pkglist = urllib.urlopen(url + "/dists/sid/main/binary-amd64/Packages.gz").read()
+ pkglist = gzip.GzipFile(fileobj=io.BytesIO(pkglist)).read()
+ pkglist = io.BytesIO(pkglist)
+ pkglist = deb822.Packages.iter_paragraphs(pkglist)
+ for pkg in pkglist:
+ name = pkg["Package"]
+ if name in pkgs and \
+ version_compare(pkgs[name]["version"], pkg["Version"]) > 0:
+ continue
+ pkgs[name] = dict(version=pkg["Version"],
+ filename="%s/%s" % (url, pkg["Filename"]))
+
+def process_dir(pkgs, d):
+ for entry in os.listdir(d):
+ if not entry.endswith(".deb"):
+ continue
+ parts = entry.split("_")
+ if len(parts) != 3:
+ continue
+ name, version, _ = parts
+ version = urllib.unquote(version)
+ if name in pkgs and version_compare(pkgs[name]["version"], version) > 0:
+ continue
+ pkgs[name] = dict(version=version, filename=os.path.join(d, entry))
+
+def process_pkg(name, filename):
+ print("importing %s" % filename)
+ if filename.startswith("http://"):
+ with open(os.path.join("tmp", name), "w") as outp:
+ dl = subprocess.Popen(["curl", "-s", filename],
+ stdout=subprocess.PIPE, close_fds=True)
+ imp = subprocess.Popen(["python", "importpkg.py"], stdin=dl.stdout,
+ stdout=outp, close_fds=True)
+ if imp.wait():
+ raise ValueError("importpkg failed")
+ if dl.wait():
+ raise ValueError("curl failed")
+ else:
+ with open(filename) as inp:
+ with open(os.path.join("tmp", name), "w") as outp:
+ subprocess.check_call(["python", "importpkg.py"], stdin=inp,
+ stdout=outp, close_fds=True)
+ print("preprocessed %s" % name)
+
def main():
- urlbase = sys.argv[1]
+ parser = optparse.OptionParser()
+ parser.add_option("-n", "--new", action="store_true",
+ help="avoid reimporting same versions")
+ parser.add_option("-p", "--prune", action="store_true",
+ help="prune packages old packages")
+ options, args = parser.parse_args()
+ subprocess.check_call(["mkdir", "-p", "tmp"])
db = sqlite3.connect("test.sqlite3")
cur = db.cursor()
cur.execute("PRAGMA foreign_keys = ON;")
+ e = concurrent.futures.ThreadPoolExecutor(multiprocessing.cpu_count())
+ pkgs = {}
+ for d in args:
+ print("processing %s" % d)
+ if d.startswith("http://"):
+ process_http(pkgs, d)
+ else:
+ process_dir(pkgs, d)
+
+ print("reading database")
cur.execute("SELECT package, version FROM package;")
knownpkgs = dict((row[0], row[1]) for row in cur.fetchall())
+ distpkgs = set(pkgs.keys())
+ if options.new:
+ for name in distpkgs:
+ if name in knownpkgs and version_compare(pkgs[name]["version"],
+ knownpkgs[name]) <= 0:
+ del pkgs[name]
+ knownpkgs = set(knownpkgs)
- pkglist = urllib.urlopen(urlbase + "/dists/sid/main/binary-amd64/Packages.gz").read()
- pkglist = gzip.GzipFile(fileobj=io.BytesIO(pkglist)).read()
- distpkgs = set()
- for pkg in deb822.Packages.iter_paragraphs(io.BytesIO(pkglist)):
- name = pkg["Package"]
- distpkgs.add(name)
- if name in knownpkgs and \
- version_compare(pkg["Version"], knownpkgs[name]) <= 0:
- continue
- pkgurl = "%s/%s" % (urlbase, pkg["Filename"])
- print("importing %s" % name)
- dl = subprocess.Popen(["curl", "-s", pkgurl], stdout=subprocess.PIPE)
- imp = subprocess.Popen("./importpkg.py", stdin=dl.stdout)
- if imp.wait():
- print("import failed")
- if dl.wait():
- print("curl failed")
-
- delpkgs = set(knownpkgs) - distpkgs
- print("clearing packages %s" % " ".join(delpkgs))
- cur.executemany("DELETE FROM package WHERE package = ?;",
- ((pkg,) for pkg in delpkgs))
- # Tables content, dependency and sharing will also be pruned
- # due to ON DELETE CASCADE clauses.
- db.commit()
+ with e:
+ fs = {}
+ for name, pkg in pkgs.items():
+ fs[e.submit(process_pkg, name, pkg["filename"])] = name
+
+ for f in concurrent.futures.as_completed(fs.keys()):
+ name = fs[f]
+ if f.exception():
+ print("%s failed to import: %r" % (name, f.exception()))
+ continue
+ inf = os.path.join("tmp", name)
+ print("sqlimporting %s" % name)
+ with open(inf) as inp:
+ try:
+ subprocess.check_call(["python", "readyaml.py"], stdin=inp)
+ except subprocess.CalledProcessError:
+ print("%s failed sql" % name)
+ else:
+ os.unlink(inf)
+
+ if options.prune:
+ delpkgs = knownpkgs - distpkgs
+ print("clearing packages %s" % " ".join(delpkgs))
+ cur.executemany("DELETE FROM package WHERE package = ?;",
+ ((pkg,) for pkg in delpkgs))
+ # Tables content, dependency and sharing will also be pruned
+ # due to ON DELETE CASCADE clauses.
+ db.commit()
if __name__ == "__main__":
main()
diff --git a/importpkg.py b/importpkg.py
index e0160e6..6e22b54 100755
--- a/importpkg.py
+++ b/importpkg.py
@@ -1,14 +1,18 @@
#!/usr/bin/python
+"""This tool reads a debian package from stdin and emits a yaml stream on
+stdout. It does not access a database. Therefore it can be run in parallel and
on multiple machines. The generated yaml contains multiple documents. The first
+document contains package metadata. Then a document is emitted for each file.
+And finally a document consisting of the string "commit" is emitted."""
import hashlib
-import sqlite3
import sys
import tarfile
import zlib
-from debian.debian_support import version_compare
from debian import deb822
import lzma
+import yaml
from dedup.arreader import ArReader
from dedup.hashing import HashBlacklist, DecompressedHash, SuppressingHash, hash_file
@@ -57,17 +61,16 @@ def get_hashes(tar):
hashes[hashobj.name] = hashvalue
yield (elem.name, elem.size, hashes)
-def process_package(db, filelike):
- cur = db.cursor()
- cur.execute("PRAGMA foreign_keys = ON;")
+def process_package(filelike):
af = ArReader(filelike)
af.read_magic()
state = "start"
- while True:
+ while state not in ("finished", "skipped"):
try:
name = af.read_entry()
except EOFError:
- break
+ if state != "finished":
+ raise ValueError("data.tar not found")
if name == "control.tar.gz":
if state != "start":
raise ValueError("unexpected control.tar.gz")
@@ -89,23 +92,11 @@ def process_package(db, filelike):
version = control["version"].encode("ascii")
architecture = control["architecture"].encode("ascii")
- cur.execute("SELECT version FROM package WHERE package = ?;",
- (package,))
- row = cur.fetchone()
- if row and version_compare(row[0], version) > 0:
- return # already seen a newer package
-
- cur.execute("DELETE FROM content WHERE package = ?;",
- (package,))
- cur.execute("INSERT OR REPLACE INTO package (package, version, architecture, source) VALUES (?, ?, ?, ?);",
- (package, version, architecture, source))
depends = control.relations.get("depends", [])
depends = set(dep[0]["name"].encode("ascii")
for dep in depends if len(dep) == 1)
- cur.execute("DELETE FROM dependency WHERE package = ?;",
- (package,))
- cur.executemany("INSERT INTO dependency (package, required) VALUES (?, ?);",
- ((package, dep) for dep in depends))
+ yield dict(package=package, source=source, version=version,
+ architecture=architecture, depends=depends)
break
continue
elif name == "data.tar.gz":
@@ -125,18 +116,12 @@ def process_package(db, filelike):
except UnicodeDecodeError:
print("warning: skipping filename with encoding error")
continue # skip files with non-utf8 encoding for now
- cur.execute("INSERT INTO content (package, filename, size) VALUES (?, ?, ?);",
- (package, name, size))
- cid = cur.lastrowid
- cur.executemany("INSERT INTO hash (cid, function, hash) VALUES (?, ?, ?);",
- ((cid, func, hexhash) for func, hexhash in hashes.items()))
- db.commit()
- return
- raise ValueError("data.tar not found")
+ yield dict(name=name, size=size, hashes=hashes)
+ state = "finished"
+ yield "commit"
def main():
- db = sqlite3.connect("test.sqlite3")
- process_package(db, sys.stdin)
+ yaml.safe_dump_all(process_package(sys.stdin), sys.stdout)
if __name__ == "__main__":
main()
diff --git a/readyaml.py b/readyaml.py
new file mode 100755
index 0000000..b66c7f3
--- /dev/null
+++ b/readyaml.py
@@ -0,0 +1,48 @@
+#!/usr/bin/python
+"""This tool reads a yaml file as generated by importpkg.py on stdin and
+updates the database with the contents."""
+
+import sqlite3
+import sys
+
+from debian.debian_support import version_compare
+import yaml
+
+def main():
+ db = sqlite3.connect("test.sqlite3")
+ cur = db.cursor()
+ cur.execute("PRAGMA foreign_keys = ON;")
+ gen = yaml.safe_load_all(sys.stdin)
+ metadata = next(gen)
+ package = metadata["package"]
+ cur.execute("SELECT version FROM package WHERE package = ?;",
+ (package,))
+ row = cur.fetchone()
+ if row and version_compare(row[0], metadata["version"]) > 0:
+ return
+
+ cur.execute("BEGIN;")
+ cur.execute("DELETE FROM content WHERE package = ?;",
+ (package,))
+ cur.execute("INSERT OR REPLACE INTO package (package, version, architecture, source) VALUES (?, ?, ?, ?);",
+ (package, metadata["version"], metadata["architecture"],
+ metadata["source"]))
+ cur.execute("DELETE FROM dependency WHERE package = ?;",
+ (package,))
+ cur.executemany("INSERT INTO dependency (package, required) VALUES (?, ?);",
+ ((package, dep) for dep in metadata["depends"]))
+ for entry in gen:
+ if entry == "commit":
+ db.commit()
+ return
+
+ cur.execute("INSERT INTO content (package, filename, size) VALUES (?, ?, ?);",
+ (package, entry["name"], entry["size"]))
+ cid = cur.lastrowid
+ cur.executemany("INSERT INTO hash (cid, function, hash) VALUES (?, ?, ?);",
+ ((cid, func, hexhash)
+ for func, hexhash in entry["hashes"].items()))
+ raise ValueError("missing commit block")
+
+if __name__ == "__main__":
+ main()