Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/compression/xz.py: 100%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

106 statements  

1import io 

2 

3import attrs 

4from pyperscan import Flag, Pattern, Scan, StreamDatabase 

5from structlog import get_logger 

6 

7from unblob.extractors import Command 

8 

9from ...file_utils import ( 

10 Endian, 

11 convert_int8, 

12 convert_int16, 

13 convert_int32, 

14 decode_multibyte_integer, 

15 read_until_past, 

16 round_up, 

17 stream_scan, 

18) 

19from ...models import ( 

20 File, 

21 Handler, 

22 HandlerDoc, 

23 HandlerType, 

24 HexString, 

25 InvalidInputFormat, 

26 Reference, 

27 ValidChunk, 

28) 

29 

logger = get_logger()

# Reference for everything below: the .xz file format definition,
# https://tukaani.org/xz/xz-file-format-1.0.4.txt

# Magic bytes opening a stream header.
STREAM_START_MAGIC = b"\xfd\x37\x7a\x58\x5a\x00"

# Tail of a stream footer: two stream-flag bytes followed by the two-byte
# end-of-stream magic ("YZ"). One pattern per accepted check type.
STREAM_END_MAGIC_PATTERNS = [
    HexString("00 00 59 5A"),  # None
    HexString("00 01 59 5A"),  # CRC32
    HexString("00 04 59 5A"),  # CRC64
    HexString("00 0A 59 5A"),  # SHA-256
]

# Stream flag values accepted in a stream header.
NONE_STREAM_FLAG = 0x0
CRC32_STREAM_FLAG = 0x1
CRC64_STREAM_FLAG = 0x4
SHA256_STREAM_FLAG = 0xA
VALID_FLAGS = [
    NONE_STREAM_FLAG,
    CRC32_STREAM_FLAG,
    CRC64_STREAM_FLAG,
    SHA256_STREAM_FLAG,
]

# Field widths, in bytes.
BACKWARD_SIZE_LEN = 4
MAX_MBI_LEN = 9  # maximum multi-byte integer size is 9, per XZ standard
XZ_PADDING = 4  # XZ format byte alignment
FLAG_LEN = 2
EOS_MAGIC_LEN = 2
CRC32_LEN = 4

# Header layout: magic | stream flags | CRC32
STREAM_HEADER_LEN = len(STREAM_START_MAGIC) + FLAG_LEN + CRC32_LEN
# Footer layout: CRC32 | backward size | stream flags | end-of-stream magic
STREAM_FOOTER_LEN = CRC32_LEN + BACKWARD_SIZE_LEN + FLAG_LEN + EOS_MAGIC_LEN

61 

62 

def build_stream_end_scan_db(pattern_list):
    """Compile *pattern_list* into a pyperscan ``StreamDatabase``.

    Every entry is converted with its ``as_regex()`` method and compiled
    with the ``SOM_LEFTMOST`` and ``DOTALL`` flags.
    """
    compiled_patterns = [
        Pattern(hex_pattern.as_regex(), Flag.SOM_LEFTMOST, Flag.DOTALL)
        for hex_pattern in pattern_list
    ]
    return StreamDatabase(*compiled_patterns)


# Compiled once at import time; calculate_chunk builds its scanners from it.
hyperscan_stream_end_magic_db = build_stream_end_scan_db(STREAM_END_MAGIC_PATTERNS)

70 

71 

@attrs.define
class XZSearchContext:
    """Mutable state passed to the footer-match callback while scanning."""

    # Offset of the stream currently being validated; the match callback
    # advances it when another stream follows the one just validated.
    start_offset: int
    # File being scanned.
    file: File
    # End offset recorded by the match callback for the streams validated so
    # far; calculate_chunk initialises it to -1.
    end_streams_offset: int
    # Stream flag value read from the first stream header.
    stream_flag: int

78 

79 

def read_multibyte_int(file: File) -> tuple[int, int]:
    """Read a multibyte integer and return the number of bytes read and the integer itself.

    Up to ``MAX_MBI_LEN`` bytes are peeked at the current position and
    decoded; on success the file position is left just past the encoded
    integer.

    Any exception raised by ``decode_multibyte_integer`` propagates; in that
    case the file position is left at the start of the integer.
    """
    data = bytearray(file.read(MAX_MBI_LEN))
    # Rewind by the number of bytes actually read: close to the end of the
    # file the read can return fewer than MAX_MBI_LEN bytes, and rewinding by
    # the full constant would leave the cursor before the integer.
    file.seek(-len(data), io.SEEK_CUR)
    size, mbi = decode_multibyte_integer(data)
    file.seek(size, io.SEEK_CUR)
    return size, mbi

87 

88 

