1from typing import Optional
2
3import attrs
4from pyperscan import Flag, Pattern, Scan, StreamDatabase
5from structlog import get_logger
6
7from unblob.extractors import Command
8
9from ...file_utils import InvalidInputFormat, SeekError, StructParser, stream_scan
10from ...models import (
11 File,
12 Handler,
13 HandlerDoc,
14 HandlerType,
15 HexString,
16 Reference,
17 Regex,
18 ValidChunk,
19)
20
21logger = get_logger()
22
23C_DEFINITIONS = r"""
24 typedef struct stream_header {
25 char magic[2]; // 'BZ' signature/magic number
26 uint8 version; // 'h' 0x68 for Bzip2 ('H'uffman coding), '0' for Bzip1 (deprecated)
27 uint8 hundred_k_blocksize; // '1'..'9' block-size 100 kB-900 kB (uncompressed)
28 } stream_header_t;
29
30 typedef struct block_header {
31 char magic[6]; // 0x314159265359 (BCD (pi))
32 uint32 crc; // checksum for this block
33 uint8 randomised; // 0=>normal, 1=>randomised (deprecated)
34 } block_header_t;
35"""
36
37
# First two bytes of every bzip2 stream header
STREAM_MAGIC = b"BZ"
# Version byte: 'h' (0x68) identifies bzip2 (Huffman coding)
HUFFMAN_VERSION = ord("h")
# The block size is stored as an ASCII digit, from '1' (100 kB) to '9' (900 kB)
HUNDRED_K_BLOCK_MIN = ord("1")
HUNDRED_K_BLOCK_MAX = ord("9")

# Block header magic, i.e. the bytes 0x314159265359 (BCD (pi))
BLOCK_MAGIC = b"1AY&SY"

# A stream ends with the magic 0x177245385090, which is not necessarily aligned
# to a byte boundary, so all 8 possible bit shifts were pre-calculated with:
#
#     for bit_shift in range(8):
#         print(hex(0x1772_4538_5090 << bit_shift))
#
# Shifts 0-3 still fit into six bytes. Shifts 4-7 produce a seven byte value,
# of which only the first six bytes are listed here.
# "?" presumably stands for a wildcard nibble (bits that belong to neighbouring
# data) - confirm against HexString.
# The order matters: _hyperscan_match treats pattern ids above 3 as the shifts
# that occupy one extra byte.
STREAM_END_MAGIC_PATTERNS = [
    HexString(hex_bytes)
    for hex_bytes in (
        "17 72 45 38 50 90",  # shift 0
        "2e e4 8a 70 a1 2?",  # shift 1
        "5d c9 14 e1 42 4?",  # shift 2
        "bb 92 29 c2 84 8?",  # shift 3
        "?1 77 24 53 85 09",  # shift 4
        "?2 ee 48 a7 0a 12",  # shift 5
        "?5 dc 91 4e 14 24",  # shift 6
        "?b b9 22 9c 28 48",  # shift 7
    )
]

# 6 bytes magic + 4 bytes combined CRC
STREAM_FOOTER_SIZE = 6 + 4
63
64
def build_stream_end_scan_db(pattern_list):
    """Compile the given patterns into a pyperscan ``StreamDatabase``.

    Every item of *pattern_list* is converted with its ``as_regex()`` method
    and compiled with the ``SOM_LEFTMOST`` and ``DOTALL`` flags.

    Args:
        pattern_list: iterable of pattern objects exposing ``as_regex()``
            (``HexString`` instances in this module).

    Returns:
        The ``StreamDatabase`` built from all patterns, in iteration order.
    """
    compiled_patterns = [
        Pattern(hex_string.as_regex(), Flag.SOM_LEFTMOST, Flag.DOTALL)
        for hex_string in pattern_list
    ]
    return StreamDatabase(*compiled_patterns)
69
70
# Built once at import time and shared by every calculate_chunk() call
hyperscan_stream_end_magic_db = build_stream_end_scan_db(STREAM_END_MAGIC_PATTERNS)
# Parser for the stream_header_t / block_header_t structures declared in C_DEFINITIONS
parser = StructParser(C_DEFINITIONS)
73
74
@attrs.define
class Bzip2SearchContext:
    """State shared between ``calculate_chunk`` and the scan match callback."""

    # Offset of the chunk being analysed; matches located before it are ignored
    start_offset: int
    # File being scanned; the match callback seeks and reads on it
    file: File
    # End offset of the last accepted stream footer (created with -1 by
    # calculate_chunk, updated by the match callback)
    end_block_offset: int
