Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/compression/lz4.py: 98%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

102 statements  

1"""LZ4 handler. 

2 

3Frame format definition: https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md. 

4""" 

5 

6import io 

7 

8from lz4.block import LZ4BlockError, decompress 

9from structlog import get_logger 

10 

11from unblob.extractors import Command 

12 

13from ...file_utils import Endian, InvalidInputFormat, convert_int8, convert_int32 

14from ...models import ( 

15 File, 

16 Handler, 

17 HandlerDoc, 

18 HandlerType, 

19 HexString, 

20 Reference, 

21 ValidChunk, 

22) 

23 

24logger = get_logger() 

25 

# Magic numbers, expressed as the integer obtained after decoding the
# four on-disk bytes as a little-endian 32-bit value.
# Skippable frames own sixteen consecutive values: 0x184D2A50 .. 0x184D2A5F.
SKIPPABLE_FRAMES_MAGIC = list(range(0x184D2A50, 0x184D2A50 + 16))
FRAME_MAGIC = 0x184D2204
LEGACY_FRAME_MAGIC = 0x184C2102
FRAME_MAGICS = SKIPPABLE_FRAMES_MAGIC + [FRAME_MAGIC, LEGACY_FRAME_MAGIC]

# Masks selecting a one-bit and a two-bit wide field.
_1BIT = 0b1
_2BITS = 0b11

# The top bit of a block size word marks an uncompressed block; only the
# remaining 31 bits carry the actual size.
BLOCK_SIZE_MASK = 0x7FFFFFFF

# A block size word of zero terminates the list of data blocks.
END_MARK = 0x00000000

# Field widths, in bytes.
CONTENT_SIZE_LEN = 8
MAGIC_LEN = 4
BLOCK_SIZE_LEN = 4
FRAME_SIZE_LEN = 4
BLOCK_CHECKSUM_LEN = 4
CONTENT_CHECKSUM_LEN = 4
DICTID_LEN = 4
FLG_LEN = 1
BD_LEN = 1
HC_LEN = 1

MAX_LEGACY_BLOCK_SIZE = 8 * 1024 * 1024  # 8 MB

45 

46 

class FLG:
    """Decoded view of the one-byte FLG field of an LZ4 frame descriptor.

    Bit layout, most significant bit first: version (bits 7-6), block
    independence (bit 5), block checksum (bit 4), content size (bit 3),
    content checksum (bit 2) and dictionary ID (bit 0). Bit 1 is not decoded.
    """

    version: int = 0
    block_independence: int = 0
    block_checksum: int = 0
    content_size: int = 0
    content_checksum: int = 0
    dictid: int = 0

    def __init__(self, raw_flg: int):
        """Split *raw_flg* into its individual flag values."""

        def flag_at(position: int) -> int:
            # Value (0 or 1) of the single bit found at `position`.
            return (raw_flg >> position) & _1BIT

        self.version = (raw_flg >> 6) & _2BITS
        self.block_independence = flag_at(5)
        self.block_checksum = flag_at(4)
        self.content_size = flag_at(3)
        self.content_checksum = flag_at(2)
        self.dictid = flag_at(0)

    def as_dict(self) -> dict:
        """Return the decoded fields keyed by name, in declaration order."""
        field_names = (
            "version",
            "block_independence",
            "block_checksum",
            "content_size",
            "content_checksum",
            "dictid",
        )
        return {field_name: getattr(self, field_name) for field_name in field_names}

74 

75 

class _LZ4HandlerBase(Handler):
    """Shared behaviour of the LZ4 frame handlers."""

    # Every LZ4 frame flavour is unpacked by the same external command.
    EXTRACTOR = Command("lz4", "--decompress", "{inpath}", "{outdir}/lz4.uncompressed")

    def _skip_magic_bytes(self, file: File):
        """Move *file* forward by the length of a magic number."""
        file.seek(MAGIC_LEN, io.SEEK_CUR)

83 

84 

