Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/compression/compress.py: 100%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

98 statements  

1"""Unix compress'ed chunk identification. 

2 

3We identify the end offset of any identified unix compress'ed chunk by 

4performing Lempel-Ziv-Welch decompression on a chunk starting from the 

5identified start offset, and ending at the end of the whole file being 

6analyzed. 

7 

8If we reach an invalid code or the stream ends in the middle of a 

9code, we do not recursively call the decompression with -1 size, 

10rather just fail on the chunk as we have seen too many false-positives 

11picked up by this heuristic. 

12 

13Once the decompression procedure works without errors, that means we 

14have a valid chunk and can return its current end offset. 

15 

16We use a small heuristic to return the right end offset. This 

17heuristic tends to work well when arbitrary data appended at the end 

18of the stream is made of random bytes (no repeating letters, no large 

19set of ASCII letters). 

20 

21It obviously can be wrong from time to time, leading to a compress'ed 

22chunk that we can decompress (obviously), but uncompressed data will 

23contain garbage bytes at the end. 

24 

25Sadly, there is no way we can identify with 100% probability the end 

26offset of a compress'ed stream with byte precision if it is followed 

27by other content. 

28 

29The good news is that because of this behavior, it's highly unlikely 

30we will observe compress'ed chunks followed by other chunks in the 

31wild. The only ones I observed were followed by null bytes sentinels, 

32which helps identifying the exact end offset. 

33""" 

34 

35import io 

36 

37from structlog import get_logger 

38 

39from unblob.extractors import Command 

40 

41from ...file_utils import Endian, InvalidInputFormat, convert_int8, convert_int16 

42from ...models import ( 

43 File, 

44 HandlerDoc, 

45 HandlerType, 

46 HexString, 

47 Reference, 

48 StructHandler, 

49 ValidChunk, 

50) 

51 

logger = get_logger()  # module-level structlog logger for this handler module

53 

54 

