1import gzip
2import zlib
3
4from ...file_utils import DEFAULT_BUFSIZE
5
6# pyright: reportAttributeAccessIssue=false
7
8
9class SingleMemberGzipReader(gzip._GzipReader): # noqa: SLF001
10 def read_header(self):
11 self._init_read()
12 return self._read_gzip_header()
13
14 def _add_read_data(self, data):
15 self._crc = zlib.crc32(data, self._crc)
16 self._stream_size = self._stream_size + len(data)
17
18 def read_all(self):
19 uncompress = b""
20
21 while True:
22 buf = self._fp.read(DEFAULT_BUFSIZE)
23
24 uncompress = self._decompressor.decompress(buf, DEFAULT_BUFSIZE)
25 if hasattr(self._decompressor, "unconsumed_tail"):
26 self._fp.prepend(self._decompressor.unconsumed_tail)
27 self._fp.prepend(self._decompressor.unused_data)
28
29 if uncompress != b"":
30 break
31 if buf == b"":
32 raise EOFError(
33 "Compressed file ended before the end-of-stream marker was reached"
34 )
35
36 self._add_read_data(uncompress)
37
38 return uncompress
39
40 def read_until_eof(self):
41 while not self._decompressor.eof:
42 self.read_all()
43
44 @property
45 def unused_data(self):
46 return self._decompressor.unused_data