1import io
2from pathlib import Path
3
4from lzallright import LZOCompressor
5
6from unblob.file_utils import (
7 Endian,
8 FileSystem,
9 InvalidInputFormat,
10 StructParser,
11)
12from unblob.handlers.compression._ucl import UCLDecompressor
13from unblob.models import (
14 Extractor,
15 ExtractResult,
16 File,
17 HandlerDoc,
18 HandlerType,
19 HexString,
20 Reference,
21 StructHandler,
22 ValidChunk,
23)
24
# C-style struct declarations describing the on-disk layout. The same text is
# handed to StructParser by the extractor and used as the handler's
# C_DEFINITIONS, so both parse headers from one shared description.
QNX_C_DEFINITION = """
 struct filehdr {
 char signature[8];
 int32 usize; // Uncompressed size of the file
 uint16 blksize; // Size of compression blocks
 uint8 cmptype; // Type of compression (CMP_LZO, ...)
 uint8 flags;
 } filehdr_t;

 struct cmphdr {
 uint16 prev; // Offset to previous hdr
 uint16 next; // Offset to next hdr
 uint16 pusize; // Size of prev uncompressed blk
 uint16 usize; // Size of this uncompressed blk
 } cmphdr_t;

 struct cmpblk {
 struct cmphdr hdr;
 uint8_t buf[32*1024];
 } cmpblk_t;
"""

# Accepted values for filehdr.blksize: the powers of two from 2**12 (4096)
# through 2**15 (32768). Header validation rejects any other block size.
SUPPORTED_BLOCK_SIZES = [1 << exponent for exponent in range(12, 16)]
47
48
class QNXDeflateExtractor(Extractor):
    """Decompress a QNX deflate file block by block into a single output file."""

    def __init__(self):
        """Build the struct parser from the shared QNX C definitions."""
        self._struct_parser = StructParser(QNX_C_DEFINITION)

    def extract(self, inpath: Path, outdir: Path) -> ExtractResult:
        """Decompress ``inpath`` into ``<inpath.stem>.uncompressed``.

        The output file is opened through ``FileSystem(outdir)``. The file
        header is parsed little-endian; a ``cmptype`` of 0 selects LZO and
        any other value selects UCL. Block headers are then read one after
        another until one with ``next == 0`` is found.

        Raises:
            InvalidInputFormat: if a block header has a non-zero ``next``
                smaller than 8, the size of the block header itself.

        Returns:
            An ``ExtractResult`` carrying the problems recorded by the
            ``FileSystem`` instance.
        """
        fs = FileSystem(outdir)
        with File.from_path(inpath) as file:
            parser = self._struct_parser.cparser_le
            file_header = parser.filehdr_t(file)

            if file_header.cmptype == 0:
                decompressor = LZOCompressor()
            else:
                decompressor = UCLDecompressor()

            with fs.open(Path(f"{inpath.stem}.uncompressed")) as outfile:
                while True:
                    block_header = parser.cmphdr_t(file)
                    if block_header.next == 0:
                        # A zero "next" offset marks the terminating header.
                        break
                    if block_header.next < 8:
                        raise InvalidInputFormat(
                            "QNX deflate block offset is smaller than its header."
                        )
                    # "next" counts from the start of this header, and the
                    # 8-byte header has already been consumed by the parser.
                    payload = file.read(block_header.next - 8)
                    outfile.write(decompressor.decompress(payload))

        return ExtractResult(reports=fs.problems)
69
70
class QNXDeflateHandler(StructHandler):
    """Detect QNX deflate files and compute where each one ends."""

    NAME = "qnx_deflate"
    C_DEFINITIONS = QNX_C_DEFINITION
    HEADER_STRUCT = "filehdr_t"
    # Signature bytes spell "iwlyfmbp". The 6 wildcard bytes line up with
    # usize + blksize in filehdr, and the last byte restricts cmptype to 0 or 1.
    PATTERNS = [
        HexString("69 77 6c 79 66 6d 62 70 [6] (00|01)")  # iwlyfmbp
    ]
    EXTRACTOR = QNXDeflateExtractor()
    DOC = HandlerDoc(
        name="QNX Deflate",
        description="QNX deflate are compressed files using a block-based compression format with either LZO or UCL algorithm.",
        handler_type=HandlerType.COMPRESSION,
        vendor="Blackberry",
        references=[
            Reference(title="QNX wikipedia", url="https://en.wikipedia.org/wiki/QNX"),
        ],
        limitations=[],
    )

    def is_valid_header(self, header) -> bool:
        """Return True when the header has a positive size and a supported block size."""
        if header.usize <= 0:
            return False
        return header.blksize in SUPPORTED_BLOCK_SIZES

    def calculate_chunk(self, file, start_offset) -> ValidChunk:
        """Walk the chain of block headers to find the end of the chunk.

        The file header is parsed little-endian from the current position of
        ``file`` and validated. Each block header's ``next`` offset is then
        followed until a header with ``next == 0`` has been read.

        Raises:
            InvalidInputFormat: if the file header fails validation, or a
                block header has a non-zero ``next`` smaller than 8.

        Returns:
            A ``ValidChunk`` from ``start_offset`` to the file position
            reached right after the terminating block header.
        """
        file_header = self.parse_header(file, Endian.LITTLE)
        if not self.is_valid_header(file_header):
            raise InvalidInputFormat("Invalid QNX header")

        while True:
            block_header = self.cparser_le.cmphdr_t(file)
            if block_header.next == 0:
                # A zero "next" offset marks the terminating header.
                break
            if block_header.next < 8:
                raise InvalidInputFormat(
                    "QNX deflate block offset is smaller than its header."
                )
            # Parsing the header already advanced the file by its 8 bytes,
            # so only the remainder of the block has to be skipped.
            file.seek(block_header.next - 8, io.SEEK_CUR)

        return ValidChunk(start_offset=start_offset, end_offset=file.tell())