class UnixCompressHandler(StructHandler):
    """Handler for Unix compress(1) (LZW, magic 0x1F 0x9D) chunks.

    End offset identification is done by `unlzw`, which LZW-decodes the
    stream from the start offset (see module docstring); extraction is
    delegated to 7z.
    """

    NAME = "compress"

    PATTERNS = [
        # reference: https://fuchsia.googlesource.com/third_party/wuffs/+/HEAD/std/lzw/README.md
        HexString("1f 9d")
    ]

    C_DEFINITIONS = r"""
        struct compress_header {
            char magic[2]; // compress signature/magic number
            uint8 flags; // blocks = flags&0x80, bits = flags&0x1f
        };
    """
    HEADER_STRUCT = "compress_header"

    # Decompress to stdout and capture it: 7z has no in-place .Z extraction mode.
    EXTRACTOR = Command("7z", "x", "-y", "{inpath}", "-so", stdout="lzw.uncompressed")

    DOC = HandlerDoc(
        name=NAME,
        description="Unix compress files use the Lempel-Ziv-Welch (LZW) algorithm for data compression and are identified by a 2-byte magic number (0x1F 0x9D). This format supports optional block compression and variable bit lengths ranging from 9 to 16 bits.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="Unix Compress File Format Documentation",
                url="https://fuchsia.googlesource.com/third_party/wuffs/+/HEAD/std/lzw/README.md",
            ),
            Reference(
                title="LZW Compression Algorithm",
                url="https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv%E2%80%93Welch",
            ),
        ],
        limitations=[],
    )

    def unlzw(self, file: File, start_offset: int, max_len: int) -> int:  # noqa: C901
        """Calculate the end of a unix compress stream.

        It performs decompression on a stream read from <file> from
        <start_offset> up until <max_len>.

        Returns the end offset (exclusive) of the compress'ed stream,
        chosen by the heuristic described in the module docstring.

        Raises InvalidInputFormat when the header flags or max-bits
        value are invalid, or when the LZW stream is malformed.

        Adapted from Brandon Owen works
        (https://github.com/umeat/unlzw).

        Adapted from original work by Mark Adler - original copyright
        notice below

        Copyright (C) 2014, 2015 Mark Adler This software is provided
        'as-is', without any express or implied warranty. In no event
        will the authors be held liable for any damages arising from
        the use of this software. Permission is granted to anyone to
        use this software for any purpose, including commercial
        applications, and to alter it and redistribute it freely,
        subject to the following restrictions:

        1. The origin of this software must not be misrepresented;
           you must not claim that you wrote the original
           software. If you use this software in a product, an
           acknowledgment in the product documentation would be
           appreciated but is not required.

        2. Altered source versions must be plainly marked as such,
           and must not be misrepresented as being the original
           software.

        3. This notice may not be removed or altered from any
           source distribution. Mark Adler
           madler@alumni.caltech.edu
        """
        file.seek(start_offset)

        # 65536 == 2**16: big enough for the largest (16-bit) code table.
        prefix: list[int] = [0] * 65536  # index to LZW prefix string

        header = self.parse_header(file, Endian.LITTLE)

        # 0x60 covers the two flag bits not used for block mode (0x80)
        # or max bits (0x1f) -- see C_DEFINITIONS; reject if either is set.
        if header.flags & 0x60:
            raise InvalidInputFormat("Flag & 0x60")

        # Maximum code width in bits; compress only produces 9..16.
        max_ = header.flags & 0x1F
        if not (9 <= max_ <= 16):
            raise InvalidInputFormat("Invalid max")

        if max_ == 9:
            max_ = 10  # 9 doesn't really mean 9

        # Block mode reserves code 256 as the table-clear code.
        block_compressed = header.flags & 0x80
        end = 256 if block_compressed else 255

        # Clear table, start at nine bits per symbol
        bits_per_symbol = 9
        mask = 0x1FF
        code = 0

        # Set up: get the first 9-bit code, which is the first decompressed byte,
        # but don't create a table entry until the next code

        buf = convert_int16(file.read(2), Endian.LITTLE)
        prev = buf & mask  # code
        buf >>= bits_per_symbol
        left = 16 - bits_per_symbol  # unconsumed bits still held in buf
        if prev > 255:
            raise InvalidInputFormat("Invalid Data: First code must be a literal")

        # Decode codes
        mark = 3  # start of compressed data
        nxt = 5  # consumed five bytes so far (3-byte header + first 2-byte code)
        while nxt < max_len:
            # If the table will be full after this, increment the code size
            if (end >= mask) and (bits_per_symbol < max_):
                # Flush unused input bits and bytes to next 8*bits bit boundary
                # (this is a vestigial aspect of the compressed data format
                # derived from an implementation that made use of a special VAX
                # machine instruction!)
                remaining_bits = (nxt - mark) % bits_per_symbol

                if remaining_bits:
                    remaining_bits = bits_per_symbol - remaining_bits
                    if remaining_bits >= max_len - nxt:
                        # Not enough input left to reach the boundary.
                        break
                    nxt += remaining_bits

                # Discard partially-read bits along with the flushed bytes.
                buf = left = 0

                # mark this new location for computing the next flush
                mark = nxt

                # increment the number of bits per symbol
                bits_per_symbol += 1
                mask <<= 1
                mask += 1

            # Get a code of bits bits
            buf += convert_int8(file.read(1), Endian.LITTLE) << left
            nxt += 1
            left += 8
            if left < bits_per_symbol:
                # One byte was not enough to complete the code; need another.
                if nxt == max_len:
                    raise InvalidInputFormat(
                        "Invalid Data: Stream ended in the middle of a code",
                    )
                buf += convert_int8(file.read(1), Endian.LITTLE) << left
                nxt += 1

                left += 8
            code = buf & mask
            buf >>= bits_per_symbol
            left -= bits_per_symbol

            # process clear code (256)
            if (code == 256) and block_compressed:
                # Flush unused input bits and bytes to next 8*bits bit boundary
                remaining_bits = (nxt - mark) % bits_per_symbol
                if remaining_bits:
                    remaining_bits = bits_per_symbol - remaining_bits
                    if remaining_bits > max_len - nxt:
                        break
                    nxt += remaining_bits
                buf = left = 0

                # Mark this location for computing the next flush
                mark = nxt

                # Go back to nine bits per symbol
                bits_per_symbol = 9  # initialize bits and mask
                mask = 0x1FF
                end = 255  # empty table
                continue  # get next code

            # Process LZW code
            temp = code  # save the current code

            # Special code to reuse last match
            if code > end:
                # Be picky on the allowed code here, and make sure that the
                # code we drop through (prev) will be a valid index so that
                # random input does not cause an exception
                if (code != end + 1) or (prev > end):
                    raise InvalidInputFormat("Invalid Data: Invalid code detected")
                code = prev

            # Walk through linked list to generate output in reverse order
            while code >= 256:
                code = prefix[code]

            # Link new table entry
            if end < mask:
                end += 1
                prefix[end] = prev

            # Set previous code for next iteration
            prev = temp

        # Heuristic end-offset adjustment (see module docstring): decide
        # whether the very last byte read belonged to the stream or not.
        if code == nxt - 1:
            return file.tell()

        return file.tell() - 1

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Validate a compress'ed chunk at start_offset and return its bounds.

        Decodes the LZW stream from start_offset to the end of the file
        to find the end offset; raises InvalidInputFormat for streams no
        longer than the 3-byte header plus the first 2-byte code.
        """
        file.seek(0, io.SEEK_END)
        max_len = file.tell()  # decode up to the end of the whole file

        end_offset = self.unlzw(file, start_offset, max_len)

        chunk_length = end_offset - start_offset
        if chunk_length <= 5:
            raise InvalidInputFormat("Compressed chunk is too short")

        return ValidChunk(
            start_offset=start_offset,
            end_offset=end_offset,
        )