summary refs log tree commit diff
path: root/dedup/compression.py
diff options
context:
space:
mode:
Diffstat (limited to 'dedup/compression.py')
-rw-r--r-- dedup/compression.py 60
1 files changed, 41 insertions, 19 deletions
diff --git a/dedup/compression.py b/dedup/compression.py
index da6e9a0..2e9869c 100644
--- a/dedup/compression.py
+++ b/dedup/compression.py
@@ -1,20 +1,38 @@
import bz2
import struct
+import typing
import zlib
import lzma
+
+class Decompressor(typing.Protocol):
+ def copy(self) -> "Decompressor":
+ ...
+
+ def decompress(self, data: bytes) -> bytes:
+ ...
+
+ def flush(self) -> bytes:
+ ...
+
+ @property
+ def unused_data(self) -> bytes:
+ ...
+
+
class GzipDecompressor:
"""An interface to gzip which is similar to bz2.BZ2Decompressor and
lzma.LZMADecompressor."""
- def __init__(self):
+
+ def __init__(self) -> None:
self.sawheader = False
self.inbuffer = b""
- self.decompressor = None
+ self.decompressor: typing.Optional[Decompressor] = None
self.crc = 0
self.size = 0
- def decompress(self, data):
+ def decompress(self, data: bytes) -> bytes:
"""
@raises ValueError: if no gzip magic is found
@raises zlib.error: from zlib invocations
@@ -57,7 +75,7 @@ class GzipDecompressor:
self.decompressor = zlib.decompressobj(-zlib.MAX_WBITS)
@property
- def unused_data(self):
+ def unused_data(self) -> bytes:
if self.decompressor:
return self.decompressor.unused_data
elif not self.sawheader:
@@ -69,7 +87,7 @@ class GzipDecompressor:
return b""
return self.inbuffer
- def flush(self):
+ def flush(self) -> bytes:
"""
@raises zlib.error: from zlib invocations
"""
@@ -77,7 +95,7 @@ class GzipDecompressor:
return b""
return self.decompressor.flush()
- def copy(self):
+ def copy(self) -> "GzipDecompressor":
new = GzipDecompressor()
new.inbuffer = self.inbuffer
if self.decompressor:
@@ -92,20 +110,25 @@ class DecompressedStream:
read(optional length), tell, seek(forward only) and close."""
blocksize = 65536
- def __init__(self, fileobj, decompressor):
+ def __init__(
+ self, fileobj: typing.BinaryIO, decompressor: Decompressor
+ ) -> None:
"""
@param fileobj: a file-like object providing read(size)
@param decompressor: a bz2.BZ2Decompressor or lzma.LZMADecompressor
like object providing methods decompress and flush and an
attribute unused_data
"""
- self.fileobj = fileobj
- self.decompressor = decompressor
+ self.fileobj: typing.Optional[typing.BinaryIO] = fileobj
+ self.decompressor: typing.Optional[Decompressor] = decompressor
self.buff = bytearray()
self.pos = 0
- def _fill_buff_until(self, predicate):
+ def _fill_buff_until(
+ self, predicate: typing.Callable[[bytes], bool]
+ ) -> None:
assert self.fileobj is not None
+ assert self.decompressor is not None
while not predicate(self.buff):
data = self.fileobj.read(self.blocksize)
if data:
@@ -115,13 +138,13 @@ class DecompressedStream:
self.buff += self.decompressor.flush()
break
- def _read_from_buff(self, length):
+ def _read_from_buff(self, length: int) -> bytes:
ret = bytes(self.buff[:length])
self.buff[:length] = b""
self.pos += length
return ret
- def read(self, length=None):
+ def read(self, length: typing.Optional[int] = None) -> bytes:
if length is None:
self._fill_buff_until(lambda _: False)
length = len(self.buff)
@@ -129,7 +152,7 @@ class DecompressedStream:
self._fill_buff_until(lambda b, l=length: len(b) >= l)
return self._read_from_buff(length)
- def readline(self):
+ def readline(self) -> bytes:
self._fill_buff_until(lambda b: b'\n' in b)
try:
length = self.buff.index(b'\n') + 1
@@ -137,14 +160,14 @@ class DecompressedStream:
length = len(self.buff)
return self._read_from_buff(length)
- def __iter__(self):
+ def __iter__(self) -> typing.Iterator[bytes]:
return iter(self.readline, b'')
- def tell(self):
+ def tell(self) -> int:
assert self.fileobj is not None
return self.pos
- def seek(self, pos):
+ def seek(self, pos: int) -> None:
"""Forward seeks by absolute position only."""
assert self.fileobj is not None
if pos < self.pos:
@@ -159,7 +182,7 @@ class DecompressedStream:
self.read(left)
return
- def close(self):
+ def close(self) -> None:
if self.fileobj is not None:
self.fileobj.close()
self.fileobj = None
@@ -173,13 +196,12 @@ decompressors = {
'.xz': lzma.LZMADecompressor,
}
-def decompress(filelike, extension):
+def decompress(filelike: typing.BinaryIO, extension: str) -> typing.BinaryIO:
"""Decompress a stream according to its extension.
@param filelike: is a read-only byte-stream. It must support read(size) and
close().
@param extension: permitted values are "", ".gz", ".bz2", ".lzma", and
".xz"
- @type extension: unicode
@returns: a read-only byte-stream with the decompressed contents of the
original filelike. It supports read(size) and close(). If the
original supports seek(pos) and tell(), then it also supports