Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/archive/tar.py: 94%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

108 statements  

1import contextlib 

2import os 

3import tarfile 

4from pathlib import Path 

5 

6from structlog import get_logger 

7 

8from ...file_utils import OffsetFile, SeekError, decode_int, round_up, snull 

9from ...models import ( 

10 Extractor, 

11 ExtractResult, 

12 File, 

13 HandlerDoc, 

14 HandlerType, 

15 HexString, 

16 Reference, 

17 Regex, 

18 StructHandler, 

19 ValidChunk, 

20) 

21from ._safe_tarfile import SafeTarFile 

22 

23logger = get_logger() 

24 

25 

# Tar data is laid out in fixed-size blocks of this many bytes.
BLOCK_SIZE = 512
# Size of the end-of-archive marker: two whole blocks.
END_OF_ARCHIVE_MARKER_SIZE = BLOCK_SIZE * 2

# Byte offset of the "magic" field inside a tar header block.
MAGIC_OFFSET = 257

# A single all-zero block, used to recognise trailing padding.
ZERO_BLOCK = bytes(BLOCK_SIZE)

32 

33 

def _get_tar_end_offset(file: File, offset=0):
    """Return the end offset of the tar archive that starts at *offset*.

    The result is expressed relative to the start of *file* (not relative
    to *offset*).  Returns -1 when no tar entry could be read.
    """
    view = OffsetFile(file, offset)

    # Step 1: locate where the last parsable entry ends.
    entries_end = _get_end_of_last_tar_entry(view)
    if entries_end == -1:
        return -1

    # Step 2: extend over the zero blocks that follow the last entry.
    padding_end = _find_end_of_padding(view, find_from=entries_end)
    return offset + padding_end

44 

45 

def _get_end_of_last_tar_entry(file) -> int:
    """Return the offset just past the last tar entry readable from *file*.

    Returns -1 if *file* cannot be opened as a tar archive or contains no
    entries.  If seeking to the computed end fails (the last entry is
    truncated), the start offset of that last entry is returned instead.
    In both non-error cases *file* is left positioned at the returned offset.
    """
    try:
        archive = tarfile.TarFile(mode="r", fileobj=file)
    except tarfile.TarError:
        return -1

    final_entry = None
    try:
        # Walk every member; only the last one seen matters.
        for final_entry in archive:
            pass
    except (tarfile.TarError, SeekError):
        # Keep whatever was parsed before the failure.
        pass

    if final_entry is None:
        return -1

    entries_end = archive.offset
    try:
        file.seek(entries_end)
    except SeekError:
        # The last entry is truncated: fall back to where it begins.
        entries_end = final_entry.offset
        file.seek(entries_end)

    return entries_end

73 

74 

def _find_end_of_padding(file, *, find_from: int) -> int:
    """Return the offset where the zero padding after *find_from* ends.

    The search starts at *find_from* rounded up to a block boundary and
    covers at most the end-of-archive marker rounded up to a full tar
    record.  Scanning stops at the first block that is not all zeros
    (a short read at end of file counts as such).  If the start position
    cannot be seeked to, the end of the file is returned.
    """
    start = round_up(find_from, BLOCK_SIZE)
    limit = round_up(start + END_OF_ARCHIVE_MARKER_SIZE, tarfile.RECORDSIZE)
    block_budget = (limit - start) // BLOCK_SIZE

    try:
        file.seek(start)
    except SeekError:
        # match to end of truncated file
        return file.seek(0, os.SEEK_END)

    # Count consecutive zero blocks, never looking past the budget.
    zero_blocks = 0
    while zero_blocks < block_budget and file.read(BLOCK_SIZE) == ZERO_BLOCK:
        zero_blocks += 1

    return start + zero_blocks * BLOCK_SIZE

94 

95 

