1 # SPDX-License-Identifier: GPL-2.0+
5 ByteIterable = typing.Iterable[bytes]
7 def yield_lines(iterable: ByteIterable) -> ByteIterable:
8 """Converts an arbitrary bytes iterable into an iterable that yields whole
9 lines. The final byte of each returned value (except possibly the last one)
10 is a newline or carriage return character. The concatenation of the input
11 iterable equals the concatenation of the output iterable."""
15 parts = buff.splitlines(True)
21 def decompress_stream(iterable: ByteIterable, decompressor) -> ByteIterable:
22 """Decompress an iterable of bytes using the given decompressor into
23 another (decompressed) iterable of bytes. The decompressor can be a
24 bz2.BZ2Decompressor or lzma.LZMADecompressor instance."""
26 data = decompressor.decompress(data)
28 if hasattr(decompressor, "flush"):
29 yield decompressor.flush()
31 def yield_chunks(filelike, chunksize=65536) -> ByteIterable:
32 """Read the given file in chunks of the given size. Returns an itrable
33 of contents. If the file is binary, it yields bytes, otherwise str."""
35 data = filelike.read(chunksize)