Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/compression/xz.py: 100%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

107 statements  

1import io 

2from typing import Optional 

3 

4import attrs 

5from pyperscan import Flag, Pattern, Scan, StreamDatabase 

6from structlog import get_logger 

7 

8from unblob.extractors import Command 

9 

10from ...file_utils import ( 

11 Endian, 

12 convert_int8, 

13 convert_int16, 

14 convert_int32, 

15 decode_multibyte_integer, 

16 read_until_past, 

17 round_up, 

18 stream_scan, 

19) 

20from ...models import ( 

21 File, 

22 Handler, 

23 HandlerDoc, 

24 HandlerType, 

25 HexString, 

26 InvalidInputFormat, 

27 Reference, 

28 ValidChunk, 

29) 

30 

logger = get_logger()

# The .xz file format definition: https://tukaani.org/xz/xz-file-format-1.0.4.txt

# Magic bytes that open every stream header.
STREAM_START_MAGIC = b"\xfd\x37\x7a\x58\x5a\x00"

# Tail of a stream footer: the two Stream Flags bytes followed by the
# two-byte footer magic ("YZ"), one pattern per accepted flag value.
STREAM_END_MAGIC_PATTERNS = [
    HexString("00 00 59 5A"),  # None
    HexString("00 01 59 5A"),  # CRC32
    HexString("00 04 59 5A"),  # CRC64
    HexString("00 0A 59 5A"),  # SHA-256
]

# Stream Flags values accepted in a stream header (see calculate_chunk).
NONE_STREAM_FLAG = 0x0
CRC32_STREAM_FLAG = 0x1
CRC64_STREAM_FLAG = 0x4
SHA256_STREAM_FLAG = 0xA
VALID_FLAGS = [
    NONE_STREAM_FLAG,
    CRC32_STREAM_FLAG,
    CRC64_STREAM_FLAG,
    SHA256_STREAM_FLAG,
]

# Sizes, in bytes, of the fixed-width fields used while parsing.
BACKWARD_SIZE_LEN = 4
MAX_MBI_LEN = 9  # maximum multi-byte integer size is 9, per XZ standard
XZ_PADDING = 4  # XZ format byte alignment
FLAG_LEN = 2
EOS_MAGIC_LEN = 2
CRC32_LEN = 4

# Derived sizes of the complete stream header and stream footer.
STREAM_HEADER_LEN = len(STREAM_START_MAGIC) + FLAG_LEN + CRC32_LEN
STREAM_FOOTER_LEN = CRC32_LEN + BACKWARD_SIZE_LEN + FLAG_LEN + EOS_MAGIC_LEN

62 

63 

def build_stream_end_scan_db(pattern_list):
    """Build a streaming scan database from a list of hex-string patterns.

    Each entry of ``pattern_list`` is converted with its ``as_regex()`` method
    and wrapped in a ``Pattern`` carrying the ``SOM_LEFTMOST`` and ``DOTALL``
    flags; the patterns are handed to ``StreamDatabase`` in their original
    order.
    """
    compiled_patterns = [
        Pattern(hex_string.as_regex(), Flag.SOM_LEFTMOST, Flag.DOTALL)
        for hex_string in pattern_list
    ]
    return StreamDatabase(*compiled_patterns)


# Built once at import time and shared by every chunk calculation.
hyperscan_stream_end_magic_db = build_stream_end_scan_db(STREAM_END_MAGIC_PATTERNS)

71 

72 

@attrs.define
class XZSearchContext:
    """Mutable state shared between ``calculate_chunk`` and the scan callback."""

    # Offset of the stream currently being validated; the scan callback moves
    # it forward when another stream directly follows the current one.
    start_offset: int
    # File being scanned; the scan callback seeks and reads through it.
    file: File
    # End offset of the last validated stream (including accepted padding);
    # initialised to -1 by calculate_chunk, meaning "nothing found yet".
    end_streams_offset: int
    # Stream Flags value read from the header of the first stream.
    stream_flag: int

79 

80 

