Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/compression/_gzip_reader.py: 100%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

30 statements  

1import gzip 

2import zlib 

3 

4from ...file_utils import DEFAULT_BUFSIZE 

5 

6# pyright: reportAttributeAccessIssue=false 

7 

8 

class SingleMemberGzipReader(gzip._GzipReader):  # noqa: SLF001
    """Chunk-wise reader for a gzip member built on ``gzip._GzipReader``.

    The methods here parse one member header, decompress the deflate data
    that follows it and keep the running CRC-32 / uncompressed size up to
    date.  Nothing in this class starts a new member after the
    end-of-stream marker; the bytes following the deflate data are exposed
    through :attr:`unused_data` instead.
    """

    def read_header(self):
        """Reset the per-member read state and parse the gzip header.

        Returns the result of the base class ``_read_gzip_header()``.
        """
        self._init_read()
        return self._read_gzip_header()

    def _add_read_data(self, data):
        """Fold *data* into the running CRC-32 and uncompressed byte count."""
        self._crc = zlib.crc32(data, self._crc)
        self._stream_size += len(data)

    def read_all(self):
        """Decompress and return the next chunk of uncompressed data.

        Reads up to ``DEFAULT_BUFSIZE`` compressed bytes at a time and asks
        the decompressor for at most ``DEFAULT_BUFSIZE`` bytes of output.
        Loops until some output is produced or the end-of-stream marker is
        reached.

        Returns:
            The decompressed bytes.  May be ``b""`` when the end-of-stream
            marker was reached without any further output (for example an
            empty payload).

        Raises:
            EOFError: the input ran out before the end-of-stream marker
                was reached.
        """
        while True:
            buf = self._fp.read(DEFAULT_BUFSIZE)

            uncompress = self._decompressor.decompress(buf, DEFAULT_BUFSIZE)
            # Not every decompressor type has ``unconsumed_tail``; push the
            # left-over input back only when the attribute exists.
            if hasattr(self._decompressor, "unconsumed_tail"):
                self._fp.prepend(self._decompressor.unconsumed_tail)
            self._fp.prepend(self._decompressor.unused_data)

            # Stop on output *or* on end-of-stream.  The call that consumes
            # the end-of-stream marker can legitimately yield no bytes;
            # looping again in that case would feed trailing bytes to a
            # finished decompressor and end in a spurious EOFError.
            if uncompress != b"" or self._decompressor.eof:
                break
            if buf == b"":
                raise EOFError(
                    "Compressed file ended before the end-of-stream marker was reached"
                )

        self._add_read_data(uncompress)

        return uncompress

    def read_until_eof(self):
        """Decompress (and discard) data until the end-of-stream marker."""
        while not self._decompressor.eof:
            self.read_all()

    @property
    def unused_data(self):
        """Bytes the decompressor found past the end of the compressed data."""
        return self._decompressor.unused_data
46 return self._decompressor.unused_data