"""Utilities related to archives."""
2
3from __future__ import annotations
4
5import logging
6import os
7import shutil
8import stat
9import sys
10import tarfile
11import zipfile
12from collections.abc import Iterable
13from zipfile import ZipInfo
14
15from pip._internal.exceptions import InstallationError
16from pip._internal.utils.filetypes import (
17 BZ2_EXTENSIONS,
18 TAR_EXTENSIONS,
19 XZ_EXTENSIONS,
20 ZIP_EXTENSIONS,
21)
22from pip._internal.utils.misc import ensure_dir
23
24logger = logging.getLogger(__name__)
25
26
# Archive filename extensions treated as supported. Zip and tar extensions
# are always included; the bz2 and xz extensions are appended below only when
# the corresponding compression module can be imported.
SUPPORTED_EXTENSIONS = ZIP_EXTENSIONS + TAR_EXTENSIONS

try:
    # Imported only to probe availability; the module itself is not used here.
    import bz2  # noqa

    SUPPORTED_EXTENSIONS += BZ2_EXTENSIONS
except ImportError:
    logger.debug("bz2 module is not available")

try:
    # Imported only to probe availability (xz support); a missing module is
    # tolerated and merely logged at debug level.
    import lzma  # noqa

    SUPPORTED_EXTENSIONS += XZ_EXTENSIONS
except ImportError:
    logger.debug("lzma module is not available")
43
44
def current_umask() -> int:
    """Return the process umask.

    ``os.umask`` only reports the mask as a side effect of replacing it, so
    the mask is briefly set to 0 and then put back.
    """
    previous = os.umask(0)  # setting a new mask hands back the old one
    os.umask(previous)  # restore what was there before
    return previous
50
51
def split_leading_dir(path: str) -> list[str]:
    """Split ``path`` into ``[first_component, remainder]``.

    Leading "/" characters and then leading "\\" characters are removed
    first. The split happens at whichever of "/" or "\\" occurs earliest;
    when neither is present the remainder is the empty string.
    """
    trimmed = path.lstrip("/").lstrip("\\")
    slash_at = trimmed.find("/")
    backslash_at = trimmed.find("\\")

    # A forward slash wins when it is the only separator or the earlier one.
    if slash_at != -1 and (backslash_at == -1 or slash_at < backslash_at):
        return trimmed.split("/", 1)
    if backslash_at != -1:
        return trimmed.split("\\", 1)
    return [trimmed, ""]
62
63
def has_leading_dir(paths: Iterable[str]) -> bool:
    """Tell whether every path starts with the same first component.

    This is the case when everything in an archive lives in one
    subdirectory. A path whose first component is empty makes the answer
    False; an empty ``paths`` yields True.
    """
    shared: str | None = None
    for entry in paths:
        head = split_leading_dir(entry)[0]
        if not head:
            return False
        if shared is None:
            # First path seen: it defines the prefix the rest must match.
            shared = head
            continue
        if head != shared:
            return False
    return True
77
78
def is_within_directory(directory: str, target: str) -> bool:
    """
    Return true if the absolute path of target is within the directory

    Both paths are made absolute with ``os.path.abspath`` before they are
    compared (symlinks are not resolved). ``directory`` itself counts as
    being within ``directory``.

    ``os.path.commonpath`` raises ``ValueError`` when the paths have no
    common root (e.g. they are on different drives on Windows). Such a
    target cannot be inside ``directory``, so False is returned instead of
    letting the error escape.
    """
    abs_directory = os.path.abspath(directory)
    abs_target = os.path.abspath(target)

    try:
        prefix = os.path.commonpath([abs_directory, abs_target])
    except ValueError:
        # No shared root at all, hence definitely outside.
        return False
    return prefix == abs_directory
88
89
def _get_default_mode_plus_executable() -> int:
    """Return 0o777 filtered by the current umask, with all execute bits set."""
    allowed_by_umask = 0o777 & ~current_umask()
    return allowed_by_umask | 0o111
