1import io
2from pathlib import Path
3
4from lzallright import LZOCompressor
5
6from unblob.file_utils import (
7 Endian,
8 FileSystem,
9 InvalidInputFormat,
10 StructParser,
11)
12from unblob.handlers.compression._ucl import UCLDecompressor
13from unblob.models import (
14 Extractor,
15 ExtractResult,
16 File,
17 HandlerDoc,
18 HandlerType,
19 HexString,
20 Reference,
21 StructHandler,
22 ValidChunk,
23)
24
25QNX_C_DEFINITION = """
26 struct filehdr {
27 char signature[8];
28 int32 usize; // Uncompressed size of the file
29 uint16 blksize; // Size of compression blocks
30 uint8 cmptype; // Type of compression (CMP_LZO, ...)
31 uint8 flags;
32 } filehdr_t;
33
34 struct cmphdr {
35 uint16 prev; // Offset to previous hdr
36 uint16 next; // Offset to next hdr
37 uint16 pusize; // Size of prev uncompressed blk
38 uint16 usize; // Size of this uncompressed blk
39 } cmphdr_t;
40
41 struct cmpblk {
42 struct cmphdr hdr;
43 uint8_t buf[32*1024];
44 } cmpblk_t;
45"""
46SUPPORTED_BLOCK_SIZES = [2**i for i in range(12, 16)]
47
48
49class QNXDeflateExtractor(Extractor):
50 def __init__(self):
51 self._struct_parser = StructParser(QNX_C_DEFINITION)
52
53 def extract(self, inpath: Path, outdir: Path) -> ExtractResult:
54 fs = FileSystem(outdir)
55 with File.from_path(inpath) as file:
56 fhdr = self._struct_parser.cparser_le.filehdr_t(file)
57 decompressor = LZOCompressor() if fhdr.cmptype == 0 else UCLDecompressor()
58 with fs.open(Path(f"{inpath.stem}.uncompressed")) as outfile:
59 cmphdr = self._struct_parser.cparser_le.cmphdr_t(file)
60 while cmphdr.next != 0:
61 compressed_part = file.read(cmphdr.next - 8)
62 outfile.write(decompressor.decompress(compressed_part))
63 cmphdr = self._struct_parser.cparser_le.cmphdr_t(file)
64 return ExtractResult(reports=fs.problems)
65
66
67class QNXDeflateHandler(StructHandler):
68 NAME = "qnx_deflate"
69 C_DEFINITIONS = QNX_C_DEFINITION
70 HEADER_STRUCT = "filehdr_t"
71 PATTERNS = [
72 HexString("69 77 6c 79 66 6d 62 70 [6] (00|01)") # iwlyfmbp
73 ]
74 EXTRACTOR = QNXDeflateExtractor()
75 DOC = HandlerDoc(
76 name="QNX Deflate",
77 description="QNX deflate are compressed files using a block-based compression format with either LZO or UCL algorithm.",
78 handler_type=HandlerType.COMPRESSION,
79 vendor="Blackberry",
80 references=[
81 Reference(title="QNX wikipedia", url="https://en.wikipedia.org/wiki/QNX"),
82 ],
83 limitations=[],
84 )
85
86 def is_valid_header(self, header) -> bool:
87 return header.usize > 0 and header.blksize in SUPPORTED_BLOCK_SIZES
88
89 def calculate_chunk(self, file, start_offset) -> ValidChunk:
90 header = self.parse_header(file, Endian.LITTLE)
91 if not self.is_valid_header(header):
92 raise InvalidInputFormat("Invalid QNX header")
93
94 cmphdr = self.cparser_le.cmphdr_t(file)
95 while cmphdr.next != 0:
96 # return from 8 bytes because it parse the cmpheader
97 file.seek(cmphdr.next - 8, io.SEEK_CUR)
98 cmphdr = self.cparser_le.cmphdr_t(file)
99 end_offset = file.tell()
100
101 return ValidChunk(start_offset=start_offset, end_offset=end_offset)