author     Helmut Grohne <helmut@subdivi.de>  2023-05-09 15:10:25 +0200
committer  Helmut Grohne <helmut@subdivi.de>  2023-05-09 15:12:01 +0200
commit     924f0c734a7accb87e2ac911cee6e24dd463f237 (patch)
tree       eb1bcaa2f25933374d28905bcb56e2e8aabeec62
parent     8a05a6d8bacea0643a4967eed4cd67019ee0b6d7 (diff)
download   debian-dedup-master.tar.gz

add type annotations to most of the code (HEAD, master)
-rwxr-xr-x  autoimport.py          23
-rw-r--r--  dedup/compression.py   60
-rw-r--r--  dedup/debpkg.py        50
-rw-r--r--  dedup/filemagic.py     20
-rw-r--r--  dedup/hashing.py      115
-rw-r--r--  dedup/image.py         22
-rw-r--r--  dedup/utils.py         18
-rwxr-xr-x  importpkg.py           45
-rwxr-xr-x  readyaml.py             7
-rwxr-xr-x  update_sharing.py      33
10 files changed, 275 insertions, 118 deletions
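
The annotations added by this commit can be verified with a type checker; a minimal sketch using mypy's Python API (assuming the mypy package is installed):

import sys

from mypy import api  # assumption: mypy is installed

# Type-check the annotated entry points and the dedup package.
stdout, stderr, status = api.run(["autoimport.py", "importpkg.py", "dedup/"])
sys.stdout.write(stdout)
print("mypy exit status:", status)
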
diff --git a/autoimport.py b/autoimport.py
index 0f518c6..d4e03d0 100755
--- a/autoimport.py
+++ b/autoimport.py
@@ -11,6 +11,7 @@ import sqlite3
import subprocess
import sys
import tempfile
+import typing
import urllib.parse
import concurrent.futures
from debian.debian_support import version_compare
@@ -19,7 +20,13 @@ from dedup.utils import iterate_packages
from readyaml import readyaml
-def process_http(pkgs, url, addhash=True):
+
+PkgDict = typing.Dict[str, str]
+
+
+def process_http(
+ pkgs: typing.Dict[str, PkgDict], url: str, addhash: bool = True
+) -> None:
for pkg in iterate_packages(url, "amd64"):
name = pkg["Package"]
if name in pkgs and \
@@ -31,7 +38,10 @@ def process_http(pkgs, url, addhash=True):
inst["sha256hash"] = pkg["SHA256"]
pkgs[name] = inst
-def process_file(pkgs, filename):
+
+def process_file(
+ pkgs: typing.Dict[str, PkgDict], filename: pathlib.Path
+) -> None:
if filename.suffix != ".deb":
raise ValueError("filename does not end in .deb")
parts = filename.name.split("_")
@@ -43,14 +53,15 @@ def process_file(pkgs, filename):
return
pkgs[name] = dict(version=version, filename=str(filename))
-def process_dir(pkgs, d):
+
+def process_dir(pkgs: typing.Dict[str, PkgDict], d: pathlib.Path) -> None:
for entry in d.iterdir():
try:
process_file(pkgs, entry)
except ValueError:
pass
-def process_pkg(name, pkgdict, outpath):
+def process_pkg(name: str, pkgdict: PkgDict, outpath: pathlib.Path) -> None:
filename = pkgdict["filename"]
print("importing %s" % filename)
importcmd = [sys.executable, "importpkg.py"]
@@ -67,7 +78,7 @@ def process_pkg(name, pkgdict, outpath):
close_fds=True)
print("preprocessed %s" % name)
-def main():
+def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument("-n", "--new", action="store_true",
help="avoid reimporting same versions")
@@ -86,7 +97,7 @@ def main():
cur = db.cursor()
cur.execute("PRAGMA foreign_keys = ON;")
e = concurrent.futures.ThreadPoolExecutor(multiprocessing.cpu_count())
- pkgs = {}
+ pkgs: typing.Dict[str, PkgDict] = {}
for d in args.files:
print("processing %s" % d)
if d.startswith(("http://", "https://", "ftp://", "file://")):
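
A sketch of how the new PkgDict alias reads (illustrative, not part of the commit; latest_versions is a hypothetical helper): each package name maps to a string-valued dict carrying keys such as version and filename.

import typing

PkgDict = typing.Dict[str, str]

def latest_versions(pkgs: typing.Dict[str, PkgDict]) -> typing.List[str]:
    """Render one "name version" line per tracked package."""
    return ["%s %s" % (name, inst["version"])
            for name, inst in sorted(pkgs.items())]

pkgs: typing.Dict[str, PkgDict] = {
    "bash": {"version": "5.2-2", "filename": "pool/bash_5.2-2_amd64.deb"},
}
print(latest_versions(pkgs))  # ['bash 5.2-2']
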
diff --git a/dedup/compression.py b/dedup/compression.py
index da6e9a0..2e9869c 100644
--- a/dedup/compression.py
+++ b/dedup/compression.py
@@ -1,20 +1,38 @@
import bz2
import struct
+import typing
import zlib
import lzma
+
+class Decompressor(typing.Protocol):
+ def copy(self) -> "Decompressor":
+ ...
+
+ def decompress(self, data: bytes) -> bytes:
+ ...
+
+ def flush(self) -> bytes:
+ ...
+
+ @property
+ def unused_data(self) -> bytes:
+ ...
+
+
class GzipDecompressor:
"""An interface to gzip which is similar to bz2.BZ2Decompressor and
lzma.LZMADecompressor."""
- def __init__(self):
+
+ def __init__(self) -> None:
self.sawheader = False
self.inbuffer = b""
- self.decompressor = None
+ self.decompressor: typing.Optional[Decompressor] = None
self.crc = 0
self.size = 0
- def decompress(self, data):
+ def decompress(self, data: bytes) -> bytes:
"""
@raises ValueError: if no gzip magic is found
@raises zlib.error: from zlib invocations
@@ -57,7 +75,7 @@ class GzipDecompressor:
self.decompressor = zlib.decompressobj(-zlib.MAX_WBITS)
@property
- def unused_data(self):
+ def unused_data(self) -> bytes:
if self.decompressor:
return self.decompressor.unused_data
elif not self.sawheader:
@@ -69,7 +87,7 @@ class GzipDecompressor:
return b""
return self.inbuffer
- def flush(self):
+ def flush(self) -> bytes:
"""
@raises zlib.error: from zlib invocations
"""
@@ -77,7 +95,7 @@ class GzipDecompressor:
return b""
return self.decompressor.flush()
- def copy(self):
+ def copy(self) -> "GzipDecompressor":
new = GzipDecompressor()
new.inbuffer = self.inbuffer
if self.decompressor:
@@ -92,20 +110,25 @@ class DecompressedStream:
read(optional length), tell, seek(forward only) and close."""
blocksize = 65536
- def __init__(self, fileobj, decompressor):
+ def __init__(
+ self, fileobj: typing.BinaryIO, decompressor: Decompressor
+ ) -> None:
"""
@param fileobj: a file-like object providing read(size)
@param decompressor: a bz2.BZ2Decompressor or lzma.LZMADecompressor
like object providing methods decompress and flush and an
attribute unused_data
"""
- self.fileobj = fileobj
- self.decompressor = decompressor
+ self.fileobj: typing.Optional[typing.BinaryIO] = fileobj
+ self.decompressor: typing.Optional[Decompressor] = decompressor
self.buff = bytearray()
self.pos = 0
- def _fill_buff_until(self, predicate):
+ def _fill_buff_until(
+ self, predicate: typing.Callable[[bytes], bool]
+ ) -> None:
assert self.fileobj is not None
+ assert self.decompressor is not None
while not predicate(self.buff):
data = self.fileobj.read(self.blocksize)
if data:
@@ -115,13 +138,13 @@ class DecompressedStream:
self.buff += self.decompressor.flush()
break
- def _read_from_buff(self, length):
+ def _read_from_buff(self, length: int) -> bytes:
ret = bytes(self.buff[:length])
self.buff[:length] = b""
self.pos += length
return ret
- def read(self, length=None):
+ def read(self, length: typing.Optional[int] = None) -> bytes:
if length is None:
self._fill_buff_until(lambda _: False)
length = len(self.buff)
@@ -129,7 +152,7 @@ class DecompressedStream:
self._fill_buff_until(lambda b, l=length: len(b) >= l)
return self._read_from_buff(length)
- def readline(self):
+ def readline(self) -> bytes:
self._fill_buff_until(lambda b: b'\n' in b)
try:
length = self.buff.index(b'\n') + 1
@@ -137,14 +160,14 @@ class DecompressedStream:
length = len(self.buff)
return self._read_from_buff(length)
- def __iter__(self):
+ def __iter__(self) -> typing.Iterator[bytes]:
return iter(self.readline, b'')
- def tell(self):
+ def tell(self) -> int:
assert self.fileobj is not None
return self.pos
- def seek(self, pos):
+ def seek(self, pos: int) -> None:
"""Forward seeks by absolute position only."""
assert self.fileobj is not None
if pos < self.pos:
@@ -159,7 +182,7 @@ class DecompressedStream:
self.read(left)
return
- def close(self):
+ def close(self) -> None:
if self.fileobj is not None:
self.fileobj.close()
self.fileobj = None
@@ -173,13 +196,12 @@ decompressors = {
'.xz': lzma.LZMADecompressor,
}
-def decompress(filelike, extension):
+def decompress(filelike: typing.BinaryIO, extension: str) -> typing.BinaryIO:
"""Decompress a stream according to its extension.
@param filelike: is a read-only byte-stream. It must support read(size) and
close().
@param extension: permitted values are "", ".gz", ".bz2", ".lzma", and
".xz"
- @type extension: unicode
@returns: a read-only byte-stream with the decompressed contents of the
original filelike. It supports read(size) and close(). If the
original supports seek(pos) and tell(), then it also supports
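
Illustrative usage of the new Decompressor protocol (a sketch, not part of the commit): GzipDecompressor and zlib.decompressobj() provide copy, decompress, flush and unused_data, so they satisfy the protocol structurally without subclassing it.

import gzip
import io

from dedup.compression import DecompressedStream, GzipDecompressor

# GzipDecompressor is accepted wherever a Decompressor is expected.
compressed = io.BytesIO(gzip.compress(b"hello world\n"))
stream = DecompressedStream(compressed, GzipDecompressor())
print(stream.readline())  # b'hello world\n'
stream.close()
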
diff --git a/dedup/debpkg.py b/dedup/debpkg.py
index de00e60..0d1b7da 100644
--- a/dedup/debpkg.py
+++ b/dedup/debpkg.py
@@ -1,26 +1,29 @@
import tarfile
+import typing
import arpy
from debian import deb822
from dedup.compression import decompress
-from dedup.hashing import hash_file
+from dedup.hashing import HashlibLike, hash_file
class MultiHash:
- def __init__(self, *hashes):
+    def __init__(self, *hashes: HashlibLike) -> None:
self.hashes = hashes
- def update(self, data):
+ def update(self, data: bytes) -> None:
for hasher in self.hashes:
hasher.update(data)
-def get_tar_hashes(tar, hash_functions):
+
+def get_tar_hashes(
+ tar: tarfile.TarFile,
+ hash_functions: typing.Sequence[typing.Callable[[], HashlibLike]],
+) -> typing.Iterator[typing.Tuple[str, int, typing.Dict[str, str]]]:
"""Given a TarFile read all regular files and compute all of the given hash
functions on each file.
- @type tar: tarfile.TarFile
@param hash_functions: a sequence of parameter-less functions each creating a
new hashlib-like object
- @rtype: gen((str, int, {str: str}}
@returns: an iterable of (filename, filesize, hashes) tuples where
hashes is a dict mapping hash function names to hash values
"""
@@ -29,7 +32,9 @@ def get_tar_hashes(tar, hash_functions):
if not elem.isreg(): # excludes hard links as well
continue
hasher = MultiHash(*[func() for func in hash_functions])
- hash_file(hasher, tar.extractfile(elem))
+ extracted = tar.extractfile(elem)
+ assert extracted is not None
+ hash_file(hasher, extracted)
hashes = {}
for hashobj in hasher.hashes:
hashvalue = hashobj.hexdigest()
@@ -37,17 +42,18 @@ def get_tar_hashes(tar, hash_functions):
hashes[hashobj.name] = hashvalue
yield (elem.name, elem.size, hashes)
-def opentar(filelike):
+
+def opentar(filelike: typing.BinaryIO) -> tarfile.TarFile:
return tarfile.open(fileobj=filelike, mode="r|", encoding="utf8",
errors="surrogateescape")
class DebExtractor:
"Base class for extracting desired features from a Debian package."
- def __init__(self):
+ def __init__(self) -> None:
self.arstate = "start"
- def process(self, filelike):
+ def process(self, filelike: typing.BinaryIO) -> None:
"""Process a Debian package.
@param filelike: is a file-like object containing the contents of the
Debian package and can be read once without seeks.
@@ -89,22 +95,20 @@ class DebExtractor:
else:
assert self.arstate == "data"
- def handle_ar_end(self):
+ def handle_ar_end(self) -> None:
"Handle the end of the ar archive of the Debian package."
if self.arstate != "data":
raise ValueError("data.tar not found")
- def handle_debversion(self, version):
+ def handle_debversion(self, version: bytes) -> None:
"""Handle the debian-binary member of the Debian package.
- @type version: bytes
@param version: The full contents of the ar member.
"""
- def handle_control_tar(self, tarfileobj):
+ def handle_control_tar(self, tarfileobj: tarfile.TarFile) -> None:
"""Handle the control.tar member of the Debian package.
If you replace this method, none of handle_control_member,
handle_control_info or handle_control_end are called.
- @type tarfileobj: tarfile.TarFile
@param tarfileobj: is opened for streaming reads
"""
controlseen = False
@@ -113,7 +117,9 @@ class DebExtractor:
name = elem.name
if name.startswith("./"):
name = name[2:]
- content = tarfileobj.extractfile(elem).read()
+ extracted = tarfileobj.extractfile(elem)
+ assert extracted is not None
+ content = extracted.read()
self.handle_control_member(name, content)
if name == "control":
self.handle_control_info(deb822.Packages(content))
@@ -125,24 +131,20 @@ class DebExtractor:
raise ValueError("control missing from control.tar")
self.handle_control_end()
- def handle_control_member(self, name, content):
+ def handle_control_member(self, name: str, content: bytes) -> None:
"""Handle a file member of the control.tar member of the Debian package.
- @type name: str
@param name: is the plain member name
- @type content: bytes
"""
- def handle_control_info(self, info):
+ def handle_control_info(self, info: deb822.Packages) -> None:
"""Handle the control member of the control.tar member of the Debian
package.
- @type info: deb822.Packages
"""
- def handle_control_end(self):
+ def handle_control_end(self) -> None:
"Handle the end of the control.tar member of the Debian package."
- def handle_data_tar(self, tarfileobj):
+ def handle_data_tar(self, tarfileobj: tarfile.TarFile) -> None:
"""Handle the data.tar member of the Debian package.
- @type tarfileobj: tarfile.TarFile
@param tarfileobj: is opened for streaming reads
"""
diff --git a/dedup/filemagic.py b/dedup/filemagic.py
index b71c276..a6d09ba 100644
--- a/dedup/filemagic.py
+++ b/dedup/filemagic.py
@@ -1,6 +1,8 @@
"""A very strange "hash" that uses the magic module (python3-magic) to guess
the file type."""
+import typing
+
import magic
# It changed API a few times...
@@ -13,36 +15,38 @@ class FileDigester:
"""A hashlib-like class to guess a filetype using the magic module."""
FILE_BYTES_MAX = 1024 * 1024 # copied from file source
- def __init__(self):
- self.buff = b""
- self.identification = None
+ def __init__(self) -> None:
+ self.buff: typing.Optional[bytes] = b""
+ self.identification: typing.Optional[str] = None
- def _compute_identification(self):
+ def _compute_identification(self) -> str:
+ assert self.buff is not None
try:
return _magic_identify(self.buff)
except UnicodeDecodeError:
return "magic identification is not valid UTF-8"
- def update(self, buff):
+ def update(self, buff: bytes) -> None:
if self.identification:
return
+ assert self.buff is not None
self.buff += buff
if len(self.buff) >= self.FILE_BYTES_MAX:
self.identification = self._compute_identification()
self.buff = None
- def identify(self):
+ def identify(self) -> str:
"""Return the guessed file magic identification."""
if self.identification:
return self.identification
return self._compute_identification()
- def hexdigest(self):
+ def hexdigest(self) -> str:
"""Compatibility with hashlib. An alias of identify. Doesn't return
hex."""
return self.identify()
- def copy(self):
+ def copy(self) -> "FileDigester":
new = FileDigester()
new.buff = self.buff
new.identification = self.identification
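
Sketchy usage of FileDigester (assuming python3-magic is installed; the exact identification string varies across libmagic versions): it follows the hashlib calling convention, but hexdigest() returns a file(1)-style description rather than hex.

from dedup.filemagic import FileDigester

digester = FileDigester()
digester.update(b"#!/bin/sh\necho hello\n")
# e.g. "POSIX shell script, ASCII text executable"
print(digester.hexdigest())
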
diff --git a/dedup/hashing.py b/dedup/hashing.py
index 9cebcbb..21bbed2 100644
--- a/dedup/hashing.py
+++ b/dedup/hashing.py
@@ -1,4 +1,26 @@
import itertools
+import typing
+
+from .compression import Decompressor
+
+
+class HashlibLike(typing.Protocol):
+ def copy(self) -> "HashlibLike":
+ ...
+
+ def digest(self) -> typing.Optional[bytes]:
+ ...
+
+ def hexdigest(self) -> typing.Optional[str]:
+ ...
+
+ @property
+ def name(self) -> str:
+ ...
+
+ def update(self, data: bytes) -> None:
+ ...
+
class HashBlacklist:
"""Turn a hashlib-like object into a hash that returns None for some
@@ -7,7 +29,10 @@ class HashBlacklist:
We only work with hexdigests here, so digest() disappears. The methods
copy and update as well as the name attribute keep working as expected.
"""
- def __init__(self, hashobj, blacklist=()):
+
+ def __init__(
+ self, hashobj: HashlibLike, blacklist: typing.Container[str] = ()
+ ) -> None:
"""
@param hashobj: a hashlib-like object
@param blacklist: an object providing __contains__.
@@ -19,16 +44,16 @@ class HashBlacklist:
self.update = self.hashobj.update
@property
- def name(self):
+ def name(self) -> str:
return self.hashobj.name
- def hexdigest(self):
+ def hexdigest(self) -> typing.Optional[str]:
digest = self.hashobj.hexdigest()
if digest in self.blacklist:
return None
return digest
- def copy(self):
+ def copy(self) -> "HashBlacklist":
return HashBlacklist(self.hashobj.copy(), self.blacklist)
class HashBlacklistContent:
@@ -36,7 +61,12 @@ class HashBlacklistContent:
blacklisted content instead of the real hash value. Unlike HashBlacklist,
not the output of the hash is considered, but its input."""
- def __init__(self, hashobj, blacklist=(), maxlen=None):
+ def __init__(
+ self,
+ hashobj: HashlibLike,
+ blacklist: typing.Collection[bytes] = (),
+ maxlen: typing.Optional[int] = None,
+ ) -> None:
"""
@param hashobj: a hashlib-like object
@param blacklist: an object providing __contains__.
@@ -52,30 +82,30 @@ class HashBlacklistContent:
# the chain avoids passing the empty sequence to max
maxlen = max(itertools.chain((0,), map(len, blacklist)))
self.maxlen = maxlen
- self.stored = b""
+ self.stored: typing.Optional[bytes] = b""
@property
- def name(self):
+ def name(self) -> str:
return self.hashobj.name
- def update(self, data):
+ def update(self, data: bytes) -> None:
if self.stored is not None:
self.stored += data
if len(self.stored) > self.maxlen:
self.stored = None
self.hashobj.update(data)
- def digest(self):
+ def digest(self) -> typing.Optional[bytes]:
if self.stored is not None and self.stored in self.blacklist:
return None
return self.hashobj.digest()
- def hexdigest(self):
+ def hexdigest(self) -> typing.Optional[str]:
if self.stored is not None and self.stored in self.blacklist:
return None
return self.hashobj.hexdigest()
- def copy(self):
+ def copy(self) -> "HashBlacklistContent":
new = HashBlacklistContent(self.hashobj.copy(), self.blacklist,
self.maxlen)
new.stored = self.stored
@@ -84,7 +114,13 @@ class HashBlacklistContent:
class DecompressedHash:
"""Apply a decompression function before the hash. This class provides the
hashlib interface (update, hexdigest, copy) excluding digest and name."""
- def __init__(self, decompressor, hashobj, name="unnamed"):
+
+ def __init__(
+ self,
+ decompressor: Decompressor,
+ hashobj: HashlibLike,
+ name: str = "unnamed",
+    ) -> None:
"""
@param decompressor: a decompression object like bz2.BZ2Decompressor or
lzma.LZMADecompressor. It has to provide methods decompress and
@@ -98,23 +134,29 @@ class DecompressedHash:
self.hashobj = hashobj
self.name = name
- def update(self, data):
+ def update(self, data: bytes) -> None:
self.hashobj.update(self.decompressor.decompress(data))
- def hexdigest(self):
+ def _finalize_hashobj(self) -> HashlibLike:
if not hasattr(self.decompressor, "flush"):
if self.decompressor.unused_data:
raise ValueError("decompressor did not consume all data")
- return self.hashobj.hexdigest()
+ return self.hashobj
tmpdecomp = self.decompressor.copy()
data = tmpdecomp.flush()
if tmpdecomp.unused_data:
raise ValueError("decompressor did not consume all data")
tmphash = self.hashobj.copy()
tmphash.update(data)
- return tmphash.hexdigest()
+ return tmphash
- def copy(self):
+ def digest(self) -> typing.Optional[bytes]:
+ return self._finalize_hashobj().digest()
+
+ def hexdigest(self) -> typing.Optional[str]:
+ return self._finalize_hashobj().hexdigest()
+
+ def copy(self) -> "DecompressedHash":
return DecompressedHash(self.decompressor.copy(), self.hashobj.copy(),
self.name)
@@ -122,7 +164,8 @@ class SuppressingHash:
"""A hash that silences exceptions from the update and hexdigest methods of
a hashlib-like object. If an exception has occurred, hexdigest always
returns None."""
- def __init__(self, hashobj, exceptions=()):
+
+    def __init__(
+        self,
+        hashobj: HashlibLike,
+        exceptions: typing.Tuple[typing.Type[BaseException], ...],
+    ) -> None:
"""
@param hashobj: a hashlib-like object providing methods update, copy
and hexdigest. If a name attribute is present, it is mirrored as
@@ -130,19 +173,27 @@ class SuppressingHash:
@type exceptions: tuple
@param exceptions: exception classes to be suppressed
"""
- self.hashobj = hashobj
+ self.hashobj: typing.Optional[HashlibLike] = hashobj
self.exceptions = exceptions
if hasattr(hashobj, "name"):
self.name = hashobj.name
- def update(self, data):
+ def update(self, data: bytes) -> None:
if self.hashobj:
try:
self.hashobj.update(data)
except self.exceptions:
self.hashobj = None
- def hexdigest(self):
+ def digest(self) -> typing.Optional[bytes]:
+ if self.hashobj:
+ try:
+ return self.hashobj.digest()
+ except self.exceptions:
+                self.hashobj = None
+ return None
+
+ def hexdigest(self) -> typing.Optional[str]:
if self.hashobj:
try:
return self.hashobj.hexdigest()
@@ -150,12 +201,18 @@ class SuppressingHash:
self.hashobj = None
return None
- def copy(self):
+ def copy(self) -> "SuppressingHash":
if self.hashobj:
return SuppressingHash(self.hashobj.copy(), self.exceptions)
- return SuppressingHash(None, self.exceptions)
+ ret = SuppressingHash(None, self.exceptions)
+ if hasattr(self, "name"):
+ ret.name = self.name
+ return ret
+
-def hash_file(hashobj, filelike, blocksize=65536):
+def hash_file(
+ hashobj: HashlibLike, filelike: typing.BinaryIO, blocksize: int = 65536
+) -> None:
"""Feed the entire contents from the given filelike to the given hashobj.
@param hashobj: hashlib-like object providing an update method
@param filelike: file-like object providing read(size)
@@ -168,7 +225,9 @@ def hash_file(hashobj, filelike, blocksize=65536):
class HashedStream:
"""A file-like object, that supports sequential reading and hashes the
contents on the fly."""
- def __init__(self, filelike, hashobj):
+ def __init__(
+ self, filelike: typing.BinaryIO, hashobj: HashlibLike
+ ) -> None:
"""
@param filelike: a file-like object, that must support the read method
@param hashobj: a hashlib-like object providing update and hexdigest
@@ -176,15 +235,15 @@ class HashedStream:
self.filelike = filelike
self.hashobj = hashobj
- def read(self, length):
+ def read(self, length: int) -> bytes:
data = self.filelike.read(length)
self.hashobj.update(data)
return data
- def hexdigest(self):
+ def hexdigest(self) -> typing.Optional[str]:
return self.hashobj.hexdigest()
- def validate(self, hexdigest):
+ def validate(self, hexdigest: str) -> None:
"""Soak up any remaining input and validate the read data using the
given hexdigest.
@raises ValueError: when the hash does not match
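
A short sketch of HashedStream (not part of the commit): it hashes whatever is read from the wrapped stream, and validate() soaks up the remainder before comparing digests.

import hashlib
import io

from dedup.hashing import HashedStream

payload = b"example payload"
stream = HashedStream(io.BytesIO(payload), hashlib.sha256())
stream.read(7)  # partial read; hashing happens on the fly
# Reads the rest, then compares; raises ValueError on mismatch.
stream.validate(hashlib.sha256(payload).hexdigest())
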
diff --git a/dedup/image.py b/dedup/image.py
index 91321f4..a417528 100644
--- a/dedup/image.py
+++ b/dedup/image.py
@@ -3,6 +3,8 @@ import struct
import PIL.Image
+from .hashing import HashlibLike
+
class ImageHash:
"""A hash on the contents of an image data type supported by PIL. This
disregards mode, depth and meta information. Note that due to limitations
@@ -11,8 +13,9 @@ class ImageHash:
maxsize = 1024 * 1024 * 32
# max memory usage is about 5 * maxpixels in bytes
maxpixels = 1024 * 1024 * 32
+ name_prefix: str
- def __init__(self, hashobj):
+ def __init__(self, hashobj: HashlibLike) -> None:
"""
@param hashobj: a hashlib-like object
"""
@@ -20,23 +23,26 @@ class ImageHash:
self.imagedetected = False
self.content = io.BytesIO()
- def detect(self):
+ def detect(self) -> bool:
raise NotImplementedError
- def update(self, data):
+ def update(self, data: bytes) -> None:
self.content.write(data)
if self.content.tell() > self.maxsize:
raise ValueError("maximum image size exceeded")
if not self.imagedetected:
self.imagedetected = self.detect()
- def copy(self):
+ def copy(self) -> "ImageHash":
new = self.__class__(self.hashobj.copy())
new.imagedetected = self.imagedetected
new.content = io.BytesIO(self.content.getvalue())
return new
- def hexdigest(self):
+ def digest(self) -> bytes:
+ raise ValueError("an ImageHash cannot produce a raw digest")
+
+ def hexdigest(self) -> str:
if not self.imagedetected:
raise ValueError("not a image")
hashobj = self.hashobj.copy()
@@ -70,7 +76,7 @@ class ImageHash:
return "%s%8.8x%8.8x" % (hashobj.hexdigest(), width, height)
@property
- def name(self):
+ def name(self) -> str:
return self.name_prefix + self.hashobj.name
@@ -78,7 +84,7 @@ class PNGHash(ImageHash):
"""A hash on the contents of a PNG image."""
name_prefix = "png_"
- def detect(self):
+ def detect(self) -> bool:
if self.content.tell() < 33: # header + IHDR
return False
curvalue = self.content.getvalue()
@@ -93,7 +99,7 @@ class GIFHash(ImageHash):
"""A hash on the contents of the first frame of a GIF image."""
name_prefix = "gif_"
- def detect(self):
+ def detect(self) -> bool:
if self.content.tell() < 10: # magic + logical dimension
return False
curvalue = self.content.getvalue()
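
Illustrative use of PNGHash (a sketch; requires Pillow): the hexdigest suffixes the pixel hash with the 32-bit width and height.

import hashlib
import io

import PIL.Image

from dedup.image import PNGHash

buff = io.BytesIO()
PIL.Image.new("RGB", (4, 2)).save(buff, "PNG")

hasher = PNGHash(hashlib.sha512())
hasher.update(buff.getvalue())
print(hasher.name)               # png_sha512
print(hasher.hexdigest()[-16:])  # 0000000400000002 for a 4x2 image
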
diff --git a/dedup/utils.py b/dedup/utils.py
index 55cdef0..e1b134f 100644
--- a/dedup/utils.py
+++ b/dedup/utils.py
@@ -1,5 +1,7 @@
import contextlib
import errno
+import sqlite3
+import typing
import urllib.error
import urllib.request
@@ -7,13 +9,17 @@ import debian.deb822
from dedup.compression import decompress
-def fetchiter(cursor):
+
+def fetchiter(cursor: sqlite3.Cursor) -> typing.Iterator[typing.Any]:
rows = cursor.fetchmany()
while rows:
yield from rows
rows = cursor.fetchmany()
-def open_compressed_mirror_url(url, extensions=(".xz", ".gz", "")):
+
+def open_compressed_mirror_url(
+ url: str, extensions: typing.Iterable[str] = (".xz", ".gz", "")
+) -> typing.BinaryIO:
"""Fetch the given url. Try appending each of the given compression
schemes and move on in case it doesn't exist. Decompress the resulting
stream on the fly.
@@ -34,7 +40,13 @@ def open_compressed_mirror_url(url, extensions=(".xz", ".gz", "")):
return decompress(handle, ext)
raise OSError(errno.ENOENT, "No such file or directory")
-def iterate_packages(mirror, architecture, distribution="sid", section="main"):
+
+def iterate_packages(
+ mirror: str,
+ architecture: str,
+ distribution: str = "sid",
+ section: str = "main",
+) -> typing.Iterator[debian.deb822.Packages]:
"""Download the relevant binary package list and generate
debian.deb822.Packages objects per listed package."""
url = "%s/dists/%s/%s/binary-%s/Packages" % \
diff --git a/importpkg.py b/importpkg.py
index 4f00407..160fe9e 100755
--- a/importpkg.py
+++ b/importpkg.py
@@ -8,32 +8,48 @@ And finally a document consisting of the string "commit" is emitted."""
import argparse
import hashlib
import sys
+import tarfile
import urllib.request
import zlib
+import debian.deb822
import yaml
from dedup.debpkg import DebExtractor, get_tar_hashes
-from dedup.hashing import DecompressedHash, SuppressingHash, HashedStream, \
- HashBlacklistContent
+from dedup.hashing import (
+ DecompressedHash,
+ HashBlacklistContent,
+ HashedStream,
+ HashlibLike,
+ SuppressingHash,
+)
from dedup.compression import GzipDecompressor
from dedup.image import GIFHash, PNGHash
boring_content = set((b"", b"\n"))
-def sha512_nontrivial():
+
+def sha512_nontrivial() -> HashlibLike:
return HashBlacklistContent(hashlib.sha512(), boring_content)
-def gziphash():
- hashobj = hashlib.sha512()
- hashobj = DecompressedHash(GzipDecompressor(), hashobj, "gzip_sha512")
- hashobj = SuppressingHash(hashobj, (ValueError, zlib.error))
- return HashBlacklistContent(hashobj, boring_content)
-def pnghash():
+def gziphash() -> HashlibLike:
+ return HashBlacklistContent(
+ SuppressingHash(
+ DecompressedHash(
+ GzipDecompressor(), hashlib.sha512(), "gzip_sha512"
+ ),
+ (ValueError, zlib.error),
+ ),
+ boring_content,
+ )
+
+
+def pnghash() -> HashlibLike:
return SuppressingHash(PNGHash(hashlib.sha512()), (ValueError,))
-def gifhash():
+
+def gifhash() -> HashlibLike:
return SuppressingHash(GIFHash(hashlib.sha512()), (ValueError,))
class ProcessingFinished(Exception):
@@ -42,11 +58,11 @@ class ProcessingFinished(Exception):
class ImportpkgExtractor(DebExtractor):
hash_functions = [sha512_nontrivial, gziphash, pnghash, gifhash]
- def __init__(self, callback):
+ def __init__(self, callback) -> None:
DebExtractor.__init__(self)
self.callback = callback
- def handle_control_info(self, info):
+ def handle_control_info(self, info: debian.deb822.Packages) -> None:
try:
source = info["source"].split()[0]
except KeyError:
@@ -60,7 +76,7 @@ class ImportpkgExtractor(DebExtractor):
version=info["version"],
architecture=info["architecture"], depends=depends))
- def handle_data_tar(self, tarfileobj):
+ def handle_data_tar(self, tarfileobj: tarfile.TarFile) -> None:
for name, size, hashes in get_tar_hashes(tarfileobj,
self.hash_functions):
try:
@@ -71,7 +87,8 @@ class ImportpkgExtractor(DebExtractor):
self.callback(dict(name=name, size=size, hashes=hashes))
raise ProcessingFinished()
-def main():
+
+def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument("-H", "--hash", action="store",
help="verify that stdin hash given sha256 hash")
diff --git a/readyaml.py b/readyaml.py
index b88d1e0..7008263 100755
--- a/readyaml.py
+++ b/readyaml.py
@@ -5,11 +5,13 @@ updates the database with the contents."""
import argparse
import sqlite3
import sys
+import typing
from debian.debian_support import version_compare
import yaml
-def readyaml(db, stream):
+
+def readyaml(db: sqlite3.Connection, stream: typing.TextIO) -> None:
cur = db.cursor()
cur.execute("PRAGMA foreign_keys = ON;")
gen = yaml.load_all(stream, yaml.CSafeLoader)
@@ -53,7 +55,8 @@ def readyaml(db, stream):
for func, hexhash in entry["hashes"].items()))
raise ValueError("missing commit block")
-def main():
+
+def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument("-d", "--database", action="store",
default="test.sqlite3",
diff --git a/update_sharing.py b/update_sharing.py
index 78e6171..3a86268 100755
--- a/update_sharing.py
+++ b/update_sharing.py
@@ -2,10 +2,17 @@
import argparse
import sqlite3
+import typing
from dedup.utils import fetchiter
-def add_values(cursor, insert_key, files, size):
+
+def add_values(
+ cursor: sqlite3.Cursor,
+ insert_key: typing.Tuple[int, int, int, int],
+ files: int,
+ size: int,
+) -> None:
cursor.execute("UPDATE sharing SET files = files + ?, size = size + ? WHERE pid1 = ? AND pid2 = ? AND fid1 = ? AND fid2 = ?;",
(files, size) + insert_key)
if cursor.rowcount > 0:
@@ -13,14 +20,25 @@ def add_values(cursor, insert_key, files, size):
cursor.execute("INSERT INTO sharing (pid1, pid2, fid1, fid2, files, size) VALUES (?, ?, ?, ?, ?, ?);",
insert_key + (files, size))
-def compute_pkgdict(rows):
- pkgdict = dict()
+
+def compute_pkgdict(
+ rows: typing.Iterable[typing.Tuple[int, typing.Any, str, int, int]]
+) -> typing.Dict[int, typing.Dict[int, typing.List[typing.Tuple[int, str]]]]:
+ pkgdict: typing.Dict[
+ int, typing.Dict[int, typing.List[typing.Tuple[int, str]]]
+ ] = {}
for pid, _, filename, size, fid in rows:
funcdict = pkgdict.setdefault(pid, {})
funcdict.setdefault(fid, []).append((size, filename))
return pkgdict
-def process_pkgdict(cursor, pkgdict):
+
+def process_pkgdict(
+ cursor: sqlite3.Cursor,
+ pkgdict: typing.Dict[
+ int, typing.Dict[int, typing.List[typing.Tuple[int, str]]]
+ ],
+) -> None:
for pid1, funcdict1 in pkgdict.items():
for fid1, files in funcdict1.items():
numfiles = len(files)
@@ -38,7 +56,8 @@ def process_pkgdict(cursor, pkgdict):
insert_key = (pid1, pid2, fid1, fid2)
add_values(cursor, insert_key, pkgnumfiles, pkgsize)
-def main(db):
+
+def main(db: sqlite3.Connection) -> None:
cur = db.cursor()
cur.execute("PRAGMA foreign_keys = ON;")
cur.execute("DELETE FROM sharing;")
@@ -49,7 +68,9 @@ def main(db):
for hashvalue, in fetchiter(readcur):
cur.execute("SELECT function.eqclass, content.pid, content.id, content.filename, content.size, hash.fid FROM hash JOIN content ON hash.cid = content.id JOIN function ON hash.fid = function.id AND function.eqclass IS NOT NULL WHERE hash = ?;",
(hashvalue,))
- rowdict = dict()
+ rowdict: typing.Dict[
+ int, typing.List[typing.Tuple[int, int, str, int, int]]
+ ] = {}
for row in cur.fetchall():
rowdict.setdefault(row[0], []).append(row[1:])
for eqclass, rows in rowdict.items():
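
The nested dict that the annotations spell out for compute_pkgdict can be seen directly (illustrative rows; the second column is the eqclass and is ignored by the function):

from update_sharing import compute_pkgdict

rows = [
    (1, None, "usr/bin/foo", 10, 2),
    (1, None, "usr/bin/bar", 20, 2),
    (3, None, "usr/bin/foo", 10, 2),
]
# Grouped by package id, then by hash function id.
print(compute_pkgdict(rows))
# {1: {2: [(10, 'usr/bin/foo'), (20, 'usr/bin/bar')]}, 3: {2: [(10, 'usr/bin/foo')]}}
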