92
93
def set_extracted_file_to_default_mode_plus_executable(path: str) -> None:
    """
    Chmod the file at ``path`` to the umask-derived default mode with the
    execute bits for user/group/world added (i.e. "chmod +x").

    On Windows the execute change is a no-op per the python docs.
    """
    executable_mode = _get_default_mode_plus_executable()
    os.chmod(path, executable_mode)
100
101
def zip_item_is_executable(info: ZipInfo) -> bool:
    """Return True when the zip member is a regular file with any execute bit.

    The mode is read from the upper 16 bits of ``info.external_attr``; a
    zero mode is reported as not executable.
    """
    unix_mode = info.external_attr >> 16
    if not unix_mode:
        return False
    if not stat.S_ISREG(unix_mode):
        return False
    # Any of the user/group/world execute bits is enough.
    return bool(unix_mode & 0o111)
107
108
def unzip_file(filename: str, location: str, flatten: bool = True) -> None:
    """
    Unzip the file (with path `filename`) to the destination `location`.  All
    files are written based on system defaults and umask (i.e. permissions are
    not preserved), except that regular file members with any execute
    permissions (user, group, or world) have "chmod +x" applied after being
    written. Note that for windows, any execute changes using os.chmod are
    no-ops per the python docs.

    When `flatten` is true and every member shares one leading directory,
    that leading directory is stripped from the extracted paths.

    Raises InstallationError if a member would be written outside
    `location`.
    """
    ensure_dir(location)
    # Context managers close both the raw file and the ZipFile on every exit
    # path, including errors raised part-way through extraction.
    with open(filename, "rb") as zipfp:
        with zipfile.ZipFile(zipfp, allowZip64=True) as archive:
            leading = has_leading_dir(archive.namelist()) and flatten
            for info in archive.infolist():
                name = info.filename
                fn = name
                if leading:
                    fn = split_leading_dir(name)[1]
                fn = os.path.join(location, fn)
                if not is_within_directory(location, fn):
                    message = (
                        "The zip file ({}) has a file ({}) trying to install "
                        "outside target directory ({})"
                    )
                    raise InstallationError(message.format(filename, fn, location))
                if fn.endswith(("/", "\\")):
                    # A directory
                    ensure_dir(fn)
                    continue
                ensure_dir(os.path.dirname(fn))
                # Don't use read() to avoid allocating an arbitrarily large
                # chunk of memory for the file's content
                with archive.open(name) as fp, open(fn, "wb") as destfp:
                    shutil.copyfileobj(fp, destfp)
                if zip_item_is_executable(info):
                    set_extracted_file_to_default_mode_plus_executable(fn)
