Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/compression/lz4.py: 98%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

102 statements  

1"""LZ4 handler. 

2 

3Frame format definition: https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md. 

4""" 

5 

6import io 

7from typing import Optional 

8 

9from lz4.block import LZ4BlockError, decompress 

10from structlog import get_logger 

11 

12from unblob.extractors import Command 

13 

14from ...file_utils import Endian, InvalidInputFormat, convert_int8, convert_int32 

15from ...models import ( 

16 File, 

17 Handler, 

18 HandlerDoc, 

19 HandlerType, 

20 HexString, 

21 Reference, 

22 ValidChunk, 

23) 

24 

25logger = get_logger() 

26 

# Magic numbers that can open a frame, expressed as 32-bit integers.
FRAME_MAGIC = 0x184D2204
LEGACY_FRAME_MAGIC = 0x184C2102
# Sixteen consecutive values starting at 0x184D2A50 mark skippable frames.
SKIPPABLE_FRAMES_MAGIC = list(range(0x184D2A50, 0x184D2A50 + 16))
FRAME_MAGICS = SKIPPABLE_FRAMES_MAGIC + [FRAME_MAGIC, LEGACY_FRAME_MAGIC]

# Masks used to pick one or two bits out of a shifted flag byte.
_1BIT = 0b1
_2BITS = 0b11

# Block-size value that terminates the block list of a default frame.
END_MARK = 0

# Sizes, in bytes, of the fixed-width fields read or skipped by the handlers.
CONTENT_SIZE_LEN = 8
MAGIC_LEN = 4
BLOCK_SIZE_LEN = 4
FRAME_SIZE_LEN = 4
BLOCK_CHECKSUM_LEN = 4
CONTENT_CHECKSUM_LEN = 4
DICTID_LEN = 4
FLG_LEN = 1
BD_LEN = 1
HC_LEN = 1

# Size cap handed to the block decompressor for legacy frames: 8 MB.
MAX_LEGACY_BLOCK_SIZE = 8 << 20

43 

44 

class FLG:
    """Decoded view of the one-byte FLG field of a frame descriptor.

    Each attribute holds the integer value of one bit field of the raw byte:
    ``version`` is the two-bit value at bits 7-6, the others are single bits
    (``block_independence`` bit 5, ``block_checksum`` bit 4, ``content_size``
    bit 3, ``content_checksum`` bit 2, ``dictid`` bit 0).
    """

    version: int = 0
    block_independence: int = 0
    block_checksum: int = 0
    content_size: int = 0
    content_checksum: int = 0
    dictid: int = 0

    def __init__(self, raw_flg: int):
        """Split ``raw_flg`` into its individual bit fields."""

        def flag_at(position: int) -> int:
            # Value (0 or 1) of the single bit found at ``position``.
            return (raw_flg >> position) & _1BIT

        self.version = (raw_flg >> 6) & _2BITS
        self.block_independence = flag_at(5)
        self.block_checksum = flag_at(4)
        self.content_size = flag_at(3)
        self.content_checksum = flag_at(2)
        self.dictid = flag_at(0)

    def as_dict(self) -> dict:
        """Return the decoded fields as a ``name -> value`` mapping."""
        field_names = (
            "version",
            "block_independence",
            "block_checksum",
            "content_size",
            "content_checksum",
            "dictid",
        )
        return {name: getattr(self, name) for name in field_names}

72 

73 

class _LZ4HandlerBase(Handler):
    """Shared base of the LZ4 handlers: common extractor and a seek helper."""

    # Extraction is delegated to the external ``lz4`` command.
    EXTRACTOR = Command("lz4", "--decompress", "{inpath}", "{outdir}/lz4.uncompressed")

    def _skip_magic_bytes(self, file: File):
        """Move ``file`` forward by ``MAGIC_LEN`` bytes from its current position."""
        file.seek(MAGIC_LEN, io.SEEK_CUR)

81 

82 

