Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/compression/qnx_deflate.py: 46%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

46 statements  

1import io 

2from pathlib import Path 

3 

4from lzallright import LZOCompressor 

5 

6from unblob.file_utils import ( 

7 Endian, 

8 FileSystem, 

9 InvalidInputFormat, 

10 StructParser, 

11) 

12from unblob.handlers.compression._ucl import UCLDecompressor 

13from unblob.models import ( 

14 Extractor, 

15 ExtractResult, 

16 File, 

17 HandlerDoc, 

18 HandlerType, 

19 HexString, 

20 Reference, 

21 StructHandler, 

22 ValidChunk, 

23) 

24 

25QNX_C_DEFINITION = """ 

26 struct filehdr { 

27 char signature[8]; 

28 int32 usize; // Uncompressed size of the file 

29 uint16 blksize; // Size of compression blocks 

30 uint8 cmptype; // Type of compression (CMP_LZO, ...) 

31 uint8 flags; 

32 } filehdr_t; 

33 

34 struct cmphdr { 

35 uint16 prev; // Offset to previous hdr 

36 uint16 next; // Offset to next hdr 

37 uint16 pusize; // Size of prev uncompressed blk 

38 uint16 usize; // Size of this uncompressed blk 

39 } cmphdr_t; 

40 

41 struct cmpblk { 

42 struct cmphdr hdr; 

43 uint8_t buf[32*1024]; 

44 } cmpblk_t; 

45""" 

46SUPPORTED_BLOCK_SIZES = [2**i for i in range(12, 16)] 

47 

48 

49class QNXDeflateExtractor(Extractor): 

50 def __init__(self): 

51 self._struct_parser = StructParser(QNX_C_DEFINITION) 

52 

53 def extract(self, inpath: Path, outdir: Path) -> ExtractResult: 

54 fs = FileSystem(outdir) 

55 with File.from_path(inpath) as file: 

56 fhdr = self._struct_parser.cparser_le.filehdr_t(file) 

57 decompressor = LZOCompressor() if fhdr.cmptype == 0 else UCLDecompressor() 

58 with fs.open(Path(f"{inpath.stem}.uncompressed")) as outfile: 

59 cmphdr = self._struct_parser.cparser_le.cmphdr_t(file) 

60 while cmphdr.next != 0: 

61 if cmphdr.next < 8: 

62 raise InvalidInputFormat( 

63 "QNX deflate block offset is smaller than its header." 

64 ) 

65 compressed_part = file.read(cmphdr.next - 8) 

66 outfile.write(decompressor.decompress(compressed_part)) 

67 cmphdr = self._struct_parser.cparser_le.cmphdr_t(file) 

68 return ExtractResult(reports=fs.problems) 

69 

70 

71class QNXDeflateHandler(StructHandler): 

72 NAME = "qnx_deflate" 

73 C_DEFINITIONS = QNX_C_DEFINITION 

74 HEADER_STRUCT = "filehdr_t" 

75 PATTERNS = [ 

76 HexString("69 77 6c 79 66 6d 62 70 [6] (00|01)") # iwlyfmbp 

77 ] 

78 EXTRACTOR = QNXDeflateExtractor() 

79 DOC = HandlerDoc( 

80 name="QNX Deflate", 

81 description="QNX deflate are compressed files using a block-based compression format with either LZO or UCL algorithm.", 

82 handler_type=HandlerType.COMPRESSION, 

83 vendor="Blackberry", 

84 references=[ 

85 Reference(title="QNX wikipedia", url="https://en.wikipedia.org/wiki/QNX"), 

86 ], 

87 limitations=[], 

88 ) 

89 

90 def is_valid_header(self, header) -> bool: 

91 return header.usize > 0 and header.blksize in SUPPORTED_BLOCK_SIZES 

92 

93 def calculate_chunk(self, file, start_offset) -> ValidChunk: 

94 header = self.parse_header(file, Endian.LITTLE) 

95 if not self.is_valid_header(header): 

96 raise InvalidInputFormat("Invalid QNX header") 

97 

98 cmphdr = self.cparser_le.cmphdr_t(file) 

99 while cmphdr.next != 0: 

100 if cmphdr.next < 8: 

101 raise InvalidInputFormat( 

102 "QNX deflate block offset is smaller than its header." 

103 ) 

104 # return from 8 bytes because it parse the cmpheader 

105 file.seek(cmphdr.next - 8, io.SEEK_CUR) 

106 cmphdr = self.cparser_le.cmphdr_t(file) 

107 end_offset = file.tell() 

108 

109 return ValidChunk(start_offset=start_offset, end_offset=end_offset)