153
154
def untar_file(filename: str, location: str) -> None:
    """
    Untar the file (with path `filename`) to the destination `location`.
    All files are written based on system defaults and umask (i.e. permissions
    are not preserved), except that regular file members with any execute
    permissions (user, group, or world) have "chmod +x" applied on top of the
    default.  Note that for windows, any execute changes using os.chmod are
    no-ops per the python docs.

    If every member shares one leading directory, that directory is stripped
    from the extracted paths.

    When ``tarfile.data_filter`` exists, extraction goes through
    ``tar.extractall`` with a filter built on it; otherwise the work is
    delegated to ``_untar_without_filter``.

    Raises InstallationError when the filter rejects a member with a
    ``tarfile.TarError``.
    """
    ensure_dir(location)
    # Choose the tarfile open mode from the (case-insensitive) file extension.
    if filename.lower().endswith(".gz") or filename.lower().endswith(".tgz"):
        mode = "r:gz"
    elif filename.lower().endswith(BZ2_EXTENSIONS):
        mode = "r:bz2"
    elif filename.lower().endswith(XZ_EXTENSIONS):
        mode = "r:xz"
    elif filename.lower().endswith(".tar"):
        mode = "r"
    else:
        logger.warning(
            "Cannot determine compression type for file %s",
            filename,
        )
        # Unknown extension: let tarfile detect the compression itself.
        mode = "r:*"

    tar = tarfile.open(filename, mode, encoding="utf-8")  # type: ignore
    try:
        leading = has_leading_dir([member.name for member in tar.getmembers()])

        # PEP 706 added `tarfile.data_filter`, and made some other changes to
        # Python's tarfile module (see below). The features were backported to
        # security releases.
        try:
            data_filter = tarfile.data_filter
        except AttributeError:
            # No extraction filters available: use the manual fallback.
            _untar_without_filter(filename, location, tar, leading)
        else:
            # Computed once here and reused by the filter for every member.
            default_mode_plus_executable = _get_default_mode_plus_executable()

            if leading:
                # Strip the leading directory from all files in the archive,
                # including hardlink targets (which are relative to the
                # unpack location).
                for member in tar.getmembers():
                    name_lead, name_rest = split_leading_dir(member.name)
                    member.name = name_rest
                    if member.islnk():
                        lnk_lead, lnk_rest = split_leading_dir(member.linkname)
                        # Only rewrite link targets under the same leading
                        # directory as the member itself.
                        if lnk_lead == name_lead:
                            member.linkname = lnk_rest

            def pip_filter(member: tarfile.TarInfo, path: str) -> tarfile.TarInfo:
                # Extraction filter passed to `extractall` below. `path` is
                # not used; the enclosing `location` is handed to the
                # underlying filters instead.
                # Remember the archive's mode before the filter can change it.
                orig_mode = member.mode
                try:
                    try:
                        member = data_filter(member, location)
                    except tarfile.LinkOutsideDestinationError:
                        if sys.version_info[:3] in {
                            (3, 9, 17),
                            (3, 10, 12),
                            (3, 11, 4),
                        }:
                            # The tarfile filter in specific Python versions
                            # raises LinkOutsideDestinationError on valid input
                            # (https://github.com/python/cpython/issues/107845)
                            # Ignore the error there, but do use the
                            # more lax `tar_filter`
                            member = tarfile.tar_filter(member, location)
                        else:
                            raise
                except tarfile.TarError as exc:
                    message = "Invalid member in the tar file {}: {}"
                    # Filter error messages mention the member name.
                    # No need to add it here.
                    raise InstallationError(
                        message.format(
                            filename,
                            exc,
                        )
                    )
                # Regular files that had any execute bit in the archive get
                # the default mode plus execute; everything else gets the
                # default mode.
                if member.isfile() and orig_mode & 0o111:
                    member.mode = default_mode_plus_executable
                else:
                    # See PEP 706 note above.
                    # The PEP changed this from `int` to `Optional[int]`,
                    # where None means "use the default". Mypy doesn't
                    # know this yet.
                    member.mode = None  # type: ignore [assignment]
                return member

            tar.extractall(location, filter=pip_filter)

    finally:
        # Always release the archive, whichever extraction path ran.
        tar.close()
249
250
def is_symlink_target_in_tar(tar: tarfile.TarFile, tarinfo: tarfile.TarInfo) -> bool:
    """Check if the file pointed to by the symbolic link is in the tar archive"""
    # The link target is resolved relative to the directory of the link
    # itself, normalized, and converted to "/" separators before the lookup.
    link_parent = os.path.dirname(tarinfo.name)
    resolved = os.path.normpath(os.path.join(link_parent, tarinfo.linkname))
    member_name = resolved.replace("\\", "/")

    try:
        tar.getmember(member_name)
    except KeyError:
        return False
    return True