class TarExtractor(Extractor):
    """Extractor that unpacks tar archives through ``SafeTarFile``."""

    def extract(self, inpath: Path, outdir: Path):
        """Extract *inpath* into *outdir* and return the collected reports."""
        archive = SafeTarFile(inpath)
        with contextlib.closing(archive):
            archive.extractall(outdir)  # noqa: S202 tarfile-unsafe-members
        return ExtractResult(reports=archive.reports)

101 

102 

class _TarHandler(StructHandler):
    """Shared tar logic: header checksum validation and chunk sizing.

    Concrete handlers derive from this class and provide their own
    ``PATTERNS`` (and, where needed, ``PATTERN_MATCH_OFFSET``).
    """

    NAME = "tar"

    PATTERNS = []

    C_DEFINITIONS = r"""
        typedef struct posix_header
        {                       /* byte offset */
            char name[100];     /*   0 */
            char mode[8];       /* 100 */
            char uid[8];        /* 108 */
            char gid[8];        /* 116 */
            char size[12];      /* 124 */
            char mtime[12];     /* 136 */
            char chksum[8];     /* 148 */
            char typeflag;      /* 156 */
            char linkname[100]; /* 157 */
            char magic[6];      /* 257 */
            char version[2];    /* 263 */
            char uname[32];     /* 265 */
            char gname[32];     /* 297 */
            char devmajor[8];   /* 329 */
            char devminor[8];   /* 337 */
            char prefix[155];   /* 345 */
                                /* 500 */
        } posix_header_t;
    """
    HEADER_STRUCT = "posix_header_t"

    EXTRACTOR = TarExtractor()

    def calculate_chunk(self, file: File, start_offset: int) -> ValidChunk | None:
        """Validate the tar header at *start_offset* and size the archive.

        Returns a ``ValidChunk`` spanning from *start_offset* to the end of
        the archive (including trailing zero padding), or ``None`` when the
        checksum field is malformed, the checksum does not match, or no tar
        entry can be read.
        """
        file.seek(start_offset)
        header = self.parse_header(file)
        header_size = snull(header.size)
        # The decoded value is not used; the call is kept for its validation
        # effect (presumably it raises on a non-octal size - confirm against
        # decode_int).
        decode_int(header_size, 8)

        def signed_sum(octets) -> int:
            # Sum the bytes as C "signed char" values: 0x80..0xFF map to
            # -128..-1, i.e. b - 256 (not 256 - b, which flips the sign).
            return sum(b if b < 128 else b - 256 for b in octets)

        # The last two bytes of the checksum field must be NUL+space or
        # space+NUL; the first six hold the octal checksum digits.
        if header.chksum[6:8] not in (b"\x00 ", b" \x00"):
            logger.debug(
                "Invalid checksum format",
                actual_last_2_bytes=header.chksum[6:8],
                handler=self.NAME,
                _verbosity=3,
            )
            return None
        checksum = decode_int(header.chksum[:6], 8)
        header_bytes_for_checksum = (
            file[start_offset : start_offset + 148]
            + b" " * 8  # chksum field is replaced with "blanks"
            + file[start_offset + 156 : start_offset + 257]
        )
        extended_header_bytes = file[start_offset + 257 : start_offset + 500]
        calculated_checksum_unsigned = sum(header_bytes_for_checksum)
        calculated_checksum_signed = signed_sum(header_bytes_for_checksum)
        checksums = (
            calculated_checksum_unsigned,
            calculated_checksum_unsigned + sum(extended_header_bytes),
            # signed is of historical interest, calculating for the extended header is not needed
            calculated_checksum_signed,
        )
        if checksum not in checksums:
            logger.error(
                "Tar header checksum mismatch", expected=str(checksum), actual=checksums
            )
            return None

        end_offset = _get_tar_end_offset(file, start_offset)
        if end_offset == -1:
            return None
        return ValidChunk(start_offset=start_offset, end_offset=end_offset)

176 

177 

