Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pip/_internal/utils/unpacking.py: 19%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

183 statements  

1"""Utilities related to archives.""" 

2 

3from __future__ import annotations 

4 

5import logging 

6import os 

7import shutil 

8import stat 

9import sys 

10import tarfile 

11import zipfile 

12from collections.abc import Iterable 

13from zipfile import ZipInfo 

14 

15from pip._internal.exceptions import InstallationError 

16from pip._internal.utils.filetypes import ( 

17 BZ2_EXTENSIONS, 

18 TAR_EXTENSIONS, 

19 XZ_EXTENSIONS, 

20 ZIP_EXTENSIONS, 

21) 

22from pip._internal.utils.misc import ensure_dir 

23 

logger = logging.getLogger(__name__)


# Archive file extensions treated as supported.  Zip and tar extensions are
# always included; the bz2 and xz extension groups are appended below only
# when the corresponding compression module can be imported.
SUPPORTED_EXTENSIONS = ZIP_EXTENSIONS + TAR_EXTENSIONS

try:
    # Imported purely as an availability probe; the module is not used here.
    import bz2  # noqa

    SUPPORTED_EXTENSIONS += BZ2_EXTENSIONS
except ImportError:
    logger.debug("bz2 module is not available")

try:
    # Only for Python 3.3+
    # Like bz2 above, this import is only an availability probe.
    import lzma  # noqa

    SUPPORTED_EXTENSIONS += XZ_EXTENSIONS
except ImportError:
    logger.debug("lzma module is not available")

43 

44 

def current_umask() -> int:
    """Return the process umask.

    os.umask() can only report the old mask while installing a new one, so
    the mask is briefly set to 0 and then immediately put back.
    """
    previous = os.umask(0)
    os.umask(previous)
    return previous

50 

51 

def split_leading_dir(path: str) -> list[str]:
    """Split `path` into its first component and the remainder.

    Leading "/" and "\\" characters are discarded first, in any order or
    mixture, so that "\\/foo/bar" is treated the same as "/\\foo/bar".  The
    path is then cut at whichever separator ("/" or "\\") occurs first.

    Returns a two-element list ``[first, rest]``; ``rest`` is "" when the
    path contains no separator.
    """
    # Strip both separator kinds in one pass; chaining two lstrip() calls
    # would leave a "/" behind when the path starts with "\\/".
    path = path.lstrip("/\\")
    separator_positions = [
        index for index in (path.find("/"), path.find("\\")) if index != -1
    ]
    if not separator_positions:
        return [path, ""]
    cut = min(separator_positions)
    return [path[:cut], path[cut + 1 :]]

62 

63 

def has_leading_dir(paths: Iterable[str]) -> bool:
    """Tell whether every path starts with the same leading directory name
    (i.e., the whole archive lives inside a single top-level directory).

    A path whose leading component is empty makes the answer False.
    """
    expected = None
    for entry in paths:
        head = split_leading_dir(entry)[0]
        if not head:
            return False
        if expected is None:
            # First path seen: it defines the prefix all others must share.
            expected = head
            continue
        if head != expected:
            return False
    return True

77 

78 

def is_within_directory(directory: str, target: str) -> bool:
    """
    Return true if the absolute path of target is within the directory

    Both arguments are made absolute (and normalized) with os.path.abspath
    before being compared.  Paths that share no common root at all are
    reported as not being within the directory.
    """
    abs_directory = os.path.abspath(directory)
    abs_target = os.path.abspath(target)

    try:
        prefix = os.path.commonpath([abs_directory, abs_target])
    except ValueError:
        # commonpath() raises when the paths cannot be compared, e.g. when
        # they are on different drives on Windows.  Such a target is by
        # definition not inside `directory`.
        return False
    return prefix == abs_directory

88 

89 

def _get_default_mode_plus_executable() -> int:
    """Return 0o777 filtered by the current umask, with all execute bits set."""
    umask_adjusted = 0o777 & ~current_umask()
    return umask_adjusted | 0o111

92 

93 

def set_extracted_file_to_default_mode_plus_executable(path: str) -> None:
    """
    Give the file at path the default mode plus execute permission for
    user/group/world.
    (chmod +x) is no-op on windows per python docs
    """
    executable_mode = _get_default_mode_plus_executable()
    os.chmod(path, executable_mode)

100 

101 

def zip_item_is_executable(info: ZipInfo) -> bool:
    """Tell whether a zip member is a regular file with any execute bit set.

    The mode is read from the upper 16 bits of the member's external
    attributes; a mode of zero yields False.
    """
    unix_mode = info.external_attr >> 16
    if not unix_mode:
        return False
    if not stat.S_ISREG(unix_mode):
        return False
    # Any of the user/group/world execute bits counts.
    return (unix_mode & 0o111) != 0

107 

108 

