import itertools
import typing

from .compression import Decompressor


class HashlibLike(typing.Protocol):
    """Structural type for the hashlib-style objects wrapped in this module.

    digest and hexdigest are declared as Optional, because the wrappers
    below may return None instead of a hash value.
    """

    def copy(self) -> "HashlibLike": ...

    def digest(self) -> typing.Optional[bytes]: ...

    def hexdigest(self) -> typing.Optional[str]: ...

    @property
    def name(self) -> str: ...

    def update(self, data: bytes) -> None: ...


class HashBlacklist:
    """Turn a hashlib-like object into a hash that returns None for some
    blacklisted hashes instead of the real hash value.

    We only work with hexdigests here, so digest() disappears. The methods
    copy and update as well as the name attribute keep working as expected.
    """

    def __init__(
        self, hashobj: HashlibLike, blacklist: typing.Container[str] = ()
    ) -> None:
        """
        @param hashobj: a hashlib-like object
        @param blacklist: an object providing __contains__.
            hexdigest values which are contained in the blacklist
            are turned into None values
        """
        self.hashobj = hashobj
        self.blacklist = blacklist
        # update needs no interception, so expose the wrapped bound method.
        self.update = self.hashobj.update

    @property
    def name(self) -> str:
        """The name of the wrapped hash object."""
        return self.hashobj.name

    def hexdigest(self) -> typing.Optional[str]:
        """
        @returns: the hexdigest of the wrapped hash object or None if that
            value is contained in the blacklist
        """
        digest = self.hashobj.hexdigest()
        if digest in self.blacklist:
            return None
        return digest

    def copy(self) -> "HashBlacklist":
        """
        @returns: a new HashBlacklist wrapping a copy of the hash object
            and sharing the blacklist
        """
        return HashBlacklist(self.hashobj.copy(), self.blacklist)


class HashBlacklistContent:
    """Turn a hashlib-like object into a hash that returns None for some
    blacklisted content instead of the real hash value. Unlike HashBlacklist,
    not the output of the hash is considered, but its input."""

    def __init__(
        self,
        hashobj: HashlibLike,
        blacklist: typing.Collection[bytes] = (),
        maxlen: typing.Optional[int] = None,
    ) -> None:
        """
        @param hashobj: a hashlib-like object
        @param blacklist: an object providing __contains__.
            hash inputs which are contained in the blacklist
            are turned into None values
        @param maxlen: the maximum length of a blacklisted input.
            Defaults to max(map(len, blacklist)), so if it is absent,
            the blacklist must support iteration.
        """
        self.hashobj = hashobj
        self.blacklist = blacklist
        if maxlen is None:
            # the chain avoids passing the empty sequence to max
            maxlen = max(itertools.chain((0,), map(len, blacklist)))
        self.maxlen = maxlen
        # All input seen so far, or None once it has grown beyond maxlen
        # and thus can no longer equal a blacklisted value.
        self.stored: typing.Optional[bytes] = b""

    @property
    def name(self) -> str:
        """The name of the wrapped hash object."""
        return self.hashobj.name

    def update(self, data: bytes) -> None:
        """Record the data for the blacklist check (while it still fits
        into maxlen) and feed it to the wrapped hash object.

        @param data: the next chunk of hash input
        """
        if self.stored is not None:
            self.stored += data
            if len(self.stored) > self.maxlen:
                self.stored = None
        self.hashobj.update(data)

    def digest(self) -> typing.Optional[bytes]:
        """
        @returns: None if the entire input is contained in the blacklist
            and the digest of the wrapped hash object otherwise
        """
        if self.stored is not None and self.stored in self.blacklist:
            return None
        return self.hashobj.digest()

    def hexdigest(self) -> typing.Optional[str]:
        """
        @returns: None if the entire input is contained in the blacklist
            and the hexdigest of the wrapped hash object otherwise
        """
        if self.stored is not None and self.stored in self.blacklist:
            return None
        return self.hashobj.hexdigest()

    def copy(self) -> "HashBlacklistContent":
        """
        @returns: a new HashBlacklistContent wrapping a copy of the hash
            object, sharing the blacklist and carrying over the recorded
            input
        """
        new = HashBlacklistContent(
            self.hashobj.copy(), self.blacklist, self.maxlen
        )
        new.stored = self.stored
        return new