class TarUstarHandler(_TarHandler):
    """Tar handler that matches on the "ustar" magic field of the header."""

    PATTERNS = [
        # "ustar" followed by two spaces and a NUL
        HexString("75 73 74 61 72 20 20 00"),
        # "ustar" followed by a NUL and the version digits "00"
        HexString("75 73 74 61 72 00 30 30"),
    ]

    # The magic sits MAGIC_OFFSET bytes into the header, so step back by
    # that amount from the match to reach the start of the archive.
    PATTERN_MATCH_OFFSET = -MAGIC_OFFSET

    DOC = HandlerDoc(
        name="TAR (USTAR)",
        description="USTAR (Uniform Standard Tape Archive) tar files are extensions of the original tar format with additional metadata fields.",
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="USTAR Format Documentation",
                url="https://en.wikipedia.org/wiki/Tar_(computing)#USTAR_format",
            ),
            Reference(
                title="POSIX Tar Format Specification",
                url="https://pubs.opengroup.org/onlinepubs/9699919799/utilities/pax.html",
            ),
        ],
        limitations=[],
    )

205 

206 

def _re_frame(regexp: str):
    """Parenthesise *regexp* so it can be concatenated safely.

    Without the surrounding group, appending ``c`` to ``a|b`` yields
    ``a|bc``, which does not match "ac"; with it the result is ``(a|b)c``,
    which does.
    """
    return "({})".format(regexp)

219 

220 

def _re_alternatives(regexps):
    """Return one grouped regex matching any of *regexps*.

    Each alternative is grouped individually and the whole alternation is
    grouped again, so the result can be concatenated with other patterns.
    """
    framed = [_re_frame(regexp) for regexp in regexps]
    return _re_frame("|".join(framed))

223 

224 

def _padded_field(re_content_char, size, leftpad_re=" ", rightpad_re=r"[ \x00]"):
    """Build a regex for a fixed-width field with optional padding.

    The field is exactly *size* characters wide: zero or more *leftpad_re*
    characters, then at least one *re_content_char*, then zero or more
    *rightpad_re* characters.  One alternative is generated for every
    possible split of the padding between the left and right side.

    By default the right padding is a space or a NUL byte.  (It used to be
    written ``[ \\0x00]``, which is the class {space, NUL, "x", "0"} and so
    also accepted a literal "x" as padding.)
    """
    field_regexes = []

    for padsize in range(size):
        # Whatever is not padding must be content.
        content_re = f"{re_content_char}{{{size - padsize}}}"

        for leftpadsize in range(padsize + 1):
            rightpadsize = padsize - leftpadsize

            # Omit zero-length repetitions to keep the pattern short.
            left_re = f"{leftpad_re}{{{leftpadsize}}}" if leftpadsize else ""
            right_re = f"{rightpad_re}{{{rightpadsize}}}" if rightpadsize else ""

            field_regexes.append(f"{left_re}{content_re}{right_re}")

    return _re_alternatives(field_regexes)

240 

241 

class TarUnixHandler(_TarHandler):
    """Tar handler that matches on the octal numeric fields of the header."""

    PATTERNS = [
        Regex(
            # The 100-byte name field is skipped (its pattern would be too
            # big).  The widths below are, in order: mode, uid, gid, size,
            # mtime, chksum.
            "".join(
                _padded_field(r"[0-7]", width) for width in (8, 8, 8, 12, 12, 8)
            )
            + r"[0-7\x00]"  # char typeflag[1] - no extensions
        ),
    ]
    PATTERN_MATCH_OFFSET = -100  # go back to beginning of skipped name

    DOC = HandlerDoc(
        name="TAR (Unix)",
        description="Unix tar files are a widely used archive format for storing files and directories with metadata.",
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="Unix Tar Format Documentation",
                url="https://en.wikipedia.org/wiki/Tar_(computing)",
            ),
            Reference(
                title="GNU Tar Manual",
                url="https://www.gnu.org/software/tar/manual/",
            ),
        ],
        limitations=[],
    )

274 )