def read_multibyte_int(file: File) -> tuple[int, int]:
    """Read a multibyte integer and return the number of bytes read and the integer itself.

    Up to ``MAX_MBI_LEN`` bytes are peeked at the current position and decoded
    with ``decode_multibyte_integer``. On success the file cursor is left
    right after the encoded integer. If decoding raises, the cursor is left
    where it was before the call.

    Returns:
        A ``(size, value)`` tuple: the encoded length in bytes and the
        decoded integer.
    """
    data = bytearray(file.read(MAX_MBI_LEN))
    # Rewind by what was actually read, not by MAX_MBI_LEN: close to the end
    # of the file the read comes back short, and rewinding a fixed amount
    # would move the cursor before the integer (or to a negative offset),
    # so callers walking a list of integers could re-read the same bytes.
    file.seek(-len(data), io.SEEK_CUR)
    size, mbi = decode_multibyte_integer(data)
    file.seek(size, io.SEEK_CUR)
    return size, mbi

88 

89 

def get_stream_size(footer_offset: int, file: File) -> int:
    """Compute the size of the XZ stream from its footer and Index.

    ``footer_offset`` is the offset of the footer's Stream Flags field, i.e.
    the Backward Size field ends there and the footer CRC32 sits just before
    it. The Backward Size is used to locate the start of the Index, whose
    records are then walked to sum the (padded) size of every block.

    Args:
        footer_offset: absolute offset of the Stream Flags in the footer.
        file: file to read from; its cursor is moved by this function.

    Returns:
        Stream header + blocks + Index + stream footer sizes, rounded up to
        ``XZ_PADDING``.

    Raises:
        InvalidInputFormat: if the footer fields cannot fit before
            ``footer_offset``, the Backward Size points before the start of
            the file, the Index indicator is not zero, or the announced
            number of records cannot fit in the Index.
    """
    # The footer CRC32 and Backward Size precede the flags, so the Index
    # ends here. Reject offsets that would need a negative seek instead of
    # letting the seek fail with a non-format error.
    index_end_offset = footer_offset - CRC32_LEN - BACKWARD_SIZE_LEN
    if index_end_offset < 0:
        raise InvalidInputFormat("Invalid backward size.")

    file.seek(footer_offset - BACKWARD_SIZE_LEN, io.SEEK_SET)
    backward_bytes = file.read(BACKWARD_SIZE_LEN)
    stored_backward_size = convert_int32(backward_bytes, Endian.LITTLE)
    real_backward_size = (stored_backward_size + 1) * 4

    if real_backward_size > index_end_offset:
        raise InvalidInputFormat("Invalid backward size.")

    # jump to the start of the Index: backward size bytes before its end
    file.seek(index_end_offset - real_backward_size, io.SEEK_SET)

    index_indicator = convert_int8(file.read(1), Endian.LITTLE)
    # index indicator must be 0, per xz standard
    if index_indicator != 0:
        raise InvalidInputFormat("Invalid index indicator")

    index_size = 1

    # read Index 'Number of Records'
    size, num_records = read_multibyte_int(file)
    index_size += size

    # Every record holds two multibyte integers of at least one byte each,
    # and the whole Index is real_backward_size bytes long. A larger record
    # count is bogus; rejecting it avoids looping over garbage data.
    if num_records * 2 > real_backward_size:
        raise InvalidInputFormat("Invalid number of records in index.")

    # read Record 'Unpadded Size' and 'Uncompressed Size' for every Record
    blocks_size = 0
    for _ in range(num_records):
        unpadded_len, unpadded_size = read_multibyte_int(file)
        # the uncompressed size value is not needed, only its encoded length
        uncompressed_len, _uncompressed_size = read_multibyte_int(file)
        index_size += unpadded_len + uncompressed_len

        blocks_size += round_up(unpadded_size, XZ_PADDING)

    index_size += CRC32_LEN

    # Index padding is not counted above; the final round-up accounts for it.
    return round_up(
        (STREAM_HEADER_LEN + blocks_size + index_size + STREAM_FOOTER_LEN),
        XZ_PADDING,
    )

134 

135 

