Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/compression/compress.py: 100%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

99 statements  

1"""Unix compress'ed chunk identification. 

2 

3We identify the end offset of any identified unix compress'ed chunk by 

4performing Lempel-Ziv-Welch decompression on a chunk starting from the 

5identified start offset, and ending at the end of the whole file being 

6analyzed. 

7 

8If we reach an invalid code or the stream ends in the middle of a 

9code, we do not recursively call the decompression with -1 size, 

10rather just fail on the chunk as we have seen too many false-positives 

11picked up by this heuristic. 

12 

13Once the decompression procedure works without errors, that means we 

14have a valid chunk and can return its current end offset. 

15 

16We use a small heuristic to return the right end offset. This 

17heuristic tends to work well when arbitrary data appended at the end 

18of the stream is made of random bytes (no repeating letters, no large 

19set of ASCII letters). 

20 

21It obviously can be wrong from time to time, leading to a compress'ed 

22chunk that we can decompress (obviously), but uncompressed data will 

23contain garbage bytes at the end. 

24 

25Sadly, there is no way we can identify with 100% probability the end 

26offset of a compress'ed stream with byte precision if it is followed 

27by other content. 

28 

29The good news is that because of this behavior, it's highly unlikely 

30we will observe compress'ed chunks followed by other chunks in the 

31wild. The only ones I observed were followed by null bytes sentinels, 

32which helps identifying the exact end offset. 

33""" 

34 

35import io 

36from typing import Optional 

37 

38from structlog import get_logger 

39 

40from unblob.extractors import Command 

41 

42from ...file_utils import Endian, InvalidInputFormat, convert_int8, convert_int16 

43from ...models import ( 

44 File, 

45 HandlerDoc, 

46 HandlerType, 

47 HexString, 

48 Reference, 

49 StructHandler, 

50 ValidChunk, 

51) 

52 

# Module-level structlog logger for this handler.
logger = get_logger()

54 

55 