def get_stream_size(footer_offset: int, file: File) -> int:
    """Compute the size of the stream whose footer flags start at *footer_offset*.

    *footer_offset* is the offset of the footer's stream-flags field; the
    backward-size field is read from the ``BACKWARD_SIZE_LEN`` bytes just
    before it. The backward size locates the Index, whose records give the
    size of every block.

    Returns the sum of header, blocks, Index and footer sizes, rounded up to
    ``XZ_PADDING``.

    Raises ``InvalidInputFormat`` when there is not enough data before the
    footer, when the backward size points before the start of the file, or
    when the Index indicator byte is not zero.
    """
    # The CRC32 and backward-size fields must fit before the flags. Without
    # this guard a tiny offset makes the seek below target a negative
    # position, raising an error that callers catching InvalidInputFormat
    # would not handle.
    if footer_offset < CRC32_LEN + BACKWARD_SIZE_LEN:
        raise InvalidInputFormat("Not enough data before the stream footer.")

    file.seek(footer_offset - BACKWARD_SIZE_LEN, io.SEEK_SET)
    backward_bytes = file.read(BACKWARD_SIZE_LEN)
    stored_backward_size = convert_int32(backward_bytes, Endian.LITTLE)
    # the stored value encodes the Index size as (stored + 1) * 4
    real_backward_size = (stored_backward_size + 1) * 4

    if real_backward_size > footer_offset - CRC32_LEN - BACKWARD_SIZE_LEN:
        raise InvalidInputFormat("Invalid backward size.")

    # skip backwards to the end of the Index
    file.seek(-CRC32_LEN - BACKWARD_SIZE_LEN, io.SEEK_CUR)

    # skip backwards of backward size to the start of index
    file.seek(-real_backward_size, io.SEEK_CUR)

    index_size = 0
    index_indicator = convert_int8(file.read(1), Endian.LITTLE)
    # index indicator must be 0, per xz standard
    if index_indicator != 0:
        raise InvalidInputFormat("Invalid index indicator")

    index_size += 1

    # read Index 'Number of Records'
    size, num_records = read_multibyte_int(file)
    index_size += size

    # read Record 'Unpadded Size' and 'Uncompressed Size' for every Record
    blocks_size = 0
    for _ in range(num_records):
        size, unpadded_size = read_multibyte_int(file)
        index_size += size

        # 'Uncompressed Size' only contributes its encoded length
        size, _ = read_multibyte_int(file)
        index_size += size

        # each block occupies its unpadded size rounded up to the alignment
        blocks_size += round_up(unpadded_size, XZ_PADDING)

    index_size += CRC32_LEN

    # Index padding is not counted above; the final round-up accounts for it.
    return round_up(
        (STREAM_HEADER_LEN + blocks_size + index_size + STREAM_FOOTER_LEN),
        XZ_PADDING,
    )

133 

134 

def _hyperscan_match(
    context: XZSearchContext, pattern_id: int, offset: int, end: int
) -> Scan:
    """Validate a candidate stream footer found at *offset*.

    Returns ``Scan.Continue`` when the candidate is rejected or when another
    stream follows the validated one, and ``Scan.Terminate`` once a validated
    stream is not followed by a new stream header. ``context`` is updated
    with the end offset found so far and, when a further stream follows, with
    the start offset of that next stream.
    """
    del pattern_id, end  # unused arguments

    xz_file = context.file
    footer_end = offset + FLAG_LEN + EOS_MAGIC_LEN

    # a footer ending before the stream start cannot belong to this stream
    if footer_end < context.start_offset:
        return Scan.Continue

    try:
        expected_size = get_stream_size(offset, xz_file)
    except InvalidInputFormat:
        return Scan.Continue

    # the size derived from the Index must match the span actually covered
    if footer_end - context.start_offset != expected_size:
        return Scan.Continue

    # stream padding validation:
    # padding MUST contain only null bytes and be 4 bytes aligned
    xz_file.seek(footer_end)
    padding_end = read_until_past(xz_file, b"\x00")
    if (padding_end - footer_end) % 4 != 0:
        # record the stream end without the misaligned padding, keep looking
        context.end_streams_offset = footer_end
        return Scan.Continue

    # next magic validation
    context.end_streams_offset = padding_end
    xz_file.seek(padding_end, io.SEEK_SET)
    next_magic = xz_file.read(len(STREAM_START_MAGIC))
    if next_magic != STREAM_START_MAGIC:
        return Scan.Terminate

    # another stream starts right after the padding: validate it as well
    context.start_offset = padding_end
    return Scan.Continue

170 

171 

class XZHandler(Handler):
    """Handler that locates XZ streams and extracts them with ``7z``."""

    NAME = "xz"

    # Same bytes as STREAM_START_MAGIC.
    PATTERNS = [HexString("FD 37 7A 58 5A 00")]

    EXTRACTOR = Command("7z", "x", "-y", "{inpath}", "-so", stdout="xz.uncompressed")

    DOC = HandlerDoc(
        name="XZ",
        description="XZ is a compressed file format that uses the LZMA2 algorithm for high compression efficiency. It is designed for general-purpose data compression with support for integrity checks and padding for alignment.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="XZ File Format Specification",
                url="https://tukaani.org/xz/xz-file-format-1.0.4.txt",
            ),
            Reference(
                title="XZ Wikipedia",
                url="https://en.wikipedia.org/wiki/XZ_Utils",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Find the end of the XZ stream(s) beginning at *start_offset*.

        The stream flags following the magic are checked first, then the file
        is scanned for stream footers, each validated by ``_hyperscan_match``.

        Raises ``InvalidInputFormat`` when the stream flags are not in
        ``VALID_FLAGS`` or when no valid stream end was recorded.
        """
        flags_offset = start_offset + len(STREAM_START_MAGIC)
        file.seek(flags_offset, io.SEEK_SET)
        stream_flag = convert_int16(file.read(2), Endian.BIG)
        if stream_flag not in VALID_FLAGS:
            raise InvalidInputFormat("Invalid stream flag for xz stream.")

        context = XZSearchContext(
            start_offset=start_offset,
            file=file,
            end_streams_offset=-1,
            stream_flag=stream_flag,
        )

        # Best effort: a scan failure is only logged, and whatever end offset
        # the callback recorded before the failure is still used below.
        try:
            scanner = hyperscan_stream_end_magic_db.build(context, _hyperscan_match)  # type: ignore
            stream_scan(scanner, file)
        except Exception as e:
            logger.debug("Error scanning for xz patterns", error=e)

        found_end = context.end_streams_offset
        if found_end <= 0:
            raise InvalidInputFormat("No valid xz compression stream was detected")

        return ValidChunk(start_offset=start_offset, end_offset=found_end)