Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/unblob/handlers/archive/_safe_tarfile.py: 67%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

136 statements  

1import os 

2import tarfile 

3from pathlib import Path 

4from typing import Literal 

5 

6from structlog import get_logger 

7 

8from unblob.file_utils import is_safe_path 

9from unblob.report import ExtractionProblem 

10 

logger = get_logger()

# True when the process runs with user id 0; SafeTarFile.extract skips block and
# character device members when this is False.
RUNNING_AS_ROOT = os.getuid() == 0
# Longest member name (in characters) that SafeTarFile.extract accepts.
MAX_PATH_LEN = 255

15 

16 

class UnblobTarInfo(tarfile.TarInfo):
    """``TarInfo`` whose header parser only honours the prefix field for ustar headers."""

    @classmethod
    def frombuf(cls, buf, encoding, errors):  # noqa: C901
        """Parse GNU headers without treating the prefix field as a pathname.

        Builds a member object from one raw header block.  The prefix field
        (bytes 345-500) is prepended to the name only when the magic field equals
        ``tarfile.POSIX_MAGIC`` and the member type is not one of
        ``tarfile.GNU_TYPES``.

        Raises the ``tarfile`` header errors for an empty block, a block whose
        length is not ``tarfile.BLOCKSIZE``, an all-NUL block, or a checksum
        mismatch.
        """
        header_len = len(buf)
        if header_len == 0:
            raise tarfile.EmptyHeaderError("empty header")  # pyright: ignore[reportAttributeAccessIssue]
        if header_len != tarfile.BLOCKSIZE:
            raise tarfile.TruncatedHeaderError("truncated header")  # pyright: ignore[reportAttributeAccessIssue]
        if buf.count(tarfile.NUL) == tarfile.BLOCKSIZE:
            raise tarfile.EOFHeaderError("end of file header")  # pyright: ignore[reportAttributeAccessIssue]

        parse_number = tarfile.nti  # pyright: ignore[reportAttributeAccessIssue]
        parse_text = tarfile.nts  # pyright: ignore[reportAttributeAccessIssue]

        def number(start, stop):
            # Numeric header field stored in buf[start:stop].
            return parse_number(buf[start:stop])

        def text(start, stop):
            # NUL-terminated string header field stored in buf[start:stop].
            return parse_text(buf[start:stop], encoding, errors)

        checksum = number(148, 156)
        if checksum not in tarfile.calc_chksums(buf):  # pyright: ignore[reportAttributeAccessIssue]
            raise tarfile.InvalidHeaderError("bad checksum")  # pyright: ignore[reportAttributeAccessIssue]

        member = cls()
        member.name = text(0, 100)
        member.mode = number(100, 108)
        member.uid = number(108, 116)
        member.gid = number(116, 124)
        member.size = number(124, 136)
        member.mtime = number(136, 148)
        member.chksum = checksum
        member.type = bytes(buf[156:157])
        member.linkname = text(157, 257)
        member.uname = text(265, 297)
        member.gname = text(297, 329)
        member.devmajor = number(329, 337)
        member.devminor = number(337, 345)
        prefix = text(345, 500)
        magic = buf[257:265]

        # A regular-file entry whose name ends in a slash is treated as a directory.
        if member.type == tarfile.AREGTYPE and member.name.endswith("/"):
            member.type = tarfile.DIRTYPE

        # Sparse members carry up to four (offset, numbytes) pairs of 24 bytes
        # each, starting at byte 386 of the header.
        if member.type == tarfile.GNUTYPE_SPARSE:
            sparse_map = []
            for start in range(386, 386 + 4 * 24, 24):
                try:
                    entry = (number(start, start + 12), number(start + 12, start + 24))
                except ValueError:
                    break
                sparse_map.append(entry)
            member._sparse_structs = (sparse_map, bool(buf[482]), number(483, 495))

        if member.isdir():
            member.name = member.name.rstrip("/")

        uses_ustar_prefix = (
            bool(prefix)
            and magic == tarfile.POSIX_MAGIC
            and member.type not in tarfile.GNU_TYPES
        )
        if uses_ustar_prefix:
            member.name = prefix + "/" + member.name
        return member