class UnixCompressHandler(StructHandler):
    """Chunk handler for Unix compress'ed (LZW) streams, magic ``1f 9d``.

    End-offset identification is done by running a table-less LZW decode
    (:meth:`unlzw`) from the match offset to the end of the file; actual
    extraction is delegated to ``7z``.
    """

    NAME = "compress"

    PATTERNS = [
        # reference: https://fuchsia.googlesource.com/third_party/wuffs/+/HEAD/std/lzw/README.md
        HexString("1f 9d")
    ]

    C_DEFINITIONS = r"""
        struct compress_header {
            char magic[2]; // compress signature/magic number
            uint8 flags; // blocks = flags&0x80, bits = flags&0x1f
        };
    """
    HEADER_STRUCT = "compress_header"

    # Decompress to stdout ("-so") and capture it into a single output file.
    EXTRACTOR = Command("7z", "x", "-y", "{inpath}", "-so", stdout="lzw.uncompressed")

    DOC = HandlerDoc(
        name=NAME,
        description="Unix compress files use the Lempel-Ziv-Welch (LZW) algorithm for data compression and are identified by a 2-byte magic number (0x1F 0x9D). This format supports optional block compression and variable bit lengths ranging from 9 to 16 bits.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="Unix Compress File Format Documentation",
                url="https://fuchsia.googlesource.com/third_party/wuffs/+/HEAD/std/lzw/README.md",
            ),
            Reference(
                title="LZW Compression Algorithm",
                url="https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv%E2%80%93Welch",
            ),
        ],
        limitations=[],
    )

    def unlzw(self, file: File, start_offset: int, max_len: int) -> int:  # noqa: C901
        """Calculate the end of a unix compress stream.

        It performs decompression on a stream read from <file> from
        <start_offset> up until <max_len>.

        Returns the guessed end offset as an absolute file position.
        Raises InvalidInputFormat for invalid headers, invalid codes,
        or a stream that ends in the middle of a code.

        Adapted from Brandon Owen works
        (https://github.com/umeat/unlzw).

        Adapted from original work by Mark Adler - original copyright
        notice below

        Copyright (C) 2014, 2015 Mark Adler This software is provided
        'as-is', without any express or implied warranty. In no event
        will the authors be held liable for any damages arising from
        the use of this software. Permission is granted to anyone to
        use this software for any purpose, including commercial
        applications, and to alter it and redistribute it freely,
        subject to the following restrictions:

        1. The origin of this software must not be misrepresented;
           you must not claim that you wrote the original
           software. If you use this software in a product, an
           acknowledgment in the product documentation would be
           appreciated but is not required.

        2. Altered source versions must be plainly marked as such,
           and must not be misrepresented as being the original
           software.

        3. This notice may not be removed or altered from any
           source distribution. Mark Adler
           madler@alumni.caltech.edu
        """
        file.seek(start_offset)

        prefix: list[int] = [0] * 65536  # index to LZW prefix string

        header = self.parse_header(file, Endian.LITTLE)

        # Reject streams with either of the two unused flag bits set.
        if header.flags & 0x60:
            raise InvalidInputFormat("Flag & 0x60")

        # Maximum code width in bits, taken from the low 5 flag bits.
        max_ = header.flags & 0x1F
        if not (9 <= max_ <= 16):
            raise InvalidInputFormat("Invalid max")

        if max_ == 9:
            max_ = 10  # 9 doesn't really mean 9

        # Block mode (0x80) reserves code 256 as the "clear table" code.
        block_compressed = header.flags & 0x80
        end = 256 if block_compressed else 255  # last used table index

        # Clear table, start at nine bits per symbol
        bits_per_symbol = 9
        mask = 0x1FF
        code = 0

        # Set up: get the first 9-bit code, which is the first decompressed byte,
        # but don't create a table entry until the next code

        buf = convert_int16(file.read(2), Endian.LITTLE)  # bit accumulator
        prev = buf & mask  # code
        buf >>= bits_per_symbol
        left = 16 - bits_per_symbol  # bits still pending in the accumulator
        if prev > 255:
            raise InvalidInputFormat("Invalid Data: First code must be a literal")

        # Decode codes
        mark = 3  # start of compressed data
        nxt = 5  # consumed five bytes so far
        while nxt < max_len:
            # If the table will be full after this, increment the code size
            if (end >= mask) and (bits_per_symbol < max_):
                # Flush unused input bits and bytes to next 8*bits bit boundary
                # (this is a vestigial aspect of the compressed data format
                # derived from an implementation that made use of a special VAX
                # machine instruction!)
                remaining_bits = (nxt - mark) % bits_per_symbol

                if remaining_bits:
                    remaining_bits = bits_per_symbol - remaining_bits
                    if remaining_bits >= max_len - nxt:
                        break
                    nxt += remaining_bits

                buf = left = 0

                # mark this new location for computing the next flush
                mark = nxt

                # increment the number of bits per symbol
                bits_per_symbol += 1
                mask <<= 1
                mask += 1

            # Get a code of bits bits
            buf += convert_int8(file.read(1), Endian.LITTLE) << left
            nxt += 1
            left += 8
            if left < bits_per_symbol:
                # Need a second byte to complete the current code.
                if nxt == max_len:
                    raise InvalidInputFormat(
                        "Invalid Data: Stream ended in the middle of a code",
                    )
                buf += convert_int8(file.read(1), Endian.LITTLE) << left
                nxt += 1

                left += 8
            code = buf & mask
            buf >>= bits_per_symbol
            left -= bits_per_symbol

            # process clear code (256)
            if (code == 256) and block_compressed:
                # Flush unused input bits and bytes to next 8*bits bit boundary
                remaining_bits = (nxt - mark) % bits_per_symbol
                if remaining_bits:
                    remaining_bits = bits_per_symbol - remaining_bits
                    # NOTE(review): strict ">" here vs ">=" in the code-size
                    # flush above mirrors the adapted implementation.
                    if remaining_bits > max_len - nxt:
                        break
                    nxt += remaining_bits
                buf = left = 0

                # Mark this location for computing the next flush
                mark = nxt

                # Go back to nine bits per symbol
                bits_per_symbol = 9  # initialize bits and mask
                mask = 0x1FF
                end = 255  # empty table
                continue  # get next code

            # Process LZW code
            temp = code  # save the current code

            # Special code to reuse last match
            if code > end:
                # Be picky on the allowed code here, and make sure that the
                # code we drop through (prev) will be a valid index so that
                # random input does not cause an exception
                if (code != end + 1) or (prev > end):
                    raise InvalidInputFormat("Invalid Data: Invalid code detected")
                code = prev

            # Walk through linked list to generate output in reverse order
            while code >= 256:
                code = prefix[code]

            # Link new table entry
            if end < mask:
                end += 1
                prefix[end] = prev

            # Set previous code for next iteration
            prev = temp

        # End-offset heuristic (see module docstring): usually the last byte
        # read belongs to trailing data, so back off by one.
        if code == nxt - 1:
            return file.tell()

        return file.tell() - 1

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        """Validate the stream at start_offset and return its chunk bounds."""
        # Scan limit is the end of the whole file.
        file.seek(0, io.SEEK_END)
        max_len = file.tell()

        end_offset = self.unlzw(file, start_offset, max_len)

        # A valid stream is longer than the 3-byte header plus first code.
        chunk_length = end_offset - start_offset
        if chunk_length <= 5:
            raise InvalidInputFormat("Compressed chunk is too short")

        return ValidChunk(
            start_offset=start_offset,
            end_offset=end_offset,
        )