Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/archive/_safe_tarfile.py: 65%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

139 statements  

1import os 

2import tarfile 

3from pathlib import Path 

4from typing import Literal 

5 

6from structlog import get_logger 

7 

8from unblob.file_utils import is_safe_path 

9from unblob.report import ExtractionProblem 

10 

# Module-level structlog logger; SafeTarFile.record_problem emits its warnings through it.
logger = get_logger()

12 

# True when the process's real user id is 0. SafeTarFile.extract skips block and
# character device entries when this is False.
# os.getuid() only exists on Unix, so platforms without it are treated as
# non-root instead of failing with AttributeError at import time.
RUNNING_AS_ROOT = hasattr(os, "getuid") and os.getuid() == 0

# Longest member name (in characters) that SafeTarFile.extract accepts; entries
# with a longer name are reported and skipped.
MAX_PATH_LEN = 255

15 

16 

class UnblobTarInfo(tarfile.TarInfo):
    """TarInfo whose header parser joins the prefix field only for POSIX-magic headers."""

    @classmethod
    def frombuf(cls, buf, encoding, errors):
        """Parse GNU headers without treating the prefix field as a pathname."""
        return cls._frombuf(buf, encoding, errors)

    @classmethod
    def _frombuf(cls, buf, encoding, errors, *, dircheck=True):  # noqa: C901
        """Build a member object from one raw 512-byte header block.

        The prefix field (bytes 345-500) is prepended to the name only when the
        magic field equals ``tarfile.POSIX_MAGIC`` and the member type is not
        one of ``tarfile.GNU_TYPES``.

        Raises:
            tarfile.EmptyHeaderError: *buf* is empty.
            tarfile.TruncatedHeaderError: *buf* is not ``tarfile.BLOCKSIZE`` long.
            tarfile.EOFHeaderError: *buf* consists of NUL bytes only.
            tarfile.InvalidHeaderError: the stored checksum matches neither
                checksum computed by ``tarfile.calc_chksums``.

        """

        def number(start, stop):
            # Numeric header field stored in buf[start:stop].
            return tarfile.nti(buf[start:stop])  # pyright: ignore[reportAttributeAccessIssue]

        def text(start, stop):
            # NUL-terminated string header field stored in buf[start:stop].
            return tarfile.nts(buf[start:stop], encoding, errors)  # pyright: ignore[reportAttributeAccessIssue]

        if not buf:
            raise tarfile.EmptyHeaderError("empty header")  # pyright: ignore[reportAttributeAccessIssue]
        if len(buf) != tarfile.BLOCKSIZE:
            raise tarfile.TruncatedHeaderError("truncated header")  # pyright: ignore[reportAttributeAccessIssue]
        if buf.count(tarfile.NUL) == tarfile.BLOCKSIZE:
            raise tarfile.EOFHeaderError("end of file header")  # pyright: ignore[reportAttributeAccessIssue]

        stored_checksum = number(148, 156)
        if stored_checksum not in tarfile.calc_chksums(buf):  # pyright: ignore[reportAttributeAccessIssue]
            raise tarfile.InvalidHeaderError("bad checksum")  # pyright: ignore[reportAttributeAccessIssue]

        # Fields are read in header order so parse errors surface in a stable order.
        member = cls()
        member.name = text(0, 100)
        member.mode = number(100, 108)
        member.uid = number(108, 116)
        member.gid = number(116, 124)
        member.size = number(124, 136)
        member.mtime = number(136, 148)
        member.chksum = stored_checksum
        member.type = bytes(buf[156:157])
        member.linkname = text(157, 257)
        member.uname = text(265, 297)
        member.gname = text(297, 329)
        member.devmajor = number(329, 337)
        member.devminor = number(337, 345)
        prefix = text(345, 500)
        magic = buf[257:265]

        # A plain-file entry whose name ends with a slash is treated as a directory.
        if dircheck and member.type == tarfile.AREGTYPE and member.name.endswith("/"):
            member.type = tarfile.DIRTYPE

        if member.type == tarfile.GNUTYPE_SPARSE:
            # Up to four (offset, numbytes) pairs of 12 bytes each, starting at byte 386.
            sparse_map = []
            for start in range(386, 482, 24):
                try:
                    entry = (number(start, start + 12), number(start + 12, start + 24))
                except ValueError:
                    break
                sparse_map.append(entry)
            member._sparse_structs = (sparse_map, bool(buf[482]), number(483, 495))

        if member.isdir():
            member.name = member.name.rstrip("/")

        # Only a POSIX (ustar) header of a non-GNU member carries a path prefix.
        if not prefix or magic != tarfile.POSIX_MAGIC or member.type in tarfile.GNU_TYPES:
            return member

        member.name = f"{prefix}/{member.name}"
        return member