class LegacyFrameHandler(_LZ4HandlerBase):
    """Handler for LZ4 legacy frames (pattern ``02 21 4C 18``)."""

    NAME = "lz4_legacy"
    PATTERNS = [HexString("02 21 4C 18")]

    DOC = HandlerDoc(
        name="LZ4 (legacy)",
        description="LZ4 legacy format is an older framing format used prior to the LZ4 Frame specification, featuring a simpler structure and no support for skippable frames or extensive metadata. Unlike the default LZ4 Frame format, it lacks built-in checksums, versioning, or block independence flags, making it less robust and primarily used for backward compatibility.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="LZ4 Frame Format Documentation",
                url="https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md",
            ),
            Reference(
                title="LZ4 Wikipedia",
                url="https://en.wikipedia.org/wiki/LZ4_(compression_algorithm)",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        """Walk the blocks of a legacy frame and return the chunk it spans.

        The walk stops when the file ends, when the next size field turns out
        to be a known frame magic, or after a block that decompresses to less
        than ``MAX_LEGACY_BLOCK_SIZE`` bytes.

        Raises:
            InvalidInputFormat: when a block cannot be decompressed.
        """
        self._skip_magic_bytes(file)

        # The two-argument form of iter() ends the loop as soon as a read
        # returns b"", i.e. once the end of the file has been reached.
        for size_field in iter(lambda: file.read(BLOCK_SIZE_LEN), b""):
            compressed_size = convert_int32(size_field, Endian.LITTLE)

            if compressed_size in FRAME_MAGICS:
                # What was just read is the magic of the next frame, not a
                # block size: step back so the chunk ends in front of it.
                file.seek(-BLOCK_SIZE_LEN, io.SEEK_CUR)
                break

            payload = file.read(compressed_size)
            try:
                plain = decompress(payload, MAX_LEGACY_BLOCK_SIZE)
            except LZ4BlockError:
                raise InvalidInputFormat("Invalid LZ4 legacy frame.") from None

            # Legacy frames use a fixed block size, so a block that decodes to
            # less than the maximum is the last one. See 'fixed block size' in
            # https://android.googlesource.com/platform/external/lz4/+/HEAD/doc/lz4_Frame_format.md#legacy-frame
            if len(plain) < MAX_LEGACY_BLOCK_SIZE:
                break

        return ValidChunk(start_offset=start_offset, end_offset=file.tell())

133 

134 

class SkippableFrameHandler(_LZ4HandlerBase):
    """Handler for skippable frames, whose payload is opaque to this handler."""

    NAME = "lz4_skippable"
    PATTERNS = [HexString("5? 2A 4D 18")]

    DOC = HandlerDoc(
        name="LZ4 (skippable)",
        description="LZ4 skippable format is designed to encapsulate arbitrary data within an LZ4 stream allowing compliant parsers to skip over it safely. This format does not contain compressed data itself but is often used for embedding metadata or non-LZ4 content alongside standard frames.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="LZ4 Frame Format Documentation",
                url="https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md",
            ),
            Reference(
                title="LZ4 Wikipedia",
                url="https://en.wikipedia.org/wiki/LZ4_(compression_algorithm)",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        """Return the chunk covering the magic, the size field and the payload.

        The payload length is taken from the little-endian 32-bit field that
        follows the magic; the payload itself is skipped without being read.
        """
        self._skip_magic_bytes(file)

        size_field = file.read(FRAME_SIZE_LEN)
        payload_length = convert_int32(size_field, Endian.LITTLE)
        file.seek(payload_length, io.SEEK_CUR)

        return ValidChunk(start_offset=start_offset, end_offset=file.tell())

165 

166 

class DefaultFrameHandler(_LZ4HandlerBase):
    """Handler for the current LZ4 frame format (pattern ``04 22 4D 18``)."""

    NAME = "lz4_default"

    PATTERNS = [HexString("04 22 4D 18")]

    DOC = HandlerDoc(
        name="LZ4",
        description="LZ4 is a high-speed lossless compression algorithm designed for real-time data compression with minimal memory usage.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="LZ4 Frame Format Documentation",
                url="https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md",
            ),
            Reference(
                title="LZ4 Wikipedia",
                url="https://en.wikipedia.org/wiki/LZ4_(compression_algorithm)",
            ),
        ],
        limitations=[],
    )

    # The highest bit of the 4-byte block-size field flags an uncompressed
    # block; only the remaining 31 bits give the length of the block data.
    _BLOCK_DATA_SIZE_MASK = 0x7FFFFFFF

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        """Walk a frame from its magic to its end and return the chunk it spans.

        The frame descriptor is parsed only as far as needed to know which
        optional fields are present; block data and checksums are skipped, not
        verified.

        Args:
            file: input positioned on the frame magic.
            start_offset: offset of the frame magic, reported as chunk start.

        Returns:
            The chunk running from ``start_offset`` to the position reached
            after the end mark and, when flagged, the content checksum.
        """
        # 1. skip the magic number
        self._skip_magic_bytes(file)

        # 2. parse the frame descriptor, whose size depends on the FLG bits
        flg_bytes = file.read(FLG_LEN)
        raw_flg = convert_int8(flg_bytes, Endian.LITTLE)
        flg = FLG(raw_flg)
        logger.debug("Parsed FLG", **flg.as_dict())

        # skip BD (max blocksize), only useful for decoders that need to allocate memory
        file.seek(BD_LEN, io.SEEK_CUR)

        if flg.content_size:
            file.seek(CONTENT_SIZE_LEN, io.SEEK_CUR)
        if flg.dictid:
            file.seek(DICTID_LEN, io.SEEK_CUR)

        # The header checksum is read and logged, but not verified.
        header_checksum = convert_int8(file.read(HC_LEN), Endian.LITTLE)
        logger.debug("Header checksum (HC) read", header_checksum=header_checksum)

        # 3. skip block after block until the end mark is hit
        while True:
            block_size = convert_int32(file.read(BLOCK_SIZE_LEN), Endian.LITTLE)
            logger.debug("block_size", block_size=block_size)
            if block_size == END_MARK:
                break
            # Strip the "uncompressed block" flag: seeking by the raw field
            # would jump 2 GiB too far for every block stored uncompressed.
            file.seek(block_size & self._BLOCK_DATA_SIZE_MASK, io.SEEK_CUR)
            if flg.block_checksum:
                file.seek(BLOCK_CHECKSUM_LEN, io.SEEK_CUR)

        # 4. the end mark (0x00000000) has been consumed

        # 5. a content checksum follows the end mark when the descriptor says so
        if flg.content_checksum:
            file.seek(CONTENT_CHECKSUM_LEN, io.SEEK_CUR)

        end_offset = file.tell()

        return ValidChunk(start_offset=start_offset, end_offset=end_offset)