class DecompressedHash:
    """Apply a decompression function before the hash. This class provides
    the hashlib interface (update, digest, hexdigest, copy) and a plain name
    attribute that is set from a constructor argument."""

    def __init__(
        self,
        decompressor: Decompressor,
        hashobj: HashlibLike,
        name: str = "unnamed",
    ):
        """
        @param decompressor: a decompression object like bz2.BZ2Decompressor
            or lzma.LZMADecompressor. It has to provide methods decompress
            and copy as well as an unused_data attribute. It may provide a
            flush method.
        @param hashobj: a hashlib-like obj providing methods update,
            hexdigest and copy
        @param name: initializes the name property
        """
        self.decompressor = decompressor
        self.hashobj = hashobj
        self.name = name

    def update(self, data: bytes) -> None:
        """Decompress the data and feed the result to the hash object.

        @param data: the next chunk of compressed input
        """
        self.hashobj.update(self.decompressor.decompress(data))

    def _finalize_hashobj(self) -> HashlibLike:
        """Produce a hash object covering all decompressed data.

        If the decompressor has a flush method, flushing happens on copies
        of the decompressor and the hash object, so that this object is left
        untouched and can receive further updates.

        @returns: a hash object to compute the final digest from
        @raises ValueError: when the decompressor reports unused data
        """
        if not hasattr(self.decompressor, "flush"):
            if self.decompressor.unused_data:
                raise ValueError("decompressor did not consume all data")
            return self.hashobj
        tmpdecomp = self.decompressor.copy()
        data = tmpdecomp.flush()
        if tmpdecomp.unused_data:
            raise ValueError("decompressor did not consume all data")
        tmphash = self.hashobj.copy()
        tmphash.update(data)
        return tmphash

    def digest(self) -> typing.Optional[bytes]:
        """
        @returns: the digest of the decompressed data
        @raises ValueError: when the decompressor reports unused data
        """
        return self._finalize_hashobj().digest()

    def hexdigest(self) -> typing.Optional[str]:
        """
        @returns: the hexdigest of the decompressed data
        @raises ValueError: when the decompressor reports unused data
        """
        return self._finalize_hashobj().hexdigest()

    def copy(self) -> "DecompressedHash":
        """
        @returns: a new DecompressedHash built from copies of the
            decompressor and the hash object and carrying the same name
        """
        return DecompressedHash(
            self.decompressor.copy(), self.hashobj.copy(), self.name
        )


class SuppressingHash:
    """A hash that silences exceptions from the update, digest and hexdigest
    methods of a hashlib-like object. If an exception has occurred, digest
    and hexdigest always return None."""

    def __init__(
        self, hashobj: typing.Optional[HashlibLike], exceptions
    ) -> None:
        """
        @param hashobj: a hashlib-like object providing methods update, copy
            and hexdigest. If a name attribute is present, it is mirrored as
            well. None creates an instance that is already in the failed
            state (used by copy).
        @type exceptions: tuple
        @param exceptions: exception classes to be suppressed
        """
        # Reset to None as soon as one of the suppressed exceptions occurs.
        self.hashobj: typing.Optional[HashlibLike] = hashobj
        self.exceptions = exceptions
        if hasattr(hashobj, "name"):
            self.name = hashobj.name

    def update(self, data: bytes) -> None:
        """Feed data to the wrapped hash object unless an exception has
        been suppressed earlier.

        @param data: the next chunk of hash input
        """
        if self.hashobj is not None:
            try:
                self.hashobj.update(data)
            except self.exceptions:
                self.hashobj = None

    def digest(self) -> typing.Optional[bytes]:
        """
        @returns: the digest of the wrapped hash object or None if a
            suppressed exception occurred now or earlier
        """
        if self.hashobj is not None:
            try:
                return self.hashobj.digest()
            except self.exceptions:
                # Drop the wrapped object, so that all later calls keep
                # returning None, just like update and hexdigest do.
                self.hashobj = None
        return None

    def hexdigest(self) -> typing.Optional[str]:
        """
        @returns: the hexdigest of the wrapped hash object or None if a
            suppressed exception occurred now or earlier
        """
        if self.hashobj is not None:
            try:
                return self.hashobj.hexdigest()
            except self.exceptions:
                self.hashobj = None
        return None

    def copy(self) -> "SuppressingHash":
        """
        @returns: a new SuppressingHash wrapping a copy of the hash object.
            If an exception has been suppressed already, the copy is in the
            failed state as well and only inherits the name attribute (if
            there is one).
        """
        if self.hashobj is not None:
            return SuppressingHash(self.hashobj.copy(), self.exceptions)
        ret = SuppressingHash(None, self.exceptions)
        if hasattr(self, "name"):
            ret.name = self.name
        return ret


def hash_file(
    hashobj: HashlibLike, filelike: typing.BinaryIO, blocksize: int = 65536
) -> None:
    """Feed the entire contents from the given filelike to the given hashobj.

    @param hashobj: hashlib-like object providing an update method
    @param filelike: file-like object providing read(size)
    @param blocksize: the size passed to each read call
    """
    data = filelike.read(blocksize)
    while data:
        hashobj.update(data)
        data = filelike.read(blocksize)


class HashedStream:
    """A file-like object, that supports sequential reading and hashes the
    contents on the fly."""

    def __init__(
        self, filelike: typing.BinaryIO, hashobj: HashlibLike
    ) -> None:
        """
        @param filelike: a file-like object, that must support the read method
        @param hashobj: a hashlib-like object providing update and hexdigest
        """
        self.filelike = filelike
        self.hashobj = hashobj

    def read(self, length: int) -> bytes:
        """Read from the underlying file-like object and feed the data to
        the hash object.

        @param length: the size passed to the underlying read call
        @returns: the data read
        """
        data = self.filelike.read(length)
        self.hashobj.update(data)
        return data

    def hexdigest(self) -> typing.Optional[str]:
        """
        @returns: the hexdigest of the data read so far
        """
        return self.hashobj.hexdigest()

    def validate(self, hexdigest: str) -> None:
        """Soak up any remaining input and validate the read data using the
        given hexdigest.

        @param hexdigest: the expected hexdigest of the entire stream
        @raises ValueError: when the hash does not match
        """
        while self.read(65536):
            pass
        if self.hexdigest() != hexdigest:
            raise ValueError("hash sum mismatch")