80
81
def _validate_stream_header(file: File):
    """Parse a ``stream_header_t`` from *file* and check that it is a bzip2 one.

    Returns:
        ``True`` when the magic is ``BZ``, the version is ``h`` and the block
        size digit is within ``'1'..'9'``; ``False`` otherwise, including when
        the file ends before a full header could be read.
    """
    try:
        stream_header = parser.cparser_be.stream_header_t(file)
    except EOFError:
        return False

    if stream_header.magic != STREAM_MAGIC:
        return False
    if stream_header.version != HUFFMAN_VERSION:
        return False

    block_size_digit = stream_header.hundred_k_blocksize
    return HUNDRED_K_BLOCK_MIN <= block_size_digit <= HUNDRED_K_BLOCK_MAX
93
94
def _validate_block_header(file: File):
    """Parse a ``block_header_t`` from *file* and check its magic.

    Returns:
        ``True`` when the parsed magic equals ``BLOCK_MAGIC``; ``False``
        otherwise, including when the file ends before a full header could be
        read.
    """
    try:
        block_header = parser.cparser_be.block_header_t(file)
    except EOFError:
        return False
    else:
        return block_header.magic == BLOCK_MAGIC
102
103
def _hyperscan_match(
    context: Bzip2SearchContext, pattern_id: int, offset: int, end: int
) -> Scan:
    """Handle one stream end magic match reported by the scanner.

    Args:
        context: search state; ``end_block_offset`` is updated on every
            accepted match.
        pattern_id: id of the matching pattern (used as the position in
            ``STREAM_END_MAGIC_PATTERNS``).
        offset: offset of the match, used here as the first byte of the footer.
        end: ignored.

    Returns:
        ``Scan.Continue`` when the match lies before the chunk start or when
        another bzip2 stream begins right after the footer, ``Scan.Terminate``
        otherwise.
    """
    del end  # unused argument

    # Matches located before the chunk we are interested in are irrelevant
    if offset < context.start_offset:
        return Scan.Continue

    # Patterns 4..7 describe a magic that spills over into a seventh byte,
    # which makes the footer one byte longer
    needs_extra_byte = pattern_id > 3
    footer_end = offset + STREAM_FOOTER_SIZE + (1 if needs_extra_byte else 0)

    # Position the file right after the footer; stop when that is not possible
    try:
        context.file.seek(footer_end)
    except SeekError:
        return Scan.Terminate

    context.end_block_offset = footer_end

    # When another stream starts right after this one, keep scanning so that
    # it becomes part of the same chunk
    stream = context.file
    next_stream_follows = _validate_stream_header(stream) and _validate_block_header(
        stream
    )
    return Scan.Continue if next_stream_follows else Scan.Terminate
130
131
class BZip2Handler(Handler):
    """Handler that computes the boundaries of bzip2 compressed chunks."""

    NAME = "bzip2"

    # Stream header (magic + version + block size) directly followed by the
    # block header magic
    PATTERNS = [Regex(r"\x42\x5a\x68[\x31-\x39]\x31\x41\x59\x26\x53\x59")]

    EXTRACTOR = Command("7z", "x", "-y", "{inpath}", "-so", stdout="bzip2.uncompressed")

    DOC = HandlerDoc(
        name=NAME,
        description="The bzip2 format is a block-based compression format that uses the Burrows-Wheeler transform and Huffman coding for high compression efficiency. Each stream starts with a header and consists of one or more compressed blocks, ending with a footer containing a checksum.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="bzip2 File Format Documentation",
                url="https://sourceware.org/bzip2/manual/manual.html",
            ),
            Reference(
                title="bzip2 Technical Specification",
                url="https://en.wikipedia.org/wiki/Bzip2",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        """Find where the bzip2 data starting at *start_offset* ends.

        The stream and block headers are read from the current position of
        *file* (presumably already placed at *start_offset* by the caller -
        TODO confirm). The file is then scanned for stream end magics; the
        match callback records the end of the last accepted footer.

        Args:
            file: file containing the candidate chunk.
            start_offset: offset of the candidate chunk within *file*.

        Returns:
            A ``ValidChunk`` spanning ``start_offset`` to the recorded end
            offset.

        Raises:
            InvalidInputFormat: when either header is invalid or when no
                stream end could be located.
        """
        if not _validate_stream_header(file):
            raise InvalidInputFormat("Invalid bzip2 stream header")
        if not _validate_block_header(file):
            raise InvalidInputFormat("Invalid bzip2 block header")

        search_state = Bzip2SearchContext(
            start_offset=start_offset, file=file, end_block_offset=-1
        )

        # Best effort: a failing scan is only logged, whatever end offset was
        # recorded before the failure is still used below
        try:
            scanner = hyperscan_stream_end_magic_db.build(search_state, _hyperscan_match)  # type: ignore
            stream_scan(scanner, file)
        except Exception as scan_error:
            logger.debug(
                "Error scanning for bzip2 patterns",
                error=scan_error,
            )

        chunk_end = search_state.end_block_offset
        if chunk_end <= 0:
            raise InvalidInputFormat("No valid bzip2 compression block was detected")

        return ValidChunk(start_offset=start_offset, end_offset=chunk_end)