diff options
Diffstat (limited to 'dedup/hashing.py')
-rw-r--r-- | dedup/hashing.py | 115 |
1 files changed, 87 insertions, 28 deletions
diff --git a/dedup/hashing.py b/dedup/hashing.py index 9cebcbb..21bbed2 100644 --- a/dedup/hashing.py +++ b/dedup/hashing.py @@ -1,4 +1,26 @@ import itertools +import typing + +from .compression import Decompressor + + +class HashlibLike(typing.Protocol): + def copy(self) -> "HashlibLike": + ... + + def digest(self) -> typing.Optional[bytes]: + ... + + def hexdigest(self) -> typing.Optional[str]: + ... + + @property + def name(self) -> str: + ... + + def update(self, data: bytes) -> None: + ... + class HashBlacklist: """Turn a hashlib-like object into a hash that returns None for some @@ -7,7 +29,10 @@ class HashBlacklist: We only work with hexdigests here, so digest() disappears. The methods copy and update as well as the name attribute keep working as expected. """ - def __init__(self, hashobj, blacklist=()): + + def __init__( + self, hashobj: HashlibLike, blacklist: typing.Container[str] = () + ) -> None: """ @param hashobj: a hashlib-like object @param blacklist: an object providing __contains__. @@ -19,16 +44,16 @@ class HashBlacklist: self.update = self.hashobj.update @property - def name(self): + def name(self) -> str: return self.hashobj.name - def hexdigest(self): + def hexdigest(self) -> typing.Optional[str]: digest = self.hashobj.hexdigest() if digest in self.blacklist: return None return digest - def copy(self): + def copy(self) -> "HashBlacklist": return HashBlacklist(self.hashobj.copy(), self.blacklist) class HashBlacklistContent: @@ -36,7 +61,12 @@ class HashBlacklistContent: blacklisted content instead of the real hash value. Unlike HashBlacklist, not the output of the hash is considered, but its input.""" - def __init__(self, hashobj, blacklist=(), maxlen=None): + def __init__( + self, + hashobj: HashlibLike, + blacklist: typing.Collection[bytes] = (), + maxlen: typing.Optional[int] = None, + ) -> None: """ @param hashobj: a hashlib-like object @param blacklist: an object providing __contains__. 
@@ -52,30 +82,30 @@ class HashBlacklistContent: # the chain avoids passing the empty sequence to max maxlen = max(itertools.chain((0,), map(len, blacklist))) self.maxlen = maxlen - self.stored = b"" + self.stored: typing.Optional[bytes] = b"" @property - def name(self): + def name(self) -> str: return self.hashobj.name - def update(self, data): + def update(self, data: bytes) -> None: if self.stored is not None: self.stored += data if len(self.stored) > self.maxlen: self.stored = None self.hashobj.update(data) - def digest(self): + def digest(self) -> typing.Optional[bytes]: if self.stored is not None and self.stored in self.blacklist: return None return self.hashobj.digest() - def hexdigest(self): + def hexdigest(self) -> typing.Optional[str]: if self.stored is not None and self.stored in self.blacklist: return None return self.hashobj.hexdigest() - def copy(self): + def copy(self) -> "HashBlacklistContent": new = HashBlacklistContent(self.hashobj.copy(), self.blacklist, self.maxlen) new.stored = self.stored @@ -84,7 +114,13 @@ class HashBlacklistContent: class DecompressedHash: """Apply a decompression function before the hash. This class provides the hashlib interface (update, hexdigest, copy) excluding digest and name.""" - def __init__(self, decompressor, hashobj, name="unnamed"): + + def __init__( + self, + decompressor: Decompressor, + hashobj: HashlibLike, + name: str = "unnamed", + ): """ @param decompressor: a decompression object like bz2.BZ2Decompressor or lzma.LZMADecompressor. 
It has to provide methods decompress and @@ -98,23 +134,29 @@ class DecompressedHash: self.hashobj = hashobj self.name = name - def update(self, data): + def update(self, data: bytes) -> None: self.hashobj.update(self.decompressor.decompress(data)) - def hexdigest(self): + def _finalize_hashobj(self) -> HashlibLike: if not hasattr(self.decompressor, "flush"): if self.decompressor.unused_data: raise ValueError("decompressor did not consume all data") - return self.hashobj.hexdigest() + return self.hashobj tmpdecomp = self.decompressor.copy() data = tmpdecomp.flush() if tmpdecomp.unused_data: raise ValueError("decompressor did not consume all data") tmphash = self.hashobj.copy() tmphash.update(data) - return tmphash.hexdigest() + return tmphash - def copy(self): + def digest(self) -> typing.Optional[bytes]: + return self._finalize_hashobj().digest() + + def hexdigest(self) -> typing.Optional[str]: + return self._finalize_hashobj().hexdigest() + + def copy(self) -> "DecompressedHash": return DecompressedHash(self.decompressor.copy(), self.hashobj.copy(), self.name) @@ -122,7 +164,8 @@ class SuppressingHash: """A hash that silences exceptions from the update and hexdigest methods of a hashlib-like object. If an exception has occurred, hexdigest always returns None.""" - def __init__(self, hashobj, exceptions=()): + + def __init__(self, hashobj: HashlibLike, exceptions) -> None: """ @param hashobj: a hashlib-like object providing methods update, copy and hexdigest. 
If a name attribute is present, it is mirrored as @@ -130,19 +173,27 @@ class SuppressingHash: @type exceptions: tuple @param exceptions: exception classes to be suppressed """ - self.hashobj = hashobj + self.hashobj: typing.Optional[HashlibLike] = hashobj self.exceptions = exceptions if hasattr(hashobj, "name"): self.name = hashobj.name - def update(self, data): + def update(self, data: bytes) -> None: if self.hashobj: try: self.hashobj.update(data) except self.exceptions: self.hashobj = None - def hexdigest(self): + def digest(self) -> typing.Optional[bytes]: + if self.hashobj: + try: + return self.hashobj.digest() + except self.exceptions: + self.hashobj = None + return None + + def hexdigest(self) -> typing.Optional[str]: if self.hashobj: try: return self.hashobj.hexdigest() @@ -150,12 +201,18 @@ class SuppressingHash: self.hashobj = None return None - def copy(self): + def copy(self) -> "SuppressingHash": if self.hashobj: return SuppressingHash(self.hashobj.copy(), self.exceptions) - return SuppressingHash(None, self.exceptions) + ret = SuppressingHash(None, self.exceptions) + if hasattr(self, "name"): + ret.name = self.name + return ret + -def hash_file(hashobj, filelike, blocksize=65536): +def hash_file( + hashobj: HashlibLike, filelike: typing.BinaryIO, blocksize: int = 65536 +) -> None: """Feed the entire contents from the given filelike to the given hashobj. 
@param hashobj: hashlib-like object providing an update method @param filelike: file-like object providing read(size) @@ -168,7 +225,9 @@ def hash_file(hashobj, filelike, blocksize=65536): class HashedStream: """A file-like object, that supports sequential reading and hashes the contents on the fly.""" - def __init__(self, filelike, hashobj): + def __init__( + self, filelike: typing.BinaryIO, hashobj: HashlibLike + ) -> None: """ @param filelike: a file-like object, that must support the read method @param hashobj: a hashlib-like object providing update and hexdigest @@ -176,15 +235,15 @@ class HashedStream: self.filelike = filelike self.hashobj = hashobj - def read(self, length): + def read(self, length: int) -> bytes: data = self.filelike.read(length) self.hashobj.update(data) return data - def hexdigest(self): + def hexdigest(self) -> typing.Optional[str]: return self.hashobj.hexdigest() - def validate(self, hexdigest): + def validate(self, hexdigest: str) -> None: """Soak up any remaining input and validate the read data using the given hexdigest. @raises ValueError: when the hash does not match |