def unzip_file(filename: str, location: str, flatten: bool = True) -> None:
    """
    Unzip the file (with path `filename`) to the destination `location`.  All
    files are written based on system defaults and umask (i.e. permissions are
    not preserved), except that regular file members with any execute
    permissions (user, group, or world) have "chmod +x" applied after being
    written. Note that for windows, any execute changes using os.chmod are
    no-ops per the python docs.

    :param filename: path of the zip archive to read.
    :param location: directory to extract into; it is created if needed.
    :param flatten: when true and every member shares one leading directory,
        that directory is stripped from the extracted paths.
    :raises InstallationError: if a member would be written outside
        `location`.
    """
    ensure_dir(location)
    # Context managers close both the raw file and the ZipFile on every exit
    # path, including when the archive turns out to be invalid.
    with open(filename, "rb") as zipfp, zipfile.ZipFile(
        zipfp, allowZip64=True
    ) as archive:
        leading = has_leading_dir(archive.namelist()) and flatten
        for info in archive.infolist():
            name = info.filename
            fn = split_leading_dir(name)[1] if leading else name
            fn = os.path.join(location, fn)
            if not is_within_directory(location, fn):
                message = (
                    "The zip file ({}) has a file ({}) trying to install "
                    "outside target directory ({})"
                )
                raise InstallationError(message.format(filename, fn, location))
            if fn.endswith(("/", "\\")):
                # A directory
                ensure_dir(fn)
                continue
            ensure_dir(os.path.dirname(fn))
            # Don't use read() to avoid allocating an arbitrarily large
            # chunk of memory for the file's content
            with archive.open(name) as fp, open(fn, "wb") as destfp:
                shutil.copyfileobj(fp, destfp)
            if zip_item_is_executable(info):
                set_extracted_file_to_default_mode_plus_executable(fn)

153 

154 

def untar_file(filename: str, location: str) -> None:
    """
    Extract the tar archive at path `filename` into the directory `location`.
    Stored permissions are not preserved: members are written based on system
    defaults and umask, except that regular file members with any execute
    permissions (user, group, or world) get "chmod +x" applied on top of the
    default. Note that for windows, any execute changes using os.chmod are
    no-ops per the python docs.
    """
    ensure_dir(location)

    # Pick the tarfile open mode from the (case-insensitive) file extension.
    lowered = filename.lower()
    if lowered.endswith((".gz", ".tgz")):
        mode = "r:gz"
    elif lowered.endswith(BZ2_EXTENSIONS):
        mode = "r:bz2"
    elif lowered.endswith(XZ_EXTENSIONS):
        mode = "r:xz"
    elif lowered.endswith(".tar"):
        mode = "r"
    else:
        logger.warning(
            "Cannot determine compression type for file %s",
            filename,
        )
        mode = "r:*"

    tar = tarfile.open(filename, mode, encoding="utf-8")  # type: ignore
    try:
        leading = has_leading_dir(tar.getnames())

        # PEP 706 added `tarfile.data_filter`, and made some other changes to
        # Python's tarfile module (see below). The features were backported to
        # security releases.
        data_filter = getattr(tarfile, "data_filter", None)
        if data_filter is None:
            _untar_without_filter(filename, location, tar, leading)
            return

        executable_mode = _get_default_mode_plus_executable()

        # The tarfile filter in specific Python versions raises
        # LinkOutsideDestinationError on valid input
        # (https://github.com/python/cpython/issues/107845)
        filter_rejects_valid_links = sys.version_info[:3] in {
            (3, 9, 17),
            (3, 10, 12),
            (3, 11, 4),
        }

        if leading:
            # Strip the leading directory from all files in the archive,
            # including hardlink targets (which are relative to the
            # unpack location).
            for entry in tar.getmembers():
                entry_lead, entry_rest = split_leading_dir(entry.name)
                entry.name = entry_rest
                if not entry.islnk():
                    continue
                link_lead, link_rest = split_leading_dir(entry.linkname)
                if link_lead == entry_lead:
                    entry.linkname = link_rest

        def pip_filter(member: tarfile.TarInfo, path: str) -> tarfile.TarInfo:
            # Remember the archived mode: the filter result gets its mode
            # overwritten below.
            archived_mode = member.mode
            try:
                try:
                    filtered = data_filter(member, location)
                except tarfile.LinkOutsideDestinationError:
                    if not filter_rejects_valid_links:
                        raise
                    # Ignore the error on the affected versions, but do use
                    # the more lax `tar_filter`
                    filtered = tarfile.tar_filter(member, location)
            except tarfile.TarError as exc:
                # Filter error messages mention the member name.
                # No need to add it here.
                raise InstallationError(
                    "Invalid member in the tar file {}: {}".format(filename, exc)
                )
            if filtered.isfile() and archived_mode & 0o111:
                filtered.mode = executable_mode
            else:
                # See PEP 706 note above.
                # The PEP changed this from `int` to `Optional[int]`,
                # where None means "use the default". Mypy doesn't
                # know this yet.
                filtered.mode = None  # type: ignore [assignment]
            return filtered

        tar.extractall(location, filter=pip_filter)

    finally:
        tar.close()

