Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/archive/tar.py: 94%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

109 statements  

1import contextlib 

2import os 

3import tarfile 

4from pathlib import Path 

5from typing import Optional 

6 

7from structlog import get_logger 

8 

9from ...file_utils import OffsetFile, SeekError, decode_int, round_up, snull 

10from ...models import ( 

11 Extractor, 

12 ExtractResult, 

13 File, 

14 HandlerDoc, 

15 HandlerType, 

16 HexString, 

17 Reference, 

18 Regex, 

19 StructHandler, 

20 ValidChunk, 

21) 

22from ._safe_tarfile import SafeTarFile 

23 

24logger = get_logger() 

25 

26 

# Size in bytes of a single tar block; entry ends and padding are measured
# in multiples of this value by the helpers below.
BLOCK_SIZE = 512
# The end-of-archive marker occupies two consecutive blocks.
END_OF_ARCHIVE_MARKER_SIZE = BLOCK_SIZE * 2

# Byte offset of the ``magic`` field from the start of a tar header.
MAGIC_OFFSET = 257

# One full block made up exclusively of NUL bytes.
ZERO_BLOCK = b"\x00" * BLOCK_SIZE

33 

34 

def _get_tar_end_offset(file: File, offset=0):
    """Return the end offset of the tar archive that starts at ``offset``.

    The archive is inspected through an ``OffsetFile`` built from ``file``
    and ``offset``. The end of the last tar entry is located first, then the
    zero blocks following it are taken into account.

    Returns ``-1`` when no tar entry could be found; otherwise the position
    reported by the helpers, with ``offset`` added to it.
    """
    archive = OffsetFile(file, offset)

    # Step 1: where does the last entry of the archive end?
    entries_end = _get_end_of_last_tar_entry(archive)
    if entries_end == -1:
        return -1

    # Step 2: where do the final zero blocks end?
    padding_end = _find_end_of_padding(archive, find_from=entries_end)
    return offset + padding_end

45 

46 

def _get_end_of_last_tar_entry(file) -> int:
    """Return the offset at which the last parsable tar entry in ``file`` ends.

    ``file`` is opened with ``tarfile.TarFile`` and all members are walked.
    Parsing errors raised while walking are tolerated so that the members
    read up to that point still count.

    Returns ``-1`` if the archive cannot be opened or contains no member.
    If seeking to the computed end raises ``SeekError``, the offset of the
    last member is returned instead. On success ``file`` is left positioned
    at the returned offset.
    """
    try:
        archive = tarfile.TarFile(mode="r", fileobj=file)
    except tarfile.TarError:
        return -1

    # The loop variable keeps the most recent member after the loop ends (or
    # is interrupted), and stays ``None`` when nothing was yielded at all.
    final_member = None
    try:
        for final_member in archive:
            pass
    except (tarfile.TarError, SeekError):
        # recover what's already been parsed
        pass

    if final_member is None:
        return -1

    entries_end = archive.offset
    try:
        file.seek(entries_end)
    except SeekError:
        # last tar entry is truncated
        entries_end = final_member.offset
        file.seek(entries_end)

    return entries_end

74 

75 

def _find_end_of_padding(file, *, find_from: int) -> int:
    """Return the offset where the zero blocks following ``find_from`` end.

    ``find_from`` is rounded up to a ``BLOCK_SIZE`` boundary. From there,
    blocks are read for as long as they equal ``ZERO_BLOCK``, up to a limit
    obtained by rounding the end-of-archive marker up to
    ``tarfile.RECORDSIZE``.

    If seeking to the rounded start raises ``SeekError``, the end of the
    file is returned instead.
    """
    scan_start = round_up(find_from, BLOCK_SIZE)
    scan_limit = round_up(scan_start + END_OF_ARCHIVE_MARKER_SIZE, tarfile.RECORDSIZE)
    block_budget = (scan_limit - scan_start) // BLOCK_SIZE

    try:
        file.seek(scan_start)
    except SeekError:
        # match to end of truncated file
        return file.seek(0, os.SEEK_END)

    # Count consecutive zero blocks; stop at the first block that differs
    # (a short read differs too) or once the budget is used up.
    zero_blocks = 0
    while zero_blocks < block_budget and file.read(BLOCK_SIZE) == ZERO_BLOCK:
        zero_blocks += 1

    return scan_start + zero_blocks * BLOCK_SIZE

95 

96 

