diff options
Diffstat (limited to 'dedup/compression.py')
-rw-r--r-- | dedup/compression.py | 60 |
1 files changed, 41 insertions, 19 deletions
diff --git a/dedup/compression.py b/dedup/compression.py index da6e9a0..2e9869c 100644 --- a/dedup/compression.py +++ b/dedup/compression.py @@ -1,20 +1,38 @@ import bz2 import struct +import typing import zlib import lzma + +class Decompressor(typing.Protocol): + def copy(self) -> "Decompressor": + ... + + def decompress(self, data: bytes) -> bytes: + ... + + def flush(self) -> bytes: + ... + + @property + def unused_data(self) -> bytes: + ... + + class GzipDecompressor: """An interface to gzip which is similar to bz2.BZ2Decompressor and lzma.LZMADecompressor.""" - def __init__(self): + + def __init__(self) -> None: self.sawheader = False self.inbuffer = b"" - self.decompressor = None + self.decompressor: typing.Optional[Decompressor] = None self.crc = 0 self.size = 0 - def decompress(self, data): + def decompress(self, data: bytes) -> bytes: """ @raises ValueError: if no gzip magic is found @raises zlib.error: from zlib invocations @@ -57,7 +75,7 @@ class GzipDecompressor: self.decompressor = zlib.decompressobj(-zlib.MAX_WBITS) @property - def unused_data(self): + def unused_data(self) -> bytes: if self.decompressor: return self.decompressor.unused_data elif not self.sawheader: @@ -69,7 +87,7 @@ class GzipDecompressor: return b"" return self.inbuffer - def flush(self): + def flush(self) -> bytes: """ @raises zlib.error: from zlib invocations """ @@ -77,7 +95,7 @@ class GzipDecompressor: return b"" return self.decompressor.flush() - def copy(self): + def copy(self) -> "GzipDecompressor": new = GzipDecompressor() new.inbuffer = self.inbuffer if self.decompressor: @@ -92,20 +110,25 @@ class DecompressedStream: read(optional length), tell, seek(forward only) and close.""" blocksize = 65536 - def __init__(self, fileobj, decompressor): + def __init__( + self, fileobj: typing.BinaryIO, decompressor: Decompressor + ) -> None: """ @param fileobj: a file-like object providing read(size) @param decompressor: a bz2.BZ2Decompressor or lzma.LZMADecompressor like object providing methods decompress and flush and an attribute unused_data """ - self.fileobj = fileobj - self.decompressor = decompressor + self.fileobj: typing.Optional[typing.BinaryIO] = fileobj + self.decompressor: typing.Optional[Decompressor] = decompressor self.buff = bytearray() self.pos = 0 - def _fill_buff_until(self, predicate): + def _fill_buff_until( + self, predicate: typing.Callable[[bytes], bool] + ) -> None: assert self.fileobj is not None + assert self.decompressor is not None while not predicate(self.buff): data = self.fileobj.read(self.blocksize) if data: @@ -115,13 +138,13 @@ class DecompressedStream: self.buff += self.decompressor.flush() break - def _read_from_buff(self, length): + def _read_from_buff(self, length: int) -> bytes: ret = bytes(self.buff[:length]) self.buff[:length] = b"" self.pos += length return ret - def read(self, length=None): + def read(self, length: typing.Optional[int] = None) -> bytes: if length is None: self._fill_buff_until(lambda _: False) length = len(self.buff) @@ -129,7 +152,7 @@ class DecompressedStream: self._fill_buff_until(lambda b, l=length: len(b) >= l) return self._read_from_buff(length) - def readline(self): + def readline(self) -> bytes: self._fill_buff_until(lambda b: b'\n' in b) try: length = self.buff.index(b'\n') + 1 @@ -137,14 +160,14 @@ class DecompressedStream: length = len(self.buff) return self._read_from_buff(length) - def __iter__(self): + def __iter__(self) -> typing.Iterator[bytes]: return iter(self.readline, b'') - def tell(self): + def tell(self) -> int: assert self.fileobj is not None return self.pos - def seek(self, pos): + def seek(self, pos: int) -> None: """Forward seeks by absolute position only.""" assert self.fileobj is not None if pos < self.pos: @@ -159,7 +182,7 @@ class DecompressedStream: self.read(left) return - def close(self): + def close(self) -> None: if self.fileobj is not None: self.fileobj.close() self.fileobj = None @@ -173,13 +196,12 @@ decompressors = { '.xz': lzma.LZMADecompressor, } -def decompress(filelike, extension): +def decompress(filelike: typing.BinaryIO, extension: str) -> typing.BinaryIO: """Decompress a stream according to its extension. @param filelike: is a read-only byte-stream. It must support read(size) and close(). @param extension: permitted values are "", ".gz", ".bz2", ".lzma", and ".xz" - @type extension: unicode @returns: a read-only byte-stream with the decompressed contents of the original filelike. It supports read(size) and close(). If the original supports seek(pos) and tell(), then it also supports |