Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/compression/zlib.py: 81%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

37 statements  

1import re 

2import zlib 

3from pathlib import Path 

4from typing import Optional 

5 

6from structlog import get_logger 

7 

8from unblob.handlers.archive.dmg import DMGHandler 

9 

10from ...file_utils import DEFAULT_BUFSIZE, InvalidInputFormat 

11from ...models import ( 

12 Extractor, 

13 File, 

14 Handler, 

15 HandlerDoc, 

16 HandlerType, 

17 Reference, 

18 Regex, 

19 ValidChunk, 

20) 

21 

# Module-level structlog logger. Nothing in the code visible here references
# it; it is kept because other code may import or rely on it.
logger = get_logger()

23 

24 

class ZlibExtractor(Extractor):
    """Inflate a zlib stream into a single output file."""

    def extract(self, inpath: Path, outdir: Path):
        """Decompress ``inpath`` into ``outdir / "zlib.uncompressed"``.

        The input is consumed in ``DEFAULT_BUFSIZE`` pieces so the compressed
        data is never held in memory all at once. Decompression stops when
        the stream reports its end or the input is exhausted, whichever comes
        first; bytes that follow the end of the stream are not written.

        ``zlib.error`` is not handled here and propagates to the caller.
        """
        inflater = zlib.decompressobj()
        target = outdir / "zlib.uncompressed"
        with File.from_path(inpath) as source, target.open("wb") as sink:
            while True:
                chunk = source.read(DEFAULT_BUFSIZE)
                # Stop on end of input or once the stream has been fully
                # decoded.
                if not chunk or inflater.eof:
                    break
                sink.write(inflater.decompress(chunk))

34 

35 

class ZlibHandler(Handler):
    """Detect zlib streams and compute where each one ends."""

    NAME = "zlib"

    # Two-byte zlib headers (CMF, FLG). CMF 0x78 selects DEFLATE with a 32K
    # window; the FLG byte encodes the compression level hint (RFC 1950).
    PATTERNS = [
        Regex(r"^\x78\x01"),  # low compression
        Regex(r"^\x78\x9c"),  # default compression
        Regex(r"^\x78\xda"),  # best compression
        Regex(r"^\x78\x5e"),  # fast compression
    ]

    EXTRACTOR = ZlibExtractor()

    DOC = HandlerDoc(
        name=NAME,
        description="The zlib format is a compressed data format based on the DEFLATE algorithm, often used for data compression in various applications. It includes a lightweight header and checksum for data integrity.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="zlib File Format Specification",
                url="https://www.zlib.net/manual.html",
            ),
            Reference(
                title="zlib Wikipedia",
                url="https://en.wikipedia.org/wiki/Zlib",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        """Return the chunk covered by the zlib stream at the current position.

        The stream is decompressed (output discarded) purely to find its end.
        This method never seeks: it reads from wherever ``file`` currently
        is. NOTE(review): presumably the caller has already positioned
        ``file`` at ``start_offset`` -- confirm against the framework.

        Args:
            file: Input to scan; read sequentially in ``DEFAULT_BUFSIZE``
                pieces.
            start_offset: Offset reported as the start of the chunk.

        Returns:
            A ``ValidChunk`` from ``start_offset`` to the first byte after
            the zlib stream.

        Raises:
            InvalidInputFormat: If the last 512 bytes of the file match one
                of ``DMGHandler.PATTERNS`` (the file is left to the DMG
                handler), or if the data is not a valid zlib stream.
        """
        # The slice does not depend on the pattern, so take it once.
        trailer = file[-512:]
        for pattern in DMGHandler.PATTERNS:
            if re.search(pattern.as_regex(), trailer):
                raise InvalidInputFormat(
                    "File is a DMG archive made of zlib streams. Aborting."
                )

        decompressor = zlib.decompressobj()

        try:
            # Check ``eof`` *before* each read. Reading first and checking
            # afterwards consumes one extra buffer once the stream has ended,
            # which pushes ``file.tell()`` past bytes that are not part of
            # ``unused_data`` and inflates ``end_offset``.
            while not decompressor.eof:
                content = file.read(DEFAULT_BUFSIZE)
                if not content:
                    # Input ran out before the end of the stream.
                    # NOTE(review): a truncated stream is still reported as a
                    # valid chunk here (as before) -- confirm this is wanted.
                    break
                decompressor.decompress(content)

        except zlib.error:
            raise InvalidInputFormat("invalid zlib stream") from None

        # ``unused_data`` holds the bytes of the last buffer that lie beyond
        # the end of the stream; subtract them from the read position.
        end_offset = file.tell() - len(decompressor.unused_data)

        return ValidChunk(
            start_offset=start_offset,
            end_offset=end_offset,
        )

89 )