1import attrs
2from pyperscan import Flag, Pattern, Scan, StreamDatabase
3from structlog import get_logger
4
5from unblob.extractors import Command
6
7from ...file_utils import InvalidInputFormat, SeekError, StructParser, stream_scan
8from ...models import (
9 File,
10 Handler,
11 HandlerDoc,
12 HandlerType,
13 HexString,
14 Reference,
15 Regex,
16 ValidChunk,
17)
18
logger = get_logger()

# C declarations for the two on-disk headers validated in this module.
# The text is handed verbatim to StructParser further down in the file.
C_DEFINITIONS = r"""
    typedef struct stream_header {
        char magic[2]; // 'BZ' signature/magic number
        uint8 version; // 'h' 0x68 for Bzip2 ('H'uffman coding), '0' for Bzip1 (deprecated)
        uint8 hundred_k_blocksize; // '1'..'9' block-size 100 kB-900 kB (uncompressed)
    } stream_header_t;

    typedef struct block_header {
        char magic[6]; // 0x314159265359 (BCD (pi))
        uint32 crc; // checksum for this block
        uint8 randomised; // 0=>normal, 1=>randomised (deprecated)
    } block_header_t;
"""


# Expected contents of the stream_header_t fields.
STREAM_MAGIC = b"BZ"
HUFFMAN_VERSION = 0x68  # ASCII "h"
HUNDRED_K_BLOCK_MIN = 0x31  # ASCII "1"
HUNDRED_K_BLOCK_MAX = 0x39  # ASCII "9"

# Expected block_header_t magic: the bytes 0x314159265359 (BCD (pi)).
BLOCK_MAGIC = b"1AY&SY"

# A stream ends with the 48-bit magic 0x177245385090, but that magic is not
# aligned to a byte boundary, so all 8 possible bit shifts are pre-calculated:
#   for bit_shift in range(8):
#       print(hex(0x1772_4538_5090 << bit_shift))
# Shifts 0..3 still fit in six bytes (only the trailing nibble is unknown).
# Shifts 4..7 spill into a seventh byte, so those patterns cover the first six
# bytes of the shifted magic with the leading nibble wildcarded.
STREAM_END_MAGIC_PATTERNS = [
    HexString(hex_pattern)
    for hex_pattern in (
        "17 72 45 38 50 90",
        "2e e4 8a 70 a1 2?",
        "5d c9 14 e1 42 4?",
        "bb 92 29 c2 84 8?",
        "?1 77 24 53 85 09",
        "?2 ee 48 a7 0a 12",
        "?5 dc 91 4e 14 24",
        "?b b9 22 9c 28 48",
    )
]

# End-of-stream magic (6 bytes) followed by the combined CRC (4 bytes).
STREAM_FOOTER_SIZE = 6 + 4
61
62
def build_stream_end_scan_db(pattern_list):
    """Build a pyperscan ``StreamDatabase`` from a list of hex patterns.

    Each item of ``pattern_list`` must provide ``as_regex()``; the resulting
    expression is wrapped in a ``Pattern`` with the ``SOM_LEFTMOST`` and
    ``DOTALL`` flags. Patterns are passed to the database in list order.
    """
    compiled_patterns = [
        Pattern(hex_pattern.as_regex(), Flag.SOM_LEFTMOST, Flag.DOTALL)
        for hex_pattern in pattern_list
    ]
    return StreamDatabase(*compiled_patterns)
67
68
# Module-level singletons, built once at import time and shared by every call.

# Parser for the stream/block header structs declared in C_DEFINITIONS.
parser = StructParser(C_DEFINITIONS)

# Scan database matching the end-of-stream magic at any of its 8 bit shifts.
hyperscan_stream_end_magic_db = build_stream_end_scan_db(STREAM_END_MAGIC_PATTERNS)
71
72
@attrs.define
class Bzip2SearchContext:
    """Mutable state shared between ``calculate_chunk`` and the match callback."""

    # Offset where the chunk under inspection begins; matches reported before
    # this offset are ignored by the callback.
    start_offset: int
    # File being scanned; the callback seeks in it and reads headers from it.
    file: File
    # End offset of the last accepted stream footer. The handler initialises
    # it to -1 and the callback overwrites it on every accepted match.
    end_block_offset: int
78
79
def _validate_stream_header(file: File):
    """Return True when a valid bzip2 stream header can be parsed from ``file``.

    A ``stream_header_t`` is parsed from ``file`` (presumably at its current
    position - the callers seek before calling). The header is valid when the
    magic is ``BZ``, the version is ``h`` and the block-size digit is within
    ``'1'..'9'``. Running out of data while parsing counts as invalid.
    """
    try:
        stream_header = parser.cparser_be.stream_header_t(file)
    except EOFError:
        return False

    # Same checks, in the same order, as a short-circuiting ``and`` chain.
    if stream_header.magic != STREAM_MAGIC:
        return False
    if stream_header.version != HUFFMAN_VERSION:
        return False
    return (
        HUNDRED_K_BLOCK_MIN <= stream_header.hundred_k_blocksize <= HUNDRED_K_BLOCK_MAX
    )
