Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/compression/lz4.py: 98%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

101 statements  

1"""LZ4 handler. 

2 

3Frame format definition: https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md. 

4""" 

5 

6import io 

7 

8from lz4.block import LZ4BlockError, decompress 

9from structlog import get_logger 

10 

11from unblob.extractors import Command 

12 

13from ...file_utils import Endian, InvalidInputFormat, convert_int8, convert_int32 

14from ...models import ( 

15 File, 

16 Handler, 

17 HandlerDoc, 

18 HandlerType, 

19 HexString, 

20 Reference, 

21 ValidChunk, 

22) 

23 

24logger = get_logger() 

25 

# Magic numbers opening each kind of frame, as little-endian 32-bit integers.
FRAME_MAGIC = 0x184D2204
LEGACY_FRAME_MAGIC = 0x184C2102
# Skippable frames may use any of 16 consecutive magics: 0x184D2A50..0x184D2A5F.
SKIPPABLE_FRAMES_MAGIC = list(range(0x184D2A50, 0x184D2A50 + 16))
# Every magic that can legitimately start a frame.
FRAME_MAGICS = SKIPPABLE_FRAMES_MAGIC + [FRAME_MAGIC, LEGACY_FRAME_MAGIC]

# Masks selecting the lowest one / two bits of an integer.
_1BIT = 0b1
_2BITS = 0b11

# Block size value that terminates the block list of a default frame.
END_MARK = 0

# Field widths, in bytes.
MAGIC_LEN = 4
BLOCK_SIZE_LEN = 4
FRAME_SIZE_LEN = 4
BLOCK_CHECKSUM_LEN = 4
CONTENT_CHECKSUM_LEN = 4
DICTID_LEN = 4
CONTENT_SIZE_LEN = 8
FLG_LEN = 1
BD_LEN = 1
HC_LEN = 1

# Size passed to the block decompressor for legacy frames (8 MB).
MAX_LEGACY_BLOCK_SIZE = 8 * 1024 * 1024

42 

43 

class FLG:
    """Decoded view of the one-byte FLG field of an LZ4 frame descriptor.

    Bit layout, most significant bit first: version (2 bits), block
    independence, block checksum, content size, content checksum, one bit
    that is not decoded here, and dictionary ID.
    """

    version: int = 0
    block_independence: int = 0
    block_checksum: int = 0
    content_size: int = 0
    content_checksum: int = 0
    dictid: int = 0

    def __init__(self, raw_flg: int):
        """Split the integer value of the FLG byte into its flags."""

        def bits(shift: int, mask: int = _1BIT) -> int:
            # Value of the bit field that starts `shift` bits above bit 0.
            return (raw_flg >> shift) & mask

        self.version = bits(6, _2BITS)
        self.block_independence = bits(5)
        self.block_checksum = bits(4)
        self.content_size = bits(3)
        self.content_checksum = bits(2)
        self.dictid = bits(0)

    def as_dict(self) -> dict:
        """Return the decoded flags keyed by attribute name."""
        names = (
            "version",
            "block_independence",
            "block_checksum",
            "content_size",
            "content_checksum",
            "dictid",
        )
        return {name: getattr(self, name) for name in names}

71 

72 

class _LZ4HandlerBase(Handler):
    """Shared behaviour of the LZ4 frame handlers in this module."""

    # Extraction is delegated to the `lz4` command-line tool.
    EXTRACTOR = Command("lz4", "--decompress", "{inpath}", "{outdir}/lz4.uncompressed")

    def _skip_magic_bytes(self, file: File) -> None:
        """Advance *file* past the frame magic number it is positioned on."""
        file.seek(MAGIC_LEN, io.SEEK_CUR)

80 

81 

