Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/compression/qnx_deflate.py: 50%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

42 statements  

1import io 

2from pathlib import Path 

3 

4from lzallright import LZOCompressor 

5 

6from unblob.file_utils import ( 

7 Endian, 

8 FileSystem, 

9 InvalidInputFormat, 

10 StructParser, 

11) 

12from unblob.handlers.compression._ucl import UCLDecompressor 

13from unblob.models import ( 

14 Extractor, 

15 ExtractResult, 

16 File, 

17 HandlerDoc, 

18 HandlerType, 

19 HexString, 

20 Reference, 

21 StructHandler, 

22 ValidChunk, 

23) 

24 

25QNX_C_DEFINITION = """ 

26 struct filehdr { 

27 char signature[8]; 

28 int32 usize; // Uncompressed size of the file 

29 uint16 blksize; // Size of compression blocks 

30 uint8 cmptype; // Type of compression (CMP_LZO, ...) 

31 uint8 flags; 

32 } filehdr_t; 

33 

34 struct cmphdr { 

35 uint16 prev; // Offset to previous hdr 

36 uint16 next; // Offset to next hdr 

37 uint16 pusize; // Size of prev uncompressed blk 

38 uint16 usize; // Size of this uncompressed blk 

39 } cmphdr_t; 

40 

41 struct cmpblk { 

42 struct cmphdr hdr; 

43 uint8_t buf[32*1024]; 

44 } cmpblk_t; 

45""" 

46SUPPORTED_BLOCK_SIZES = [2**i for i in range(12, 16)] 

47 

48 

49class QNXDeflateExtractor(Extractor): 

50 def __init__(self): 

51 self._struct_parser = StructParser(QNX_C_DEFINITION) 

52 

53 def extract(self, inpath: Path, outdir: Path) -> ExtractResult: 

54 fs = FileSystem(outdir) 

55 with File.from_path(inpath) as file: 

56 fhdr = self._struct_parser.cparser_le.filehdr_t(file) 

57 decompressor = LZOCompressor() if fhdr.cmptype == 0 else UCLDecompressor() 

58 with fs.open(Path(f"{inpath.stem}.uncompressed")) as outfile: 

59 cmphdr = self._struct_parser.cparser_le.cmphdr_t(file) 

60 while cmphdr.next != 0: 

61 compressed_part = file.read(cmphdr.next - 8) 

62 outfile.write(decompressor.decompress(compressed_part)) 

63 cmphdr = self._struct_parser.cparser_le.cmphdr_t(file) 

64 return ExtractResult(reports=fs.problems) 

65 

66 

67class QNXDeflateHandler(StructHandler): 

68 NAME = "qnx_deflate" 

69 C_DEFINITIONS = QNX_C_DEFINITION 

70 HEADER_STRUCT = "filehdr_t" 

71 PATTERNS = [ 

72 HexString("69 77 6c 79 66 6d 62 70 [6] (00|01)") # iwlyfmbp 

73 ] 

74 EXTRACTOR = QNXDeflateExtractor() 

75 DOC = HandlerDoc( 

76 name="QNX Deflate", 

77 description="QNX deflate are compressed files using a block-based compression format with either LZO or UCL algorithm.", 

78 handler_type=HandlerType.COMPRESSION, 

79 vendor="Blackberry", 

80 references=[ 

81 Reference(title="QNX wikipedia", url="https://en.wikipedia.org/wiki/QNX"), 

82 ], 

83 limitations=[], 

84 ) 

85 

86 def is_valid_header(self, header) -> bool: 

87 return header.usize > 0 and header.blksize in SUPPORTED_BLOCK_SIZES 

88 

89 def calculate_chunk(self, file, start_offset) -> ValidChunk: 

90 header = self.parse_header(file, Endian.LITTLE) 

91 if not self.is_valid_header(header): 

92 raise InvalidInputFormat("Invalid QNX header") 

93 

94 cmphdr = self.cparser_le.cmphdr_t(file) 

95 while cmphdr.next != 0: 

96 # return from 8 bytes because it parse the cmpheader 

97 file.seek(cmphdr.next - 8, io.SEEK_CUR) 

98 cmphdr = self.cparser_le.cmphdr_t(file) 

99 end_offset = file.tell() 

100 

101 return ValidChunk(start_offset=start_offset, end_offset=end_offset)