1"""Unix compress'ed chunk identification.
2
3We identify the end offset of any identified unix compress'ed chunk by
4performing Lempel-Ziv-Welch decompression on a chunk starting from the
5identified start offset, and ending at the end of the whole file being
6analyzed.
7
8If we reach an invalid code or the stream ends in the middle of a
9code, we do not recursively call the decompression with -1 size,
10rather just fail on the chunk as we have seen too many false-positives
11picked up by this heuristic.
12
13Once the decompression procedure works without errors, that means we
14have a valid chunk and can return its current end offset.
15
16We use a small heuristic to return the right end offset. This
17heuristic tends to work well when arbitrary data appended at the end
18of the stream is made of random bytes (no repeating letters, no large
19set of ASCII letters).
20
21It obviously can be wrong from time to time, leading to a compress'ed
22chunk that we can decompress (obviously), but uncompressed data will
23contain garbage bytes at the end.
24
25Sadly, there is no way we can identify with 100% probability the end
26offset of a compress'ed stream with byte precision if it is followed
27by other content.
28
29The good news is that because of this behavior, it's highly unlikely
30we will observe compress'ed chunks followed by other chunks in the
31wild. The only ones I observed were followed by null bytes sentinels,
32which helps identifying the exact end offset.
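
For illustration, the handler can be driven directly, roughly like
this (a minimal sketch, assuming unblob's File.from_bytes helper and
a compress'ed sample on disk; not part of the handler itself):

    from pathlib import Path

    from unblob.file_utils import File
    from unblob.handlers.compression.compress import UnixCompressHandler

    handler = UnixCompressHandler()
    data = Path("sample.Z").read_bytes()
    chunk = handler.calculate_chunk(File.from_bytes(data), start_offset=0)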
33"""

import io

from structlog import get_logger

from unblob.extractors import Command

from ...file_utils import Endian, InvalidInputFormat, convert_int8, convert_int16
from ...models import (
    File,
    HandlerDoc,
    HandlerType,
    HexString,
    Reference,
    StructHandler,
    ValidChunk,
)

logger = get_logger()


class UnixCompressHandler(StructHandler):
    NAME = "compress"

    PATTERNS = [
        # reference: https://fuchsia.googlesource.com/third_party/wuffs/+/HEAD/std/lzw/README.md
        HexString("1f 9d")
    ]

    C_DEFINITIONS = r"""
        struct compress_header {
            char magic[2];  // compress signature/magic number
            uint8 flags;    // blocks = flags&0x80, bits = flags&0x1f
        };
    """
    HEADER_STRUCT = "compress_header"

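    # 7z writes the decompressed stream to stdout ("-so"); the Command
    # extractor captures it into a file named lzw.uncompressed.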
    EXTRACTOR = Command("7z", "x", "-y", "{inpath}", "-so", stdout="lzw.uncompressed")

    DOC = HandlerDoc(
        name=NAME,
        description="Unix compress files use the Lempel-Ziv-Welch (LZW) algorithm for data compression and are identified by a 2-byte magic number (0x1F 0x9D). This format supports optional block compression and variable bit lengths ranging from 9 to 16 bits.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="Unix Compress File Format Documentation",
                url="https://fuchsia.googlesource.com/third_party/wuffs/+/HEAD/std/lzw/README.md",
            ),
            Reference(
                title="LZW Compression Algorithm",
                url="https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv%E2%80%93Welch",
            ),
        ],
        limitations=[],
    )

    def unlzw(self, file: File, start_offset: int, max_len: int) -> int:  # noqa: C901
        """Calculate the end of a unix compress stream.

        Performs decompression on a stream read from <file>, from
        <start_offset> up until <max_len>.

        Adapted from Brandon Owen's work
        (https://github.com/umeat/unlzw).

        Adapted from original work by Mark Adler - original copyright
        notice below:

        Copyright (C) 2014, 2015 Mark Adler. This software is provided
        'as-is', without any express or implied warranty. In no event
        will the authors be held liable for any damages arising from
        the use of this software. Permission is granted to anyone to
        use this software for any purpose, including commercial
        applications, and to alter it and redistribute it freely,
        subject to the following restrictions:

        1. The origin of this software must not be misrepresented;
           you must not claim that you wrote the original
           software. If you use this software in a product, an
           acknowledgment in the product documentation would be
           appreciated but is not required.

        2. Altered source versions must be plainly marked as such,
           and must not be misrepresented as being the original
           software.

        3. This notice may not be removed or altered from any
           source distribution. Mark Adler
           madler@alumni.caltech.edu
        """
        file.seek(start_offset)

        prefix: list[int] = [0] * 65536  # index to LZW prefix string

        header = self.parse_header(file, Endian.LITTLE)

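        # Bits 5 and 6 of the flags byte are unused by compress; streams
        # that set them are rejected.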
        if header.flags & 0x60:
            raise InvalidInputFormat("Flag & 0x60")

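        # The low five bits of the flags byte encode the maximum code
        # width in bits (see C_DEFINITIONS above).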
        max_ = header.flags & 0x1F
        if not (9 <= max_ <= 16):
            raise InvalidInputFormat("Invalid max")

        if max_ == 9:
            max_ = 10  # 9 doesn't really mean 9

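        # In block mode, code 256 is reserved as the clear code (handled
        # in the decode loop below), so the first free table slot is one
        # higher than in non-block mode.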
        block_compressed = header.flags & 0x80
        end = 256 if block_compressed else 255

        # Clear table, start at nine bits per symbol
        bits_per_symbol = 9
        mask = 0x1FF
        code = 0

        # Set up: get the first 9-bit code, which is the first decompressed byte,
        # but don't create a table entry until the next code

        buf = convert_int16(file.read(2), Endian.LITTLE)
        prev = buf & mask  # code
        buf >>= bits_per_symbol
        left = 16 - bits_per_symbol
        if prev > 255:
            raise InvalidInputFormat("Invalid Data: First code must be a literal")

        # Decode codes
        mark = 3  # start of compressed data
        nxt = 5  # consumed five bytes so far
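        # (3 header bytes: 2 magic bytes plus flags, and the 2 bytes of the
        # first code read above)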
        while nxt < max_len:
            # If the table will be full after this, increment the code size
            if (end >= mask) and (bits_per_symbol < max_):
                # Flush unused input bits and bytes to next 8*bits bit boundary
                # (this is a vestigial aspect of the compressed data format
                # derived from an implementation that made use of a special VAX
                # machine instruction!)
                remaining_bits = (nxt - mark) % bits_per_symbol

                if remaining_bits:
                    remaining_bits = bits_per_symbol - remaining_bits
                    if remaining_bits >= max_len - nxt:
                        break
                    nxt += remaining_bits

                buf = left = 0

                # mark this new location for computing the next flush
                mark = nxt

                # increment the number of bits per symbol
                bits_per_symbol += 1
                mask <<= 1
                mask += 1

            # Get a code of bits_per_symbol bits
            buf += convert_int8(file.read(1), Endian.LITTLE) << left
            nxt += 1
            left += 8
            if left < bits_per_symbol:
                if nxt == max_len:
                    raise InvalidInputFormat(
                        "Invalid Data: Stream ended in the middle of a code",
                    )
                buf += convert_int8(file.read(1), Endian.LITTLE) << left
                nxt += 1

                left += 8
            code = buf & mask
            buf >>= bits_per_symbol
            left -= bits_per_symbol
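            # Any bits still left in buf belong to the next code; left
            # tracks how many of them there are.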

            # process clear code (256)
            if (code == 256) and block_compressed:
                # Flush unused input bits and bytes to next 8*bits bit boundary
                remaining_bits = (nxt - mark) % bits_per_symbol
                if remaining_bits:
                    remaining_bits = bits_per_symbol - remaining_bits
                    if remaining_bits > max_len - nxt:
                        break
                    nxt += remaining_bits
                buf = left = 0

                # Mark this location for computing the next flush
                mark = nxt

                # Go back to nine bits per symbol
                bits_per_symbol = 9  # initialize bits and mask
                mask = 0x1FF
                end = 255  # empty table
                continue  # get next code

            # Process LZW code
            temp = code  # save the current code

            # Special code to reuse last match
            if code > end:
                # Be picky on the allowed code here, and make sure that the
                # code we drop through (prev) will be a valid index so that
                # random input does not cause an exception
                if (code != end + 1) or (prev > end):
                    raise InvalidInputFormat("Invalid Data: Invalid code detected")
                code = prev

            # Walk through linked list to generate output in reverse order
            while code >= 256:
                code = prefix[code]

            # Link new table entry
            if end < mask:
                end += 1
                prefix[end] = prev

            # Set previous code for next iteration
            prev = temp

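        # This is the small end-offset heuristic mentioned in the module
        # docstring: decide whether the last byte consumed still belongs
        # to the stream.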
        if code == nxt - 1:
            return file.tell()

        return file.tell() - 1

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        file.seek(0, io.SEEK_END)
        max_len = file.tell()

        end_offset = self.unlzw(file, start_offset, max_len)

        chunk_length = end_offset - start_offset
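        # A 5-byte chunk can only be the 3-byte header plus the 2 bytes of
        # the first code, i.e. an effectively empty stream.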
        if chunk_length <= 5:
            raise InvalidInputFormat("Compressed chunk is too short")

        return ValidChunk(
            start_offset=start_offset,
            end_offset=end_offset,
        )