77 

78 

def open_safe_tarfile(
    name=None,
    mode: Literal["r", "r:*", "r:", "r:gz", "r:bz2", "r:xz"] = "r",
    fileobj=None,
    **kwargs,
) -> tarfile.TarFile:
    """Open a tar archive with ``UnblobTarInfo`` as the member class.

    ``name``, ``mode``, ``fileobj`` and any extra keyword arguments are passed
    through to ``tarfile.open`` unchanged.
    """
    fixed_args = {
        "name": name,
        "mode": mode,
        "fileobj": fileobj,
        "tarinfo": UnblobTarInfo,
    }
    return tarfile.open(**fixed_args, **kwargs)  # pyright: ignore[reportCallIssue]

92 

93 

class SafeTarFile:
    """Extract a tar archive member by member, skipping or rewriting unsafe entries.

    Every skipped, rewritten or failed member is logged and appended to
    ``self.reports`` as an ``ExtractionProblem``.
    """

    def __init__(self, inpath: Path):
        """Open the archive at *inpath* and set up extraction bookkeeping."""
        self.inpath = inpath
        self.reports = []
        self.tarfile = open_safe_tarfile(inpath)
        if hasattr(self.tarfile, "extraction_filter") and hasattr(
            tarfile, "fully_trusted_filter"
        ):
            # Path and link safety checks happen in SafeTarFile before extraction.
            self.tarfile.extraction_filter = tarfile.fully_trusted_filter
        # member name -> TarInfo of directories whose metadata is applied at the end
        self.directories = {}

    def close(self):
        """Close the underlying tar file."""
        self.tarfile.close()

    def extractall(self, extract_root: Path):
        """Extract every member below *extract_root*, then apply directory metadata.

        An exception raised while extracting one member is recorded as a problem
        and does not stop the extraction of the remaining members.
        """
        for member in self.tarfile.getmembers():
            try:
                self.extract(member, extract_root)
            except Exception as e:
                self.record_problem(member, str(e), "Ignored.")
        self.fix_directories(extract_root)

    @staticmethod
    def _relative_linkname(tarinfo: tarfile.TarInfo, extract_root: Path):
        """Rewrite an absolute link target so that it stays below *extract_root*.

        Returns ``None`` when the member itself resolves outside *extract_root*.
        """
        root = extract_root.resolve()
        path = (extract_root / tarinfo.name).resolve()

        if path.parts[: len(root.parts)] != root.parts:
            return None

        if tarinfo.islnk():
            # Hard link targets are relative to the archive root, so no "../"
            # climbing is needed (it would lead out of the extraction root).
            depth = 0
        else:
            # Symlink targets are relative to the directory holding the link.
            depth = max(0, len(path.parts) - len(root.parts) - 1)
        return ("/".join([".."] * depth) or ".") + tarinfo.linkname

    def extract(self, tarinfo: tarfile.TarInfo, extract_root: Path):  # noqa: C901
        """Extract a single member below *extract_root* after safety checks.

        Members with an empty or too long name, device nodes when not running as
        root, and members or link targets that would leave *extract_root* are
        skipped.  Absolute member names and absolute link targets are rewritten
        to stay below *extract_root*.  Each of these cases is recorded through
        ``record_problem``.  ``tarinfo`` is modified in place when a rewrite
        happens.
        """
        if not tarinfo.name:
            self.record_problem(
                tarinfo,
                "File with empty filename in tar archive.",
                "Skipped.",
            )
            return

        if len(tarinfo.name) > MAX_PATH_LEN:
            self.record_problem(
                tarinfo,
                "File with filename too long in tar archive.",
                "Skipped.",
            )
            return

        if not RUNNING_AS_ROOT and (tarinfo.ischr() or tarinfo.isblk()):
            self.record_problem(
                tarinfo,
                "Missing elevated permissions for block and character device creation.",
                "Skipped.",
            )
            return

        # we do want to extract absolute paths, but they must be changed to prevent path traversal
        if Path(tarinfo.name).is_absolute():
            self.record_problem(
                tarinfo,
                "Absolute path.",
                "Converted to extraction relative path.",
            )
            tarinfo.name = str(Path(tarinfo.name).relative_to("/"))

        # prevent traversal attempts through file name
        if not is_safe_path(basedir=extract_root, path=extract_root / tarinfo.name):
            self.record_problem(
                tarinfo,
                "Traversal attempt.",
                "Skipped.",
            )
            return

        # prevent traversal attempts through links
        if tarinfo.islnk() or tarinfo.issym():
            if Path(tarinfo.linkname).is_absolute():
                relative_linkname = self._relative_linkname(tarinfo, extract_root)
                if relative_linkname is None:
                    self.record_problem(
                        tarinfo,
                        "Absolute path conversion to extraction relative failed - would escape root.",
                        "Skipped.",
                    )
                    return

                assert not Path(relative_linkname).is_absolute()
                self.record_problem(
                    tarinfo,
                    "Absolute path as link target.",
                    "Converted to extraction relative path.",
                )
                tarinfo.linkname = relative_linkname

            # A symlink target is interpreted relative to the link's directory,
            # while tarfile resolves a hard link target against the extraction
            # root; the safety check has to use the same base as the extraction.
            if tarinfo.islnk():
                link_base = extract_root
            else:
                link_base = (extract_root / tarinfo.name).parent
            resolved_path = link_base / tarinfo.linkname
            if not is_safe_path(basedir=extract_root, path=resolved_path):
                self.record_problem(
                    tarinfo,
                    "Traversal attempt through link path.",
                    "Skipped.",
                )
                return

        target_path = extract_root / tarinfo.name
        # directories are special: we can not set their metadata now + they might also be already existing
        if tarinfo.isdir():
            # save (potentially duplicate) dir metadata for applying at the end of the extraction
            self.directories[tarinfo.name] = tarinfo
            target_path.mkdir(parents=True, exist_ok=True)
            return

        # lexists() does not follow symlinks, so an earlier entry that is a
        # dangling symlink is also detected and replaced.
        if os.path.lexists(target_path):
            self.record_problem(
                tarinfo,
                "Duplicate tar entry.",
                "Removed older version.",
            )
            target_path.unlink()

        self.tarfile.extract(tarinfo, extract_root)

    def fix_directories(self, extract_root):
        """Complete directory extraction.

        When extracting directories, setting metadata was intentionally skipped,
        so that entries under the directory can be extracted, even if the directory
        is write protected.
        """
        # need to set the permissions from leafs to root
        directories = sorted(
            self.directories.values(), key=lambda d: d.name, reverse=True
        )

        # copied from tarfile.extractall(), it is somewhat ugly, as uses private helpers!
        for tarinfo in directories:
            dirpath = str(extract_root / tarinfo.name)
            try:
                self.tarfile.chown(tarinfo, dirpath, numeric_owner=True)
                self.tarfile.utime(tarinfo, dirpath)
                self.tarfile.chmod(tarinfo, dirpath)
            except tarfile.ExtractError as e:
                self.record_problem(tarinfo, str(e), "Ignored.")

    def record_problem(self, tarinfo, problem, resolution):
        """Log *problem* and *resolution* for a member and append them to ``self.reports``."""
        logger.warning(f"{problem} {resolution}", path=tarinfo.name)  # noqa: G004
        self.reports.append(
            ExtractionProblem(
                path=tarinfo.name,
                problem=problem,
                resolution=resolution,
            )
        )

248 )