class TarExtractor(Extractor):
    """Extractor that unpacks tar archives through ``SafeTarFile``."""

    def extract(self, inpath: Path, outdir: Path):
        """Unpack ``inpath`` into ``outdir`` and return the archive's reports."""
        # The local is called ``archive`` so it does not shadow the stdlib
        # ``tarfile`` module imported at the top of this file.
        with contextlib.closing(SafeTarFile(inpath)) as archive:
            archive.extractall(outdir)  # noqa: S202 tarfile-unsafe-members
        return ExtractResult(reports=archive.reports)

102 

103 

class _TarHandler(StructHandler):
    """Chunk calculation shared by the tar handlers of this module.

    Subclasses provide the ``PATTERNS`` to search for and the matching
    ``PATTERN_MATCH_OFFSET``; header validation and end-of-archive detection
    are implemented here.
    """

    NAME = "tar"

    PATTERNS = []

    C_DEFINITIONS = r"""
        typedef struct posix_header
        {                       /* byte offset */
            char name[100];     /*   0 */
            char mode[8];       /* 100 */
            char uid[8];        /* 108 */
            char gid[8];        /* 116 */
            char size[12];      /* 124 */
            char mtime[12];     /* 136 */
            char chksum[8];     /* 148 */
            char typeflag;      /* 156 */
            char linkname[100]; /* 157 */
            char magic[6];      /* 257 */
            char version[2];    /* 263 */
            char uname[32];     /* 265 */
            char gname[32];     /* 297 */
            char devmajor[8];   /* 329 */
            char devminor[8];   /* 337 */
            char prefix[155];   /* 345 */
                                /* 500 */
        } posix_header_t;
    """
    HEADER_STRUCT = "posix_header_t"

    EXTRACTOR = TarExtractor()

    def calculate_chunk(self, file: File, start_offset: int) -> Optional[ValidChunk]:
        """Validate the tar header at ``start_offset`` and size the archive.

        The header's checksum field is checked against three candidates: the
        unsigned byte sum of the first 257 header bytes, that sum extended
        with bytes 257..499, and the signed byte sum of the first 257 bytes.

        Returns ``None`` if the checksum field is malformed, if no candidate
        matches, or if no end offset can be determined; otherwise a
        ``ValidChunk`` from ``start_offset`` to the detected end.
        """
        file.seek(start_offset)
        header = self.parse_header(file)
        header_size = snull(header.size)
        # The decoded value is not used.
        # NOTE(review): presumably called so that a malformed size field
        # raises here - confirm against decode_int's contract.
        decode_int(header_size, 8)

        def signed_sum(octets) -> int:
            # Sum the bytes as signed 8-bit values: 0..127 keep their value,
            # 128..255 stand for -128..-1, i.e. ``b - 256``.
            return sum(b if b < 128 else b - 256 for b in octets)

        if header.chksum[6:8] not in (b"\x00 ", b" \x00"):
            logger.debug(
                "Invalid checksum format",
                actual_last_2_bytes=header.chksum[6:8],
                handler=self.NAME,
                _verbosity=3,
            )
            return None
        checksum = decode_int(header.chksum[:6], 8)
        header_bytes_for_checksum = (
            file[start_offset : start_offset + 148]
            + b" " * 8  # chksum field is replaced with "blanks"
            + file[start_offset + 156 : start_offset + 257]
        )
        extended_header_bytes = file[start_offset + 257 : start_offset + 500]
        calculated_checksum_unsigned = sum(header_bytes_for_checksum)
        calculated_checksum_signed = signed_sum(header_bytes_for_checksum)
        checksums = (
            calculated_checksum_unsigned,
            calculated_checksum_unsigned + sum(extended_header_bytes),
            # signed is of historical interest, calculating for the extended header is not needed
            calculated_checksum_signed,
        )
        if checksum not in checksums:
            logger.error(
                "Tar header checksum mismatch", expected=str(checksum), actual=checksums
            )
            return None

        end_offset = _get_tar_end_offset(file, start_offset)
        if end_offset == -1:
            return None
        return ValidChunk(start_offset=start_offset, end_offset=end_offset)

177 

178 