class LegacyFrameHandler(_LZ4HandlerBase):
    """Handler for LZ4 legacy frames (magic bytes ``02 21 4C 18``)."""

    NAME = "lz4_legacy"
    PATTERNS = [HexString("02 21 4C 18")]

    DOC = HandlerDoc(
        name="LZ4 (legacy)",
        description=(
            "LZ4 legacy format is an older framing format used prior to the LZ4 Frame specification, "
            "featuring a simpler structure and no support for skippable frames or extensive metadata. "
            "Unlike the default LZ4 Frame format, it lacks built-in checksums, versioning, or block independence flags, "
            "making it less robust and primarily used for backward compatibility."
        ),
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="LZ4 Frame Format Documentation",
                url="https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md",
            ),
            Reference(
                title="LZ4 Wikipedia",
                url="https://en.wikipedia.org/wiki/LZ4_(compression_algorithm)",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Locate the end of the legacy frame that starts at *start_offset*.

        The frame has no explicit terminator. Scanning stops when the file
        ends, when the next size word is a known frame magic number, or after
        a block that decompresses to less than ``MAX_LEGACY_BLOCK_SIZE`` bytes.

        Raises:
            InvalidInputFormat: a block could not be decompressed.
        """
        self._skip_magic_bytes(file)

        # An empty read means the end of the file was reached.
        while size_word := file.read(BLOCK_SIZE_LEN):
            compressed_len = convert_int32(size_word, Endian.LITTLE)

            if compressed_len in FRAME_MAGICS:
                # This word opens the next frame: give it back and stop here.
                file.seek(-BLOCK_SIZE_LEN, io.SEEK_CUR)
                break

            payload = file.read(compressed_len)
            try:
                decoded = decompress(payload, MAX_LEGACY_BLOCK_SIZE)
            except LZ4BlockError:
                raise InvalidInputFormat("Invalid LZ4 legacy frame.") from None

            # Legacy blocks have a fixed uncompressed size, so a shorter one
            # can only be the last block of the frame. See 'fixed block size' in
            # https://android.googlesource.com/platform/external/lz4/+/HEAD/doc/lz4_Frame_format.md#legacy-frame
            if len(decoded) < MAX_LEGACY_BLOCK_SIZE:
                break

        return ValidChunk(start_offset=start_offset, end_offset=file.tell())

135 

136 

class SkippableFrameHandler(_LZ4HandlerBase):
    """Handler for skippable frames, whose payload is opaque, arbitrary data."""

    NAME = "lz4_skippable"
    PATTERNS = [HexString("5? 2A 4D 18")]

    DOC = HandlerDoc(
        name="LZ4 (skippable)",
        description=(
            "LZ4 skippable format is designed to encapsulate arbitrary data within an LZ4 stream "
            "allowing compliant parsers to skip over it safely. "
            "This format does not contain compressed data itself but is often used for embedding "
            "metadata or non-LZ4 content alongside standard frames."
        ),
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="LZ4 Frame Format Documentation",
                url="https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md",
            ),
            Reference(
                title="LZ4 Wikipedia",
                url="https://en.wikipedia.org/wiki/LZ4_(compression_algorithm)",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Locate the end of the skippable frame that starts at *start_offset*.

        The magic number is followed by a little-endian 32-bit size, then by
        that many bytes of payload, which are skipped without being inspected.
        """
        self._skip_magic_bytes(file)
        payload_len = convert_int32(file.read(FRAME_SIZE_LEN), Endian.LITTLE)
        file.seek(payload_len, io.SEEK_CUR)
        return ValidChunk(start_offset=start_offset, end_offset=file.tell())

167 

168 

class DefaultFrameHandler(_LZ4HandlerBase):
    """Handler for the modern LZ4 frame format, the most frequently used one."""

    NAME = "lz4_default"

    PATTERNS = [HexString("04 22 4D 18")]

    DOC = HandlerDoc(
        name="LZ4",
        description=(
            "LZ4 is a high-speed lossless compression algorithm designed for "
            "real-time data compression with minimal memory usage."
        ),
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="LZ4 Frame Format Documentation",
                url="https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md",
            ),
            Reference(
                title="LZ4 Wikipedia",
                url="https://en.wikipedia.org/wiki/LZ4_(compression_algorithm)",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Locate the end of the LZ4 frame that starts at *start_offset*.

        Walks the frame in order: magic number, frame descriptor, data blocks
        up to and including the end mark, then the optional content checksum.
        No payload is decompressed and no checksum is verified.
        """
        self._skip_magic_bytes(file)
        descriptor = self._skip_frame_descriptor(file)
        self._skip_data_blocks(file, has_block_checksum=descriptor.block_checksum)

        # The end mark is followed by a content checksum only when FLG says so.
        if descriptor.content_checksum:
            file.seek(CONTENT_CHECKSUM_LEN, io.SEEK_CUR)

        return ValidChunk(start_offset=start_offset, end_offset=file.tell())

    def _skip_frame_descriptor(self, file: File) -> FLG:
        """Consume the variable-length frame descriptor and return its flags."""
        descriptor = FLG(convert_int8(file.read(FLG_LEN), Endian.LITTLE))
        logger.debug("Parsed FLG", **descriptor.as_dict())

        # BD (max block size) only matters to decoders that must allocate memory.
        file.seek(BD_LEN, io.SEEK_CUR)

        # Optional fields, in on-disk order, each present only if flagged.
        optional_fields = (
            (descriptor.content_size, CONTENT_SIZE_LEN),
            (descriptor.dictid, DICTID_LEN),
        )
        for is_present, field_len in optional_fields:
            if is_present:
                file.seek(field_len, io.SEEK_CUR)

        header_checksum = convert_int8(file.read(HC_LEN), Endian.LITTLE)
        logger.debug("Header checksum (HC) read", header_checksum=header_checksum)
        return descriptor

    def _skip_data_blocks(self, file: File, *, has_block_checksum: int) -> None:
        """Step over every data block and stop right after the end mark."""
        while True:
            size_word = convert_int32(file.read(BLOCK_SIZE_LEN), Endian.LITTLE)
            logger.debug("block_size", block_size=size_word)
            if size_word == END_MARK:
                return
            # Drop the "uncompressed block" flag bit to get the payload length.
            file.seek(size_word & BLOCK_SIZE_MASK, io.SEEK_CUR)
            if has_block_checksum:
                file.seek(BLOCK_CHECKSUM_LEN, io.SEEK_CUR)