summaryrefslogtreecommitdiff
path: root/dedup
diff options
context:
space:
mode:
authorHelmut Grohne <helmut@subdivi.de>2023-05-09 15:10:25 +0200
committerHelmut Grohne <helmut@subdivi.de>2023-05-09 15:12:01 +0200
commit924f0c734a7accb87e2ac911cee6e24dd463f237 (patch)
treeeb1bcaa2f25933374d28905bcb56e2e8aabeec62 /dedup
parent8a05a6d8bacea0643a4967eed4cd67019ee0b6d7 (diff)
downloaddebian-dedup-924f0c734a7accb87e2ac911cee6e24dd463f237.tar.gz
add type annotations to most of the codeHEADmaster
Diffstat (limited to 'dedup')
-rw-r--r--dedup/compression.py60
-rw-r--r--dedup/debpkg.py50
-rw-r--r--dedup/filemagic.py20
-rw-r--r--dedup/hashing.py115
-rw-r--r--dedup/image.py22
-rw-r--r--dedup/utils.py18
6 files changed, 195 insertions, 90 deletions
diff --git a/dedup/compression.py b/dedup/compression.py
index da6e9a0..2e9869c 100644
--- a/dedup/compression.py
+++ b/dedup/compression.py
@@ -1,20 +1,38 @@
import bz2
import struct
+import typing
import zlib
import lzma
+
+class Decompressor(typing.Protocol):
+ def copy(self) -> "Decompressor":
+ ...
+
+ def decompress(self, data: bytes) -> bytes:
+ ...
+
+ def flush(self) -> bytes:
+ ...
+
+ @property
+ def unused_data(self) -> bytes:
+ ...
+
+
class GzipDecompressor:
"""An interface to gzip which is similar to bz2.BZ2Decompressor and
lzma.LZMADecompressor."""
- def __init__(self):
+
+ def __init__(self) -> None:
self.sawheader = False
self.inbuffer = b""
- self.decompressor = None
+ self.decompressor: typing.Optional[Decompressor] = None
self.crc = 0
self.size = 0
- def decompress(self, data):
+ def decompress(self, data: bytes) -> bytes:
"""
@raises ValueError: if no gzip magic is found
@raises zlib.error: from zlib invocations
@@ -57,7 +75,7 @@ class GzipDecompressor:
self.decompressor = zlib.decompressobj(-zlib.MAX_WBITS)
@property
- def unused_data(self):
+ def unused_data(self) -> bytes:
if self.decompressor:
return self.decompressor.unused_data
elif not self.sawheader:
@@ -69,7 +87,7 @@ class GzipDecompressor:
return b""
return self.inbuffer
- def flush(self):
+ def flush(self) -> bytes:
"""
@raises zlib.error: from zlib invocations
"""
@@ -77,7 +95,7 @@ class GzipDecompressor:
return b""
return self.decompressor.flush()
- def copy(self):
+ def copy(self) -> "GzipDecompressor":
new = GzipDecompressor()
new.inbuffer = self.inbuffer
if self.decompressor:
@@ -92,20 +110,25 @@ class DecompressedStream:
read(optional length), tell, seek(forward only) and close."""
blocksize = 65536
- def __init__(self, fileobj, decompressor):
+ def __init__(
+ self, fileobj: typing.BinaryIO, decompressor: Decompressor
+ ) -> None:
"""
@param fileobj: a file-like object providing read(size)
@param decompressor: a bz2.BZ2Decompressor or lzma.LZMADecompressor
like object providing methods decompress and flush and an
attribute unused_data
"""
- self.fileobj = fileobj
- self.decompressor = decompressor
+ self.fileobj: typing.Optional[typing.BinaryIO] = fileobj
+ self.decompressor: typing.Optional[Decompressor] = decompressor
self.buff = bytearray()
self.pos = 0
- def _fill_buff_until(self, predicate):
+ def _fill_buff_until(
+ self, predicate: typing.Callable[[bytes], bool]
+ ) -> None:
assert self.fileobj is not None
+ assert self.decompressor is not None
while not predicate(self.buff):
data = self.fileobj.read(self.blocksize)
if data:
@@ -115,13 +138,13 @@ class DecompressedStream:
self.buff += self.decompressor.flush()
break
- def _read_from_buff(self, length):
+ def _read_from_buff(self, length: int) -> bytes:
ret = bytes(self.buff[:length])
self.buff[:length] = b""
self.pos += length
return ret
- def read(self, length=None):
+ def read(self, length: typing.Optional[int] = None) -> bytes:
if length is None:
self._fill_buff_until(lambda _: False)
length = len(self.buff)
@@ -129,7 +152,7 @@ class DecompressedStream:
self._fill_buff_until(lambda b, l=length: len(b) >= l)
return self._read_from_buff(length)
- def readline(self):
+ def readline(self) -> bytes:
self._fill_buff_until(lambda b: b'\n' in b)
try:
length = self.buff.index(b'\n') + 1
@@ -137,14 +160,14 @@ class DecompressedStream:
length = len(self.buff)
return self._read_from_buff(length)
- def __iter__(self):
+ def __iter__(self) -> typing.Iterator[bytes]:
return iter(self.readline, b'')
- def tell(self):
+ def tell(self) -> int:
assert self.fileobj is not None
return self.pos
- def seek(self, pos):
+ def seek(self, pos: int) -> None:
"""Forward seeks by absolute position only."""
assert self.fileobj is not None
if pos < self.pos:
@@ -159,7 +182,7 @@ class DecompressedStream:
self.read(left)
return
- def close(self):
+ def close(self) -> None:
if self.fileobj is not None:
self.fileobj.close()
self.fileobj = None
@@ -173,13 +196,12 @@ decompressors = {
'.xz': lzma.LZMADecompressor,
}
-def decompress(filelike, extension):
+def decompress(filelike: typing.BinaryIO, extension: str) -> typing.BinaryIO:
"""Decompress a stream according to its extension.
@param filelike: is a read-only byte-stream. It must support read(size) and
close().
@param extension: permitted values are "", ".gz", ".bz2", ".lzma", and
".xz"
- @type extension: unicode
@returns: a read-only byte-stream with the decompressed contents of the
original filelike. It supports read(size) and close(). If the
original supports seek(pos) and tell(), then it also supports
diff --git a/dedup/debpkg.py b/dedup/debpkg.py
index de00e60..0d1b7da 100644
--- a/dedup/debpkg.py
+++ b/dedup/debpkg.py
@@ -1,26 +1,29 @@
import tarfile
+import typing
import arpy
from debian import deb822
from dedup.compression import decompress
-from dedup.hashing import hash_file
+from dedup.hashing import HashlibLike, hash_file
class MultiHash:
- def __init__(self, *hashes):
+ def __init__(self, *hashes: HashlibLike):
self.hashes = hashes
- def update(self, data):
+ def update(self, data: bytes) -> None:
for hasher in self.hashes:
hasher.update(data)
-def get_tar_hashes(tar, hash_functions):
+
+def get_tar_hashes(
+ tar: tarfile.TarFile,
+ hash_functions: typing.Sequence[typing.Callable[[], HashlibLike]],
+) -> typing.Iterator[typing.Tuple[str, int, typing.Dict[str, str]]]:
"""Given a TarFile read all regular files and compute all of the given hash
functions on each file.
- @type tar: tarfile.TarFile
@param hash_functions: a sequence of parameter-less functions each creating a
new hashlib-like object
- @rtype: gen((str, int, {str: str}}
@returns: an iterable of (filename, filesize, hashes) tuples where
hashes is a dict mapping hash function names to hash values
"""
@@ -29,7 +32,9 @@ def get_tar_hashes(tar, hash_functions):
if not elem.isreg(): # excludes hard links as well
continue
hasher = MultiHash(*[func() for func in hash_functions])
- hash_file(hasher, tar.extractfile(elem))
+ extracted = tar.extractfile(elem)
+ assert extracted is not None
+ hash_file(hasher, extracted)
hashes = {}
for hashobj in hasher.hashes:
hashvalue = hashobj.hexdigest()
@@ -37,17 +42,18 @@ def get_tar_hashes(tar, hash_functions):
hashes[hashobj.name] = hashvalue
yield (elem.name, elem.size, hashes)
-def opentar(filelike):
+
+def opentar(filelike: typing.BinaryIO) -> tarfile.TarFile:
return tarfile.open(fileobj=filelike, mode="r|", encoding="utf8",
errors="surrogateescape")
class DebExtractor:
"Base class for extracting desired features from a Debian package."
- def __init__(self):
+ def __init__(self) -> None:
self.arstate = "start"
- def process(self, filelike):
+ def process(self, filelike: typing.BinaryIO) -> None:
"""Process a Debian package.
@param filelike: is a file-like object containing the contents of the
        Debian package and can be read once without seeks.
@@ -89,22 +95,20 @@ class DebExtractor:
else:
assert self.arstate == "data"
- def handle_ar_end(self):
+ def handle_ar_end(self) -> None:
"Handle the end of the ar archive of the Debian package."
if self.arstate != "data":
raise ValueError("data.tar not found")
- def handle_debversion(self, version):
+ def handle_debversion(self, version: bytes) -> None:
"""Handle the debian-binary member of the Debian package.
- @type version: bytes
@param version: The full contents of the ar member.
"""
- def handle_control_tar(self, tarfileobj):
+ def handle_control_tar(self, tarfileobj: tarfile.TarFile) -> None:
"""Handle the control.tar member of the Debian package.
If you replace this method, none of handle_control_member,
handle_control_info or handle_control_end are called.
- @type tarfileobj: tarfile.TarFile
        @param tarfileobj: is opened for streaming reads
"""
controlseen = False
@@ -113,7 +117,9 @@ class DebExtractor:
name = elem.name
if name.startswith("./"):
name = name[2:]
- content = tarfileobj.extractfile(elem).read()
+ extracted = tarfileobj.extractfile(elem)
+ assert extracted is not None
+ content = extracted.read()
self.handle_control_member(name, content)
if name == "control":
self.handle_control_info(deb822.Packages(content))
@@ -125,24 +131,20 @@ class DebExtractor:
raise ValueError("control missing from control.tar")
self.handle_control_end()
- def handle_control_member(self, name, content):
+ def handle_control_member(self, name: str, content: bytes) -> None:
"""Handle a file member of the control.tar member of the Debian package.
- @type name: str
@param name: is the plain member name
- @type content: bytes
"""
- def handle_control_info(self, info):
+ def handle_control_info(self, info: deb822.Packages) -> None:
"""Handle the control member of the control.tar member of the Debian
package.
- @type info: deb822.Packages
"""
- def handle_control_end(self):
+ def handle_control_end(self) -> None:
"Handle the end of the control.tar member of the Debian package."
- def handle_data_tar(self, tarfileobj):
+ def handle_data_tar(self, tarfileobj: tarfile.TarFile) -> None:
"""Handle the data.tar member of the Debian package.
- @type tarfileobj: tarfile.TarFile
        @param tarfileobj: is opened for streaming reads
"""
diff --git a/dedup/filemagic.py b/dedup/filemagic.py
index b71c276..a6d09ba 100644
--- a/dedup/filemagic.py
+++ b/dedup/filemagic.py
@@ -1,6 +1,8 @@
"""A very strange "hash" that uses the magic module (python3-magic) to guess
the file type."""
+import typing
+
import magic
# It changed API a few times...
@@ -13,36 +15,38 @@ class FileDigester:
"""A hashlib-like class to guess a filetype using the magic module."""
FILE_BYTES_MAX = 1024 * 1024 # copied from file source
- def __init__(self):
- self.buff = b""
- self.identification = None
+ def __init__(self) -> None:
+ self.buff: typing.Optional[bytes] = b""
+ self.identification: typing.Optional[str] = None
- def _compute_identification(self):
+ def _compute_identification(self) -> str:
+ assert self.buff is not None
try:
return _magic_identify(self.buff)
except UnicodeDecodeError:
return "magic identification is not valid UTF-8"
- def update(self, buff):
+ def update(self, buff: bytes) -> None:
if self.identification:
return
+ assert self.buff is not None
self.buff += buff
if len(self.buff) >= self.FILE_BYTES_MAX:
self.identification = self._compute_identification()
self.buff = None
- def identify(self):
+ def identify(self) -> str:
"""Return the guessed file magic identification."""
if self.identification:
return self.identification
return self._compute_identification()
- def hexdigest(self):
+ def hexdigest(self) -> str:
"""Compatibility with hashlib. An alias of identify. Doesn't return
hex."""
return self.identify()
- def copy(self):
+ def copy(self) -> "FileDigester":
new = FileDigester()
new.buff = self.buff
new.identification = self.identification
diff --git a/dedup/hashing.py b/dedup/hashing.py
index 9cebcbb..21bbed2 100644
--- a/dedup/hashing.py
+++ b/dedup/hashing.py
@@ -1,4 +1,26 @@
import itertools
+import typing
+
+from .compression import Decompressor
+
+
+class HashlibLike(typing.Protocol):
+ def copy(self) -> "HashlibLike":
+ ...
+
+ def digest(self) -> typing.Optional[bytes]:
+ ...
+
+ def hexdigest(self) -> typing.Optional[str]:
+ ...
+
+ @property
+ def name(self) -> str:
+ ...
+
+ def update(self, data: bytes) -> None:
+ ...
+
class HashBlacklist:
"""Turn a hashlib-like object into a hash that returns None for some
@@ -7,7 +29,10 @@ class HashBlacklist:
    We only work with hexdigests here, so digest() disappears. The methods
copy and update as well as the name attribute keep working as expected.
"""
- def __init__(self, hashobj, blacklist=()):
+
+ def __init__(
+ self, hashobj: HashlibLike, blacklist: typing.Container[str] = ()
+ ) -> None:
"""
@param hashobj: a hashlib-like object
@param blacklist: an object providing __contains__.
@@ -19,16 +44,16 @@ class HashBlacklist:
self.update = self.hashobj.update
@property
- def name(self):
+ def name(self) -> str:
return self.hashobj.name
- def hexdigest(self):
+ def hexdigest(self) -> typing.Optional[str]:
digest = self.hashobj.hexdigest()
if digest in self.blacklist:
return None
return digest
- def copy(self):
+ def copy(self) -> "HashBlacklist":
return HashBlacklist(self.hashobj.copy(), self.blacklist)
class HashBlacklistContent:
@@ -36,7 +61,12 @@ class HashBlacklistContent:
blacklisted content instead of the real hash value. Unlike HashBlacklist,
not the output of the hash is considered, but its input."""
- def __init__(self, hashobj, blacklist=(), maxlen=None):
+ def __init__(
+ self,
+ hashobj: HashlibLike,
+ blacklist: typing.Collection[bytes] = (),
+ maxlen: typing.Optional[int] = None,
+ ) -> None:
"""
@param hashobj: a hashlib-like object
@param blacklist: an object providing __contains__.
@@ -52,30 +82,30 @@ class HashBlacklistContent:
# the chain avoids passing the empty sequence to max
maxlen = max(itertools.chain((0,), map(len, blacklist)))
self.maxlen = maxlen
- self.stored = b""
+ self.stored: typing.Optional[bytes] = b""
@property
- def name(self):
+ def name(self) -> str:
return self.hashobj.name
- def update(self, data):
+ def update(self, data: bytes) -> None:
if self.stored is not None:
self.stored += data
if len(self.stored) > self.maxlen:
self.stored = None
self.hashobj.update(data)
- def digest(self):
+ def digest(self) -> typing.Optional[bytes]:
if self.stored is not None and self.stored in self.blacklist:
return None
return self.hashobj.digest()
- def hexdigest(self):
+ def hexdigest(self) -> typing.Optional[str]:
if self.stored is not None and self.stored in self.blacklist:
return None
return self.hashobj.hexdigest()
- def copy(self):
+ def copy(self) -> "HashBlacklistContent":
new = HashBlacklistContent(self.hashobj.copy(), self.blacklist,
self.maxlen)
new.stored = self.stored
@@ -84,7 +114,13 @@ class HashBlacklistContent:
class DecompressedHash:
"""Apply a decompression function before the hash. This class provides the
hashlib interface (update, hexdigest, copy) excluding digest and name."""
- def __init__(self, decompressor, hashobj, name="unnamed"):
+
+ def __init__(
+ self,
+ decompressor: Decompressor,
+ hashobj: HashlibLike,
+ name: str = "unnamed",
+ ):
"""
@param decompressor: a decompression object like bz2.BZ2Decompressor or
lzma.LZMADecompressor. It has to provide methods decompress and
@@ -98,23 +134,29 @@ class DecompressedHash:
self.hashobj = hashobj
self.name = name
- def update(self, data):
+ def update(self, data: bytes) -> None:
self.hashobj.update(self.decompressor.decompress(data))
- def hexdigest(self):
+ def _finalize_hashobj(self) -> HashlibLike:
if not hasattr(self.decompressor, "flush"):
if self.decompressor.unused_data:
raise ValueError("decompressor did not consume all data")
- return self.hashobj.hexdigest()
+ return self.hashobj
tmpdecomp = self.decompressor.copy()
data = tmpdecomp.flush()
if tmpdecomp.unused_data:
raise ValueError("decompressor did not consume all data")
tmphash = self.hashobj.copy()
tmphash.update(data)
- return tmphash.hexdigest()
+ return tmphash
- def copy(self):
+ def digest(self) -> typing.Optional[bytes]:
+ return self._finalize_hashobj().digest()
+
+ def hexdigest(self) -> typing.Optional[str]:
+ return self._finalize_hashobj().hexdigest()
+
+ def copy(self) -> "DecompressedHash":
return DecompressedHash(self.decompressor.copy(), self.hashobj.copy(),
self.name)
@@ -122,7 +164,8 @@ class SuppressingHash:
"""A hash that silences exceptions from the update and hexdigest methods of
a hashlib-like object. If an exception has occurred, hexdigest always
returns None."""
- def __init__(self, hashobj, exceptions=()):
+
+ def __init__(self, hashobj: HashlibLike, exceptions) -> None:
"""
@param hashobj: a hashlib-like object providing methods update, copy
and hexdigest. If a name attribute is present, it is mirrored as
@@ -130,19 +173,27 @@ class SuppressingHash:
@type exceptions: tuple
@param exceptions: exception classes to be suppressed
"""
- self.hashobj = hashobj
+ self.hashobj: typing.Optional[HashlibLike] = hashobj
self.exceptions = exceptions
if hasattr(hashobj, "name"):
self.name = hashobj.name
- def update(self, data):
+ def update(self, data: bytes) -> None:
if self.hashobj:
try:
self.hashobj.update(data)
except self.exceptions:
self.hashobj = None
- def hexdigest(self):
+ def digest(self) -> typing.Optional[bytes]:
+ if self.hashobj:
+ try:
+ return self.hashobj.digest()
+ except self.exceptions:
+ self.hashobj is None
+ return None
+
+ def hexdigest(self) -> typing.Optional[str]:
if self.hashobj:
try:
return self.hashobj.hexdigest()
@@ -150,12 +201,18 @@ class SuppressingHash:
self.hashobj = None
return None
- def copy(self):
+ def copy(self) -> "SuppressingHash":
if self.hashobj:
return SuppressingHash(self.hashobj.copy(), self.exceptions)
- return SuppressingHash(None, self.exceptions)
+ ret = SuppressingHash(None, self.exceptions)
+ if hasattr(self, "name"):
+ ret.name = self.name
+ return ret
+
-def hash_file(hashobj, filelike, blocksize=65536):
+def hash_file(
+ hashobj: HashlibLike, filelike: typing.BinaryIO, blocksize: int = 65536
+) -> None:
"""Feed the entire contents from the given filelike to the given hashobj.
@param hashobj: hashlib-like object providing an update method
@param filelike: file-like object providing read(size)
@@ -168,7 +225,9 @@ def hash_file(hashobj, filelike, blocksize=65536):
class HashedStream:
"""A file-like object, that supports sequential reading and hashes the
contents on the fly."""
- def __init__(self, filelike, hashobj):
+ def __init__(
+ self, filelike: typing.BinaryIO, hashobj: HashlibLike
+ ) -> None:
"""
@param filelike: a file-like object, that must support the read method
@param hashobj: a hashlib-like object providing update and hexdigest
@@ -176,15 +235,15 @@ class HashedStream:
self.filelike = filelike
self.hashobj = hashobj
- def read(self, length):
+ def read(self, length: int) -> bytes:
data = self.filelike.read(length)
self.hashobj.update(data)
return data
- def hexdigest(self):
+ def hexdigest(self) -> typing.Optional[str]:
return self.hashobj.hexdigest()
- def validate(self, hexdigest):
+ def validate(self, hexdigest: str) -> None:
"""Soak up any remaining input and validate the read data using the
given hexdigest.
@raises ValueError: when the hash does not match
diff --git a/dedup/image.py b/dedup/image.py
index 91321f4..a417528 100644
--- a/dedup/image.py
+++ b/dedup/image.py
@@ -3,6 +3,8 @@ import struct
import PIL.Image
+from .hashing import HashlibLike
+
class ImageHash:
"""A hash on the contents of an image data type supported by PIL. This
disregards mode, depth and meta information. Note that due to limitations
@@ -11,8 +13,9 @@ class ImageHash:
maxsize = 1024 * 1024 * 32
# max memory usage is about 5 * maxpixels in bytes
maxpixels = 1024 * 1024 * 32
+ name_prefix: str
- def __init__(self, hashobj):
+ def __init__(self, hashobj: HashlibLike) -> None:
"""
@param hashobj: a hashlib-like object
"""
@@ -20,23 +23,26 @@ class ImageHash:
self.imagedetected = False
self.content = io.BytesIO()
- def detect(self):
+ def detect(self) -> bool:
raise NotImplementedError
- def update(self, data):
+ def update(self, data: bytes) -> None:
self.content.write(data)
if self.content.tell() > self.maxsize:
raise ValueError("maximum image size exceeded")
if not self.imagedetected:
self.imagedetected = self.detect()
- def copy(self):
+ def copy(self) -> "ImageHash":
new = self.__class__(self.hashobj.copy())
new.imagedetected = self.imagedetected
new.content = io.BytesIO(self.content.getvalue())
return new
- def hexdigest(self):
+ def digest(self) -> bytes:
+ raise ValueError("an ImageHash cannot produce a raw digest")
+
+ def hexdigest(self) -> str:
if not self.imagedetected:
raise ValueError("not a image")
hashobj = self.hashobj.copy()
@@ -70,7 +76,7 @@ class ImageHash:
return "%s%8.8x%8.8x" % (hashobj.hexdigest(), width, height)
@property
- def name(self):
+ def name(self) -> str:
return self.name_prefix + self.hashobj.name
@@ -78,7 +84,7 @@ class PNGHash(ImageHash):
"""A hash on the contents of a PNG image."""
name_prefix = "png_"
- def detect(self):
+ def detect(self) -> bool:
if self.content.tell() < 33: # header + IHDR
return False
curvalue = self.content.getvalue()
@@ -93,7 +99,7 @@ class GIFHash(ImageHash):
"""A hash on the contents of the first frame of a GIF image."""
name_prefix = "gif_"
- def detect(self):
+ def detect(self) -> bool:
if self.content.tell() < 10: # magic + logical dimension
return False
curvalue = self.content.getvalue()
diff --git a/dedup/utils.py b/dedup/utils.py
index 55cdef0..e1b134f 100644
--- a/dedup/utils.py
+++ b/dedup/utils.py
@@ -1,5 +1,7 @@
import contextlib
import errno
+import sqlite3
+import typing
import urllib.error
import urllib.request
@@ -7,13 +9,17 @@ import debian.deb822
from dedup.compression import decompress
-def fetchiter(cursor):
+
+def fetchiter(cursor: sqlite3.Cursor) -> typing.Iterator[typing.Any]:
rows = cursor.fetchmany()
while rows:
yield from rows
rows = cursor.fetchmany()
-def open_compressed_mirror_url(url, extensions=(".xz", ".gz", "")):
+
+def open_compressed_mirror_url(
+ url: str, extensions: typing.Iterable[str] = (".xz", ".gz", "")
+) -> typing.BinaryIO:
"""Fetch the given url. Try appending each of the given compression
schemes and move on in case it doesn't exist. Decompress the resulting
stream on the fly.
@@ -34,7 +40,13 @@ def open_compressed_mirror_url(url, extensions=(".xz", ".gz", "")):
return decompress(handle, ext)
raise OSError(errno.ENOENT, "No such file or directory")
-def iterate_packages(mirror, architecture, distribution="sid", section="main"):
+
+def iterate_packages(
+ mirror: str,
+ architecture: str,
+ distribution: str = "sid",
+ section: str = "main",
+) -> typing.Iterator[debian.deb822.Packages]:
"""Download the relevant binary package list and generate
debian.deb822.Packages objects per listed package."""
url = "%s/dists/%s/%s/binary-%s/Packages" % \