class LegacyFrameHandler(_LZ4HandlerBase):
    """Handler for frames that start with the LZ4 legacy magic number."""

    NAME = "lz4_legacy"
    PATTERNS = [HexString("02 21 4C 18")]

    DOC = HandlerDoc(
        name="LZ4 (legacy)",
        description="LZ4 legacy format is an older framing format used prior to the LZ4 Frame specification, featuring a simpler structure and no support for skippable frames or extensive metadata. Unlike the default LZ4 Frame format, it lacks built-in checksums, versioning, or block independence flags, making it less robust and primarily used for backward compatibility.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="LZ4 Frame Format Documentation",
                url="https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md",
            ),
            Reference(
                title="LZ4 Wikipedia",
                url="https://en.wikipedia.org/wiki/LZ4_(compression_algorithm)",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Walk the blocks of a legacy frame and report where the frame ends.

        The walk stops at end of file, at the magic number of a following
        frame, or after a block that decompresses to less than
        MAX_LEGACY_BLOCK_SIZE bytes.

        Raises InvalidInputFormat when a block cannot be decompressed.
        """
        self._skip_magic_bytes(file)

        # An empty read means there is nothing left in the file.
        while size_field := file.read(BLOCK_SIZE_LEN):
            compressed_len = convert_int32(size_field, Endian.LITTLE)

            if compressed_len in FRAME_MAGICS:
                # This was the magic of the next frame, not a block size:
                # step back so the frame ends right before it.
                file.seek(-BLOCK_SIZE_LEN, io.SEEK_CUR)
                break

            payload = file.read(compressed_len)
            try:
                plain = decompress(payload, MAX_LEGACY_BLOCK_SIZE)
            except LZ4BlockError:
                raise InvalidInputFormat("Invalid LZ4 legacy frame.") from None

            # Legacy frames use a fixed block size, so a block that comes out
            # shorter than that size is the last one. See 'fixed block size' in
            # https://android.googlesource.com/platform/external/lz4/+/HEAD/doc/lz4_Frame_format.md#legacy-frame
            if len(plain) < MAX_LEGACY_BLOCK_SIZE:
                break

        return ValidChunk(start_offset=start_offset, end_offset=file.tell())

132 

133 

class SkippableFrameHandler(_LZ4HandlerBase):
    """Handler for skippable frames, whose payload can be arbitrary data."""

    NAME = "lz4_skippable"
    # The low nibble of the first byte is a wildcard: 16 magics are accepted.
    PATTERNS = [HexString("5? 2A 4D 18")]

    DOC = HandlerDoc(
        name="LZ4 (skippable)",
        description="LZ4 skippable format is designed to encapsulate arbitrary data within an LZ4 stream allowing compliant parsers to skip over it safely. This format does not contain compressed data itself but is often used for embedding metadata or non-LZ4 content alongside standard frames.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="LZ4 Frame Format Documentation",
                url="https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md",
            ),
            Reference(
                title="LZ4 Wikipedia",
                url="https://en.wikipedia.org/wiki/LZ4_(compression_algorithm)",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Jump over the frame payload and report where the frame ends.

        The magic number is followed by a little-endian 32-bit payload size;
        the frame ends that many bytes after the size field.
        """
        self._skip_magic_bytes(file)

        size_field = file.read(FRAME_SIZE_LEN)
        payload_len = convert_int32(size_field, Endian.LITTLE)
        file.seek(payload_len, io.SEEK_CUR)

        return ValidChunk(start_offset=start_offset, end_offset=file.tell())

164 

165 

class DefaultFrameHandler(_LZ4HandlerBase):
    """Handler for the modern LZ4 frame format, the most frequently used one."""

    NAME = "lz4_default"

    PATTERNS = [HexString("04 22 4D 18")]

    DOC = HandlerDoc(
        name="LZ4",
        description="LZ4 is a high-speed lossless compression algorithm designed for real-time data compression with minimal memory usage.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="LZ4 Frame Format Documentation",
                url="https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md",
            ),
            Reference(
                title="LZ4 Wikipedia",
                url="https://en.wikipedia.org/wiki/LZ4_(compression_algorithm)",
            ),
        ],
        limitations=[],
    )

    # The highest bit of a block size field only flags the block as stored
    # uncompressed; the remaining 31 bits hold the size of the data section.
    _BLOCK_DATA_SIZE_MASK = 0x7FFFFFFF

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Walk a default LZ4 frame and report where it ends.

        The frame descriptor is parsed to learn which optional fields are
        present, then the data blocks are skipped one by one until the end
        mark, and finally the optional content checksum is skipped.

        Returns a ValidChunk spanning from *start_offset* to the end of the
        frame.
        """
        # 1. skip the magic number the handler was matched on
        self._skip_magic_bytes(file)

        # 2. we parse the frame descriptor of dynamic size
        flg_bytes = file.read(FLG_LEN)
        raw_flg = convert_int8(flg_bytes, Endian.LITTLE)
        flg = FLG(raw_flg)
        logger.debug("Parsed FLG", **flg.as_dict())

        # skip BD (max blocksize), only useful for decoders that need to allocate memory
        file.seek(BD_LEN, io.SEEK_CUR)

        # optional descriptor fields, present only when flagged in FLG
        if flg.content_size:
            file.seek(CONTENT_SIZE_LEN, io.SEEK_CUR)
        if flg.dictid:
            file.seek(DICTID_LEN, io.SEEK_CUR)

        header_checksum = convert_int8(file.read(HC_LEN), Endian.LITTLE)
        logger.debug("Header checksum (HC) read", header_checksum=header_checksum)

        # 3. we read block by block until we hit the endmarker
        while True:
            block_size = convert_int32(file.read(BLOCK_SIZE_LEN), Endian.LITTLE)
            logger.debug("block_size", block_size=block_size)
            if block_size == END_MARK:
                break
            # Drop the "uncompressed block" flag bit: seeking by the raw field
            # would jump about 2 GiB too far whenever a block is stored
            # uncompressed.
            file.seek(block_size & self._BLOCK_DATA_SIZE_MASK, io.SEEK_CUR)
            if flg.block_checksum:
                file.seek(BLOCK_CHECKSUM_LEN, io.SEEK_CUR)

        # 4. we reached the endmark (0x00000000)

        # 5. if frame descriptor mentions CRC, we add CRC
        if flg.content_checksum:
            file.seek(CONTENT_CHECKSUM_LEN, io.SEEK_CUR)

        end_offset = file.tell()

        return ValidChunk(start_offset=start_offset, end_offset=end_offset)

229 return ValidChunk(start_offset=start_offset, end_offset=end_offset)