81 

82 

def open_safe_tarfile(
    name=None,
    mode: Literal["r", "r:*", "r:", "r:gz", "r:bz2", "r:xz"] = "r",
    fileobj=None,
    **kwargs,
) -> tarfile.TarFile:
    """Open a tar archive for reading with UnblobTarInfo as the member header class.

    *name*, *mode*, *fileobj* and any extra keyword arguments are handed to
    ``tarfile.open`` unchanged; only the ``tarinfo`` class is fixed here.
    """
    return tarfile.open(name, mode, fileobj, tarinfo=UnblobTarInfo, **kwargs)  # pyright: ignore[reportCallIssue]

96 

97 

class SafeTarFile:
    """Extract a tar archive while validating every member path and link target.

    Problems found during extraction are logged and collected in ``reports``
    as ``ExtractionProblem`` objects instead of aborting the whole extraction.
    The object can be used as a context manager, which closes the underlying
    tar file on exit.
    """

    def __init__(self, inpath: Path):
        """Open *inpath* with ``open_safe_tarfile`` and prepare empty report state."""
        self.inpath = inpath
        self.reports = []
        self.tarfile = open_safe_tarfile(inpath)
        if hasattr(self.tarfile, "extraction_filter") and hasattr(
            tarfile, "fully_trusted_filter"
        ):
            # Path and link safety checks happen in SafeTarFile before extraction.
            self.tarfile.extraction_filter = tarfile.fully_trusted_filter
        # Directory members by name; their metadata is applied in fix_directories().
        self.directories = {}

    def __enter__(self):
        """Return the instance itself so it can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the tar file; exceptions raised in the ``with`` body propagate."""
        self.close()

    def close(self):
        """Close the underlying tar file."""
        self.tarfile.close()

    def extractall(self, extract_root: Path):
        """Extract every member below *extract_root*, then apply directory metadata.

        Any exception raised while extracting a single member is recorded as a
        problem and extraction continues with the next member.
        """
        for member in self.tarfile.getmembers():
            try:
                self.extract(member, extract_root)
            except Exception as e:
                # Deliberate best effort: one bad member must not stop the rest.
                self.record_problem(member, str(e), "Ignored.")
        self.fix_directories(extract_root)

    def extract(self, tarinfo: tarfile.TarInfo, extract_root: Path):  # noqa: C901
        """Extract one member below *extract_root* after validating it.

        Members with an empty or too long name, device nodes when not running
        as root, and members whose path or link target would leave
        *extract_root* are reported and skipped. Absolute member names and
        absolute link targets are rewritten to be relative to the extraction
        root; *tarinfo* is modified in place in that case.
        """
        if not tarinfo.name:
            self.record_problem(
                tarinfo,
                "File with empty filename in tar archive.",
                "Skipped.",
            )
            return

        if len(tarinfo.name) > MAX_PATH_LEN:
            self.record_problem(
                tarinfo,
                "File with filename too long in tar archive.",
                "Skipped.",
            )
            return

        if not RUNNING_AS_ROOT and (tarinfo.ischr() or tarinfo.isblk()):
            self.record_problem(
                tarinfo,
                "Missing elevated permissions for block and character device creation.",
                "Skipped.",
            )
            return

        # we do want to extract absolute paths, but they must be changed to prevent path traversal
        if Path(tarinfo.name).is_absolute():
            self.record_problem(
                tarinfo,
                "Absolute path.",
                "Converted to extraction relative path.",
            )
            tarinfo.name = str(Path(tarinfo.name).relative_to("/"))

        # prevent traversal attempts through file name
        if not is_safe_path(basedir=extract_root, path=extract_root / tarinfo.name):
            self.record_problem(
                tarinfo,
                "Traversal attempt.",
                "Skipped.",
            )
            return

        # prevent traversal attempts through links
        if tarinfo.islnk() or tarinfo.issym():
            if Path(tarinfo.linkname).is_absolute():
                relative_linkname = self._relative_linkname(tarinfo, extract_root)
                if relative_linkname is None:
                    self.record_problem(
                        tarinfo,
                        "Absolute path conversion to extraction relative failed - would escape root.",
                        "Skipped.",
                    )
                    return

                # Internal invariant: the helper always prefixes ".." or ".".
                assert not Path(relative_linkname).is_absolute()
                self.record_problem(
                    tarinfo,
                    "Absolute path as link target.",
                    "Converted to extraction relative path.",
                )
                tarinfo.linkname = relative_linkname

            resolved_path = (extract_root / tarinfo.name).parent / tarinfo.linkname
            if not is_safe_path(basedir=extract_root, path=resolved_path):
                self.record_problem(
                    tarinfo,
                    "Traversal attempt through link path.",
                    "Skipped.",
                )
                return

        target_path = extract_root / tarinfo.name
        # directories are special: we can not set their metadata now + they might also be already existing
        if tarinfo.isdir():
            # save (potentially duplicate) dir metadata for applying at the end of the extraction
            self.directories[tarinfo.name] = tarinfo
            target_path.mkdir(parents=True, exist_ok=True)
            return

        # lexists() instead of Path.exists(): exists() follows symlinks and is
        # False for a dangling one, so a stale link left by an earlier entry
        # would not be removed before the newer entry is extracted.
        if os.path.lexists(target_path):
            self.record_problem(
                tarinfo,
                "Duplicate tar entry.",
                "Removed older version.",
            )
            target_path.unlink()

        self.tarfile.extract(tarinfo, extract_root)

    @staticmethod
    def _relative_linkname(tarinfo, extract_root):
        """Rewrite an absolute link target so it is relative to *extract_root*.

        Returns the link name prefixed with one ``..`` per directory level
        between the member and the extraction root (or ``.`` when the member
        sits directly in the root), or ``None`` when the member's resolved
        path is not located below the resolved root.
        """
        root = extract_root.resolve()
        path = (extract_root / tarinfo.name).resolve()

        if path.parts[: len(root.parts)] != root.parts:
            return None

        depth = max(0, len(path.parts) - len(root.parts) - 1)
        # tarinfo.linkname is absolute here, so it already starts with "/".
        return ("/".join([".."] * depth) or ".") + tarinfo.linkname

    def fix_directories(self, extract_root):
        """Complete directory extraction.

        When extracting directories, setting metadata was intentionally skipped,
        so that entries under the directory can be extracted, even if the directory
        is write protected.
        """
        # need to set the permissions from leafs to root
        directories = sorted(
            self.directories.values(), key=lambda d: d.name, reverse=True
        )

        # copied from tarfile.extractall(), it is somewhat ugly, as uses private helpers!
        for tarinfo in directories:
            dirpath = str(extract_root / tarinfo.name)
            try:
                self.tarfile.chown(tarinfo, dirpath, numeric_owner=True)
                self.tarfile.utime(tarinfo, dirpath)
                self.tarfile.chmod(tarinfo, dirpath)
            except tarfile.ExtractError as e:
                self.record_problem(tarinfo, str(e), "Ignored.")

    def record_problem(self, tarinfo, problem, resolution):
        """Log *problem* with its *resolution* and append it to ``reports``."""
        logger.warning(f"{problem} {resolution}", path=tarinfo.name)  # noqa: G004
        self.reports.append(
            ExtractionProblem(
                path=tarinfo.name,
                problem=problem,
                resolution=resolution,
            )
        )