class TarUstarHandler(_TarHandler):
    """Tar handler that is triggered by the ``ustar`` magic bytes."""

    PATTERNS = [
        # "ustar" followed by two spaces and a NUL byte
        HexString("75 73 74 61 72 20 20 00"),
        # "ustar" followed by a NUL byte and "00"
        HexString("75 73 74 61 72 00 30 30"),
    ]

    # The magic sits MAGIC_OFFSET bytes into the header, so the match offset
    # is moved back by that amount to reach the start of the file.
    PATTERN_MATCH_OFFSET = -MAGIC_OFFSET

    DOC = HandlerDoc(
        name="TAR (USTAR)",
        description=(
            "USTAR (Uniform Standard Tape Archive) tar files are extensions of "
            "the original tar format with additional metadata fields."
        ),
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="USTAR Format Documentation",
                url="https://en.wikipedia.org/wiki/Tar_(computing)#USTAR_format",
            ),
            Reference(
                title="POSIX Tar Format Specification",
                url="https://pubs.opengroup.org/onlinepubs/9699919799/utilities/pax.html",
            ),
        ],
        limitations=[],
    )

206 

207 

def _re_frame(regexp: str):
    """Enclose ``regexp`` in a group so it can be concatenated safely.

    Without the group, appending ``c`` to ``a|b`` yields ``a|bc``, which does
    not match "ac". With the group the result is ``(a|b)c``, which does.
    """
    return "({})".format(regexp)

220 

221 

def _re_alternatives(regexps):
    """Combine ``regexps`` into a single grouped alternation.

    Each alternative is framed on its own, the framed alternatives are joined
    with ``|``, and the whole alternation is framed once more.
    """
    framed = [_re_frame(regexp) for regexp in regexps]
    return _re_frame("|".join(framed))

224 

225 

def _padded_field(re_content_char, size, leftpad_re=" ", rightpad_re=r"[ \x00]"):
    """Build a regex matching a fixed-width field of content plus padding.

    The field is exactly ``size`` characters wide. It holds between 1 and
    ``size`` characters matching ``re_content_char``; the remaining width is
    split in every possible way between ``leftpad_re`` characters before the
    content and ``rightpad_re`` characters after it. Each split becomes one
    alternative of the returned pattern.

    By default the left padding is a space and the right padding is a space
    or a NUL byte. The right padding class is spelled ``[ \\x00]``: writing
    it as ``[ \\0x00]`` would mean NUL followed by the literal characters
    ``x`` and ``0``, which would wrongly accept those as padding.
    """
    field_regexes = []

    for padsize in range(size):
        content_re = f"{re_content_char}{{{size - padsize}}}"

        for leftpadsize in range(padsize + 1):
            rightpadsize = padsize - leftpadsize

            # A zero-width padding is left out entirely instead of emitting
            # a ``{0}`` quantifier.
            left_re = f"{leftpad_re}{{{leftpadsize}}}" if leftpadsize else ""
            right_re = f"{rightpad_re}{{{rightpadsize}}}" if rightpadsize else ""

            field_regexes.append(f"{left_re}{content_re}{right_re}")

    return _re_alternatives(field_regexes)

241 

242 

class TarUnixHandler(_TarHandler):
    """Tar handler that is triggered by the layout of the numeric header fields."""

    PATTERNS = [
        Regex(
            "".join(
                (
                    # char name[100] is left out (pattern would be too big)
                    _padded_field(r"[0-7]", 8),  # char mode[8]
                    _padded_field(r"[0-7]", 8),  # char uid[8]
                    _padded_field(r"[0-7]", 8),  # char gid[8]
                    _padded_field(r"[0-7]", 12),  # char size[12]
                    _padded_field(r"[0-7]", 12),  # char mtime[12]
                    _padded_field(r"[0-7]", 8),  # char chksum[8]
                    r"[0-7\x00]",  # char typeflag[1] - no extensions
                )
            )
        ),
    ]
    # The pattern starts after the 100-byte name field, so go back to the
    # beginning of the skipped name.
    PATTERN_MATCH_OFFSET = -100

    DOC = HandlerDoc(
        name="TAR (Unix)",
        description=(
            "Unix tar files are a widely used archive format for storing files "
            "and directories with metadata."
        ),
        handler_type=HandlerType.ARCHIVE,
        vendor=None,
        references=[
            Reference(
                title="Unix Tar Format Documentation",
                url="https://en.wikipedia.org/wiki/Tar_(computing)",
            ),
            Reference(
                title="GNU Tar Manual",
                url="https://www.gnu.org/software/tar/manual/",
            ),
        ],
        limitations=[],
    )