import struct import zlib import re class GzipDecompressor(object): """Incremental gzip decompressor with an interface similar to bz2.BZ2Decompressor and lzma.LZMADecompressor. While decompressing it keeps a running CRC-32 of the output in self.crc and the number of output bytes in self.size.""" def __init__(self): self.sawheader = False self.inbuffer = b"" self.decompressor = None self.crc = 0 self.size = 0 def decompress(self, data): """ Feed compressed bytes and return the decompressed bytes produced so far. Input is buffered in self.inbuffer until a header can be parsed; b"" is returned while fewer than 10 header bytes are available. Once a zlib decompressor is active, data is passed to it and anything it reports as unused_data is fed back into this method. NOTE(review): ord(self.inbuffer[3]) assumes indexing yields a 1-character string (Python 2); on Python 3 indexing bytes yields an int - confirm the target version. @raises ValueError: if the buffered input does not start with the bytes 1f 8b 08 (gzip magic not found) @raises zlib.error: from zlib invocations """ while True: if self.decompressor: data = self.decompressor.decompress(data) self.crc = zlib.crc32(data, self.crc) self.size += len(data) unused_data = self.decompressor.unused_data if not unused_data: return data self.decompressor = None return data + self.decompress(unused_data) self.inbuffer += data skip = 10 if len(self.inbuffer) < skip: return b"" if not self.inbuffer.startswith(b"\037\213\010"): raise ValueError("gzip magic not found") flag = ord(self.inbuffer[3]) if flag & 4: if len(self.inbuffer) < skip + 2: return b"" length, = struct.unpack("= length: ret = self.buff[:length] self.buff = self.buff[length:] return ret elif not data: # read EOF in last iteration ret = self.buff self.buff = b"" return ret data = self.fileobj.read(self.blocksize) if data: self.buff += self.decompressor.decompress(data) else: self.buff += self.decompressor.flush() class SpaceCompressor(object): """Not actually a compressor. It replaces every run of whitespace with a single space, including runs that continue across successive decompress() calls. It exposes decompress(), flush(), copy() and unused_data so it can be used where a decompressor object is expected.""" spacerc = re.compile(r"\s+") unused_data = "" def __init__(self): self.lastspace = False def decompress(self, data): """Return data with each whitespace run replaced by one space. If the previous call's output ended with a space and this output starts with one, the leading space is dropped. self.lastspace records whether this call's substituted output ended with a space (evaluated before any leading space is dropped).""" data = self.spacerc.sub(" ", data) newlastspace = data[-1:] == " " if self.lastspace and data[0:1] == " ": data = data[1:] self.lastspace = newlastspace return data def flush(self): return "" def copy(self): """Return a new SpaceCompressor carrying over the current lastspace flag.""" new = SpaceCompressor() new.lastspace = self.lastspace return new