def _hyperscan_match(
    context: XZSearchContext, pattern_id: int, offset: int, end: int
) -> Scan:
    """Scan callback validating one candidate stream footer.

    ``offset`` is treated as the position of the footer's Stream Flags. The
    candidate is accepted only when the stream size derived from its Index
    matches the distance from ``context.start_offset`` to the end of the
    footer. On acceptance ``context.end_streams_offset`` is updated, and if
    another stream header directly follows the (4-byte aligned) null padding,
    ``context.start_offset`` is moved to it so the next stream is validated
    too.

    Returns:
        ``Scan.Continue`` to keep looking for further footer candidates,
        ``Scan.Terminate`` once a validated stream is not followed by another
        stream header.
    """
    del pattern_id, end  # unused arguments

    # A footer cannot start before or inside the magic and flags of the
    # stream header, so such matches belong to other data. Skipping them
    # here also keeps get_stream_size away from offsets so close to the
    # start of the file that it would seek to a negative position, raising
    # an error that is not InvalidInputFormat and aborting the whole scan.
    if offset < context.start_offset + len(STREAM_START_MAGIC) + FLAG_LEN:
        return Scan.Continue

    end_offset = offset + FLAG_LEN + EOS_MAGIC_LEN

    try:
        stream_size = get_stream_size(offset, context.file)
    except InvalidInputFormat:
        return Scan.Continue

    # stream_size does not match, we continue our search
    if stream_size != (end_offset - context.start_offset):
        return Scan.Continue

    # stream padding validation
    # padding MUST contain only null bytes and be 4 bytes aligned
    context.file.seek(end_offset)
    end_padding_offset = read_until_past(context.file, b"\x00")
    padding_size = end_padding_offset - end_offset
    if padding_size % 4 != 0:
        # misaligned padding is not part of the stream: end right after the footer
        context.end_streams_offset = end_offset
        return Scan.Continue

    # next magic validation
    context.end_streams_offset = end_padding_offset
    context.file.seek(end_padding_offset, io.SEEK_SET)
    magic = context.file.read(len(STREAM_START_MAGIC))
    if magic == STREAM_START_MAGIC:
        # another stream is concatenated: validate it from its own start
        context.start_offset = end_padding_offset
        return Scan.Continue
    return Scan.Terminate

171 

172 

class XZHandler(Handler):
    """Handler locating XZ streams and handing them to ``7z`` for extraction."""

    NAME = "xz"

    PATTERNS = [HexString("FD 37 7A 58 5A 00")]

    EXTRACTOR = Command("7z", "x", "-y", "{inpath}", "-so", stdout="xz.uncompressed")

    DOC = HandlerDoc(
        name="XZ",
        description="XZ is a compressed file format that uses the LZMA2 algorithm for high compression efficiency. It is designed for general-purpose data compression with support for integrity checks and padding for alignment.",
        handler_type=HandlerType.COMPRESSION,
        vendor=None,
        references=[
            Reference(
                title="XZ File Format Specification",
                url="https://tukaani.org/xz/xz-file-format-1.0.4.txt",
            ),
            Reference(
                title="XZ Wikipedia",
                url="https://en.wikipedia.org/wiki/XZ_Utils",
            ),
        ],
        limitations=[],
    )

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        """Find the end of the XZ stream(s) beginning at ``start_offset``.

        The Stream Flags following the header magic are checked against
        ``VALID_FLAGS``, then the file is scanned for footer candidates which
        ``_hyperscan_match`` validates, recording the end offset in the
        search context.

        Raises:
            InvalidInputFormat: if the Stream Flags are not recognised or no
                footer candidate was validated.
        """
        flags_offset = start_offset + len(STREAM_START_MAGIC)
        file.seek(flags_offset, io.SEEK_SET)
        stream_flag = convert_int16(file.read(2), Endian.BIG)
        if stream_flag not in VALID_FLAGS:
            raise InvalidInputFormat("Invalid stream flag for xz stream.")

        # -1 marks "no validated stream yet"; the scan callback overwrites it
        context = XZSearchContext(
            start_offset=start_offset,
            file=file,
            end_streams_offset=-1,
            stream_flag=stream_flag,
        )

        # Best effort: a failure while scanning is only logged, whatever the
        # callback recorded before the failure is still used below.
        try:
            scanner = hyperscan_stream_end_magic_db.build(context, _hyperscan_match)  # type: ignore
            stream_scan(scanner, file)
        except Exception as exc:
            logger.debug(
                "Error scanning for xz patterns",
                error=exc,
            )

        if context.end_streams_offset <= 0:
            raise InvalidInputFormat("No valid xz compression stream was detected")

        return ValidChunk(
            start_offset=start_offset, end_offset=context.end_streams_offset
        )