263
264
def _untar_without_filter(
    filename: str,
    location: str,
    tar: tarfile.TarFile,
    leading: bool,
) -> None:
    """Fallback for Python without tarfile.data_filter

    Extracts every member of ``tar`` under ``location``; when ``leading`` is
    true the first path component of each member name is stripped.
    ``filename`` is only used in messages.

    Raises InstallationError if a member would land outside ``location`` or
    if a symlink's target is not itself a member of the archive. Members
    that fail to extract are logged as warnings and skipped.
    """
    # NOTE: This function can be removed once pip requires CPython ≥ 3.12.
    # PEP 706 added tarfile.data_filter, made tarfile extraction operations more secure.
    # This feature is fully supported from CPython 3.12 onward.
    for member in tar.getmembers():
        fn = member.name
        if leading:
            fn = split_leading_dir(fn)[1]
        path = os.path.join(location, fn)
        if not is_within_directory(location, path):
            message = (
                "The tar file ({}) has a file ({}) trying to install "
                "outside target directory ({})"
            )
            raise InstallationError(message.format(filename, path, location))
        if member.isdir():
            ensure_dir(path)
        elif member.issym():
            if not is_symlink_target_in_tar(tar, member):
                message = (
                    "The tar file ({}) has a file ({}) trying to install "
                    "outside target directory ({})"
                )
                raise InstallationError(
                    message.format(filename, member.name, member.linkname)
                )
            try:
                tar._extract_member(member, path)
            except Exception as exc:
                # Some corrupt tar files seem to produce this
                # (specifically bad symlinks)
                logger.warning(
                    "In the tar file %s the member %s is invalid: %s",
                    filename,
                    member.name,
                    exc,
                )
                continue
        else:
            try:
                fp = tar.extractfile(member)
            except (KeyError, AttributeError) as exc:
                # Some corrupt tar files seem to produce this
                # (specifically bad symlinks)
                logger.warning(
                    "In the tar file %s the member %s is invalid: %s",
                    filename,
                    member.name,
                    exc,
                )
                continue
            ensure_dir(os.path.dirname(path))
            # NOTE(review): extractfile() is documented to return None for
            # members that are neither regular files nor links; that case is
            # still only guarded by this assert.
            assert fp is not None
            # Close the member stream even when opening the destination or
            # copying fails, so the file object is not leaked on error.
            with fp, open(path, "wb") as destfp:
                shutil.copyfileobj(fp, destfp)
            # Update the timestamp (useful for cython compiled files)
            tar.utime(member, path)
            # member have any execute permissions for user/group/world?
            if member.mode & 0o111:
                set_extracted_file_to_default_mode_plus_executable(path)
332
333
def unpack_file(
    filename: str,
    location: str,
    content_type: str | None = None,
) -> None:
    """
    Unpack the archive at `filename` into the directory `location`.

    The archive is treated as a zip when `content_type` is
    "application/zip", the name ends with a zip extension, or
    ``zipfile.is_zipfile`` accepts it. Otherwise it is treated as a tar when
    `content_type` is "application/x-gzip", ``tarfile.is_tarfile`` accepts
    it, or the name ends with a tar/bz2/xz extension. Extension checks are
    case-insensitive. Zip archives are flattened unless the name ends with
    ".whl".

    Raises InstallationError when the archive format cannot be determined.
    """
    filename = os.path.realpath(filename)
    # Lowercase once so every extension check, including the ".whl" test
    # that decides flattening, is consistently case-insensitive.
    lowered = filename.lower()
    if (
        content_type == "application/zip"
        or lowered.endswith(ZIP_EXTENSIONS)
        or zipfile.is_zipfile(filename)
    ):
        unzip_file(filename, location, flatten=not lowered.endswith(".whl"))
    elif (
        content_type == "application/x-gzip"
        or tarfile.is_tarfile(filename)
        or lowered.endswith(TAR_EXTENSIONS + BZ2_EXTENSIONS + XZ_EXTENSIONS)
    ):
        untar_file(filename, location)
    else:
        # FIXME: handle?
        # FIXME: magic signatures?
        # NOTE(review): both messages below report `location` (the
        # destination) where the wording suggests a source; left unchanged
        # in case callers match on the text.
        logger.critical(
            "Cannot unpack file %s (downloaded from %s, content-type: %s); "
            "cannot detect archive format",
            filename,
            location,
            content_type,
        )
        raise InstallationError(f"Cannot determine archive format of {location}")