249 

250 

def is_symlink_target_in_tar(tar: tarfile.TarFile, tarinfo: tarfile.TarInfo) -> bool:
    """Tell whether the member a symbolic link points to exists in the archive.

    The link target is resolved relative to the directory of the link member
    itself and normalized before it is looked up in `tar`.
    """
    link_dir = os.path.dirname(tarinfo.name)
    resolved = os.path.normpath(os.path.join(link_dir, tarinfo.linkname))
    # Look the target up using forward slashes only.
    resolved = resolved.replace("\\", "/")

    try:
        tar.getmember(resolved)
    except KeyError:
        return False
    return True

263 

264 

def _untar_without_filter(
    filename: str,
    location: str,
    tar: tarfile.TarFile,
    leading: bool,
) -> None:
    """Fallback for Python without tarfile.data_filter

    Extracts every member of the already-open `tar` into `location`.

    :param filename: path of the archive; used only in log and error
        messages here.
    :param location: directory the members are written under.
    :param tar: the open archive to read members from.
    :param leading: when true, the first path component of every member name
        is dropped before computing the destination path.
    :raises InstallationError: if a member's destination lies outside
        `location`, or if a symlink member points at something that is not
        in the archive.
    """
    # NOTE: This function can be removed once pip requires CPython ≥ 3.12.
    # PEP 706 added tarfile.data_filter, made tarfile extraction operations more secure.
    # This feature is fully supported from CPython 3.12 onward.
    for member in tar.getmembers():
        fn = member.name
        if leading:
            fn = split_leading_dir(fn)[1]
        path = os.path.join(location, fn)
        # Refuse members whose destination escapes the unpack directory.
        if not is_within_directory(location, path):
            message = (
                "The tar file ({}) has a file ({}) trying to install "
                "outside target directory ({})"
            )
            raise InstallationError(message.format(filename, path, location))
        if member.isdir():
            ensure_dir(path)
        elif member.issym():
            # Only symlinks whose target is itself an archive member are
            # accepted.
            if not is_symlink_target_in_tar(tar, member):
                message = (
                    "The tar file ({}) has a file ({}) trying to install "
                    "outside target directory ({})"
                )
                raise InstallationError(
                    message.format(filename, member.name, member.linkname)
                )
            try:
                tar._extract_member(member, path)
            except Exception as exc:
                # Some corrupt tar files seem to produce this
                # (specifically bad symlinks)
                # Best effort: log the failure and move on to the next member.
                logger.warning(
                    "In the tar file %s the member %s is invalid: %s",
                    filename,
                    member.name,
                    exc,
                )
                continue
        else:
            # Every member that is neither a directory nor a symlink is
            # copied out as file content.
            try:
                fp = tar.extractfile(member)
            except (KeyError, AttributeError) as exc:
                # Some corrupt tar files seem to produce this
                # (specifically bad symlinks)
                logger.warning(
                    "In the tar file %s the member %s is invalid: %s",
                    filename,
                    member.name,
                    exc,
                )
                continue
            ensure_dir(os.path.dirname(path))
            assert fp is not None
            with open(path, "wb") as destfp:
                shutil.copyfileobj(fp, destfp)
            fp.close()
            # Update the timestamp (useful for cython compiled files)
            tar.utime(member, path)
            # member have any execute permissions for user/group/world?
            if member.mode & 0o111:
                set_extracted_file_to_default_mode_plus_executable(path)

332 

333 

def unpack_file(
    filename: str,
    location: str,
    content_type: str | None = None,
) -> None:
    """Unpack the archive at `filename` into the directory `location`.

    Zip handling is tried first, then tar; each is selected by the given
    `content_type`, the file extension, or the matching stdlib format probe.
    Zip archives whose resolved name ends in ".whl" are extracted without
    flattening a shared leading directory.

    Raises InstallationError when the archive format cannot be determined.
    """
    filename = os.path.realpath(filename)
    lowered = filename.lower()

    # The probes are kept in `or` chains so the later, more expensive checks
    # only run when the earlier ones did not match.
    looks_like_zip = (
        content_type == "application/zip"
        or lowered.endswith(ZIP_EXTENSIONS)
        or zipfile.is_zipfile(filename)
    )
    if looks_like_zip:
        keep_layout = filename.endswith(".whl")
        unzip_file(filename, location, flatten=not keep_layout)
        return

    looks_like_tar = (
        content_type == "application/x-gzip"
        or tarfile.is_tarfile(filename)
        or lowered.endswith(TAR_EXTENSIONS + BZ2_EXTENSIONS + XZ_EXTENSIONS)
    )
    if looks_like_tar:
        untar_file(filename, location)
        return

    # FIXME: handle?
    # FIXME: magic signatures?
    logger.critical(
        "Cannot unpack file %s (downloaded from %s, content-type: %s); "
        "cannot detect archive format",
        filename,
        location,
        content_type,
    )
    raise InstallationError(f"Cannot determine archive format of {location}")