91
92
def _validate_block_header(file: File):
    """Return True when a bzip2 block header with the expected magic follows.

    A ``block_header_t`` is parsed from ``file`` and only its magic is
    compared against ``BLOCK_MAGIC``; the CRC and the ``randomised`` flag are
    not inspected. Running out of data while parsing counts as invalid.
    """
    try:
        block_header = parser.cparser_be.block_header_t(file)
    except EOFError:
        return False
    else:
        magic_matches = block_header.magic == BLOCK_MAGIC
        return magic_matches
100
101
def _hyperscan_match(
    context: Bzip2SearchContext, pattern_id: int, offset: int, end: int
) -> Scan:
    """Handle one end-of-stream magic match reported by the scanner.

    Records the end of the stream footer in ``context.end_block_offset`` and
    decides whether scanning should go on.

    Returns ``Scan.Continue`` when the match lies before the chunk start, or
    when another bzip2 stream (stream header + block header) begins right
    after the footer. Returns ``Scan.Terminate`` when seeking to the footer
    end fails or when no further stream follows.
    """
    del end  # unused argument

    # Matches located before the start of this chunk are not ours.
    if offset < context.start_offset:
        return Scan.Continue

    # NOTE(review): presumably pattern_id is the index of the matched entry in
    # STREAM_END_MAGIC_PATTERNS - confirm against pyperscan. Entries 4..7 are
    # the shifts where the magic spills into a seventh byte, hence the extra
    # byte in the footer length.
    extra_byte = 1 if pattern_id > 3 else 0
    stream_end = offset + STREAM_FOOTER_SIZE + extra_byte

    # Position the file at the end of the stream; give up if that fails.
    try:
        context.file.seek(stream_end)
    except SeekError:
        return Scan.Terminate

    context.end_block_offset = stream_end

    # If another stream starts immediately after this one, keep scanning so
    # that it becomes part of the same chunk.
    next_stream_follows = _validate_stream_header(
        context.file
    ) and _validate_block_header(context.file)

    return Scan.Continue if next_stream_follows else Scan.Terminate
128
129
class BZip2Handler(Handler):
    """Handler that locates bzip2 streams and extracts them with 7z."""

    NAME = "bzip2"

    # Stream magic "BZ" + version "h" + block-size digit "1".."9",
    # immediately followed by the first block header magic.
    PATTERNS = [Regex(r"\x42\x5a\x68[\x31-\x39]\x31\x41\x59\x26\x53\x59")]

    EXTRACTOR = Command("7z", "x", "-y", "{inpath}", "-so", stdout="bzip2.uncompressed")

    DOC = HandlerDoc(
        name=NAME,
        description="The bzip2 format is a block-based compression format that uses the Burrows-Wheeler transform and Huffman coding for high compression efficiency. Each stream starts with a header and consists of one or more compressed blocks, ending with a footer containing a checksum.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="bzip2 File Format Documentation",
                url="https://sourceware.org/bzip2/manual/manual.html",
            ),
            Reference(
                title="bzip2 Technical Specification",
                url="https://en.wikipedia.org/wiki/Bzip2",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Determine the extent of the bzip2 data beginning at ``start_offset``.

        The stream header and the first block header are validated first
        (presumably ``file`` is already positioned at ``start_offset`` - the
        method itself does not seek). The file is then scanned for the
        end-of-stream magic; the match callback stores the end of the last
        accepted stream footer in the search context.

        Returns a ``ValidChunk`` from ``start_offset`` to that recorded end.

        Raises ``InvalidInputFormat`` when either header is invalid or when
        no stream end was recorded.
        """
        if not _validate_stream_header(file):
            raise InvalidInputFormat("Invalid bzip2 stream header")

        if not _validate_block_header(file):
            raise InvalidInputFormat("Invalid bzip2 block header")

        search = Bzip2SearchContext(
            start_offset=start_offset,
            file=file,
            end_block_offset=-1,  # sentinel: no stream end found yet
        )

        # Best effort: a failure while scanning is only logged, because an
        # end offset recorded before the failure is still usable.
        try:
            scanner = hyperscan_stream_end_magic_db.build(search, _hyperscan_match)  # type: ignore
            stream_scan(scanner, file)
        except Exception as exc:
            logger.debug("Error scanning for bzip2 patterns", error=exc)

        if search.end_block_offset <= 0:
            raise InvalidInputFormat("No valid bzip2 compression block was detected")

        return ValidChunk(start_offset=start_offset, end_offset=search.end_block_offset)