"""Unix compress'ed chunk identification.

We identify the end offset of a unix compress'ed chunk by performing
Lempel-Ziv-Welch decompression on the stream, starting from the
identified start offset and ending at the end of the whole file being
analyzed.

If we reach an invalid code, or the stream ends in the middle of a
code, we do not recursively retry the decompression with a size
reduced by one; we just fail the chunk, as we have seen too many false
positives picked up by that heuristic.

If the decompression procedure completes without errors, we have a
valid chunk and can return its current end offset.

We use a small heuristic to return the right end offset: based on the
last decoded code, we decide whether the final byte read still belongs
to the compressed stream. This heuristic tends to work well when the
arbitrary data appended after the stream is made of random bytes (no
repeating letters, no large runs of ASCII letters).

It can obviously be wrong from time to time: we get a compress'ed
chunk that we can decompress, but whose uncompressed data contains
garbage bytes at the end.

Sadly, there is no way to identify the end offset of a compress'ed
stream with byte precision and 100% certainty if it is followed by
other content.

The good news is that, because of this behavior, it is highly unlikely
we will observe compress'ed chunks followed by other chunks in the
wild. The only ones I observed were followed by null byte sentinels,
which helps identify the exact end offset.
"""

import io
from typing import Optional

from structlog import get_logger

from unblob.extractors import Command

from ...file_utils import Endian, InvalidInputFormat, convert_int8, convert_int16
from ...models import (
    File,
    HandlerDoc,
    HandlerType,
    HexString,
    Reference,
    StructHandler,
    ValidChunk,
)

logger = get_logger()


class UnixCompressHandler(StructHandler):
    NAME = "compress"

    PATTERNS = [
        # reference: https://fuchsia.googlesource.com/third_party/wuffs/+/HEAD/std/lzw/README.md
        HexString("1f 9d")
    ]

    C_DEFINITIONS = r"""
        struct compress_header {
            char magic[2];      // compress signature/magic number
            uint8 flags;        // blocks = flags&0x80, bits = flags&0x1f
        };
    """
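    # Example flags byte, purely for illustration (not part of the handler):
    # a flags value of 0x90 means block compression is enabled
    # (0x90 & 0x80) with a maximum code width of 16 bits
    # (0x90 & 0x1f == 0x10).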
    HEADER_STRUCT = "compress_header"

    EXTRACTOR = Command("7z", "x", "-y", "{inpath}", "-so", stdout="lzw.uncompressed")

    DOC = HandlerDoc(
        name=NAME,
        description="Unix compress files use the Lempel-Ziv-Welch (LZW) algorithm for data compression and are identified by a 2-byte magic number (0x1F 0x9D). This format supports optional block compression and variable bit lengths ranging from 9 to 16 bits.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="Unix Compress File Format Documentation",
                url="https://fuchsia.googlesource.com/third_party/wuffs/+/HEAD/std/lzw/README.md",
            ),
            Reference(
                title="LZW Compression Algorithm",
                url="https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv%E2%80%93Welch",
            ),
        ],
        limitations=[],
    )

    def unlzw(self, file: File, start_offset: int, max_len: int) -> int:  # noqa: C901
        """Calculate the end of a unix compress stream.

        It performs decompression on a stream read from <file>, from
        <start_offset> up until <max_len>.

        Adapted from Brandon Owen's work
        (https://github.com/umeat/unlzw).

        Adapted from original work by Mark Adler - original copyright
        notice below.

        Copyright (C) 2014, 2015 Mark Adler. This software is provided
        'as-is', without any express or implied warranty. In no event
        will the authors be held liable for any damages arising from
        the use of this software. Permission is granted to anyone to
        use this software for any purpose, including commercial
        applications, and to alter it and redistribute it freely,
        subject to the following restrictions:

        1. The origin of this software must not be misrepresented;
           you must not claim that you wrote the original
           software. If you use this software in a product, an
           acknowledgment in the product documentation would be
           appreciated but is not required.

        2. Altered source versions must be plainly marked as such,
           and must not be misrepresented as being the original
           software.

        3. This notice may not be removed or altered from any
           source distribution. Mark Adler
           madler@alumni.caltech.edu
        """
        file.seek(start_offset)

        prefix: list[int] = [0] * 65536  # index to LZW prefix string
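        # 65536 entries are enough for the largest possible table at the
        # 16-bit maximum code width.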

        header = self.parse_header(file, Endian.LITTLE)

        if header.flags & 0x60:
            raise InvalidInputFormat("Flag & 0x60")

        max_ = header.flags & 0x1F
        if not (9 <= max_ <= 16):
            raise InvalidInputFormat("Invalid max")

        if max_ == 9:
            max_ = 10  # 9 doesn't really mean 9

        block_compressed = header.flags & 0x80
        end = 256 if block_compressed else 255
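        # (with block compression, code 256 is reserved as the clear code,
        # so the first free table slot is 257)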

        # Clear table, start at nine bits per symbol
        bits_per_symbol = 9
        mask = 0x1FF
        code = 0

        # Set up: get the first 9-bit code, which is the first decompressed byte,
        # but don't create a table entry until the next code

        buf = convert_int16(file.read(2), Endian.LITTLE)
        prev = buf & mask  # code
        buf >>= bits_per_symbol
        left = 16 - bits_per_symbol
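        # The table is still empty at this point, so the first code can only
        # be a literal byte (0-255).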
        if prev > 255:
            raise InvalidInputFormat("Invalid Data: First code must be a literal")

        # Decode codes
        mark = 3  # start of compressed data
        nxt = 5  # consumed five bytes so far
        while nxt < max_len:
            # If the table will be full after this, increment the code size
            if (end >= mask) and (bits_per_symbol < max_):
                # Flush unused input bits and bytes to next 8*bits bit boundary
                # (this is a vestigial aspect of the compressed data format
                # derived from an implementation that made use of a special VAX
                # machine instruction!)
                remaining_bits = (nxt - mark) % bits_per_symbol

                if remaining_bits:
                    remaining_bits = bits_per_symbol - remaining_bits
                    if remaining_bits >= max_len - nxt:
                        break
                    nxt += remaining_bits

                buf = left = 0

                # mark this new location for computing the next flush
                mark = nxt

                # increment the number of bits per symbol
                bits_per_symbol += 1
                mask <<= 1
                mask += 1

            # Get a code of bits_per_symbol bits
            buf += convert_int8(file.read(1), Endian.LITTLE) << left
            nxt += 1
            left += 8
            if left < bits_per_symbol:
                if nxt == max_len:
                    raise InvalidInputFormat(
                        "Invalid Data: Stream ended in the middle of a code",
                    )
                buf += convert_int8(file.read(1), Endian.LITTLE) << left
                nxt += 1

                left += 8
            code = buf & mask
            buf >>= bits_per_symbol
            left -= bits_per_symbol

            # Process clear code (256)
            if (code == 256) and block_compressed:
                # Flush unused input bits and bytes to next 8*bits bit boundary
                remaining_bits = (nxt - mark) % bits_per_symbol
                if remaining_bits:
                    remaining_bits = bits_per_symbol - remaining_bits
                    if remaining_bits > max_len - nxt:
                        break
                    nxt += remaining_bits
                buf = left = 0

                # Mark this location for computing the next flush
                mark = nxt

                # Go back to nine bits per symbol
                bits_per_symbol = 9  # initialize bits and mask
                mask = 0x1FF
                end = 255  # empty table
                continue  # get next code

            # Process LZW code
            temp = code  # save the current code

            # Special code to reuse last match
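            # (the LZW "KwKwK" special case: the encoder may emit a code one
            # past the current table end, which stands for the previous match
            # followed by its own first byte)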
            if code > end:
                # Be picky on the allowed code here, and make sure that the
                # code we drop through (prev) will be a valid index so that
                # random input does not cause an exception
                if (code != end + 1) or (prev > end):
                    raise InvalidInputFormat("Invalid Data: Invalid code detected")
                code = prev

            # Walk through the linked list; a real decompressor would generate
            # the output in reverse order here, we only validate the chain
            while code >= 256:
                code = prefix[code]

            # Link new table entry
            if end < mask:
                end += 1
                prefix[end] = prev

            # Set previous code for next iteration
            prev = temp

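            # End-of-stream heuristic from the module docstring: based on the
            # last decoded code, decide whether the final byte read still
            # belongs to the compressed stream.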
            if code == nxt - 1:
                return file.tell()

        return file.tell() - 1

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        file.seek(0, io.SEEK_END)
        max_len = file.tell()

        end_offset = self.unlzw(file, start_offset, max_len)

        chunk_length = end_offset - start_offset
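        # Five bytes only cover the 3-byte header plus the first 16-bit read,
        # leaving no room for actual compressed data.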
        if chunk_length <= 5:
            raise InvalidInputFormat("Compressed chunk is too short")

        return ValidChunk(
            start_offset=start_offset,
            end_offset=end_offset,
        )
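

# A minimal usage sketch (assumptions: that unblob's File type exposes a
# from_bytes() constructor, as its test suite uses; adjust to the real API
# if it differs):
#
#   handler = UnixCompressHandler()
#   file = File.from_bytes(data_starting_with_1f_9d)
#   chunk = handler.calculate_chunk(file, start_offset=0)
#   # chunk.end_offset is the heuristically detected end of the stream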