1"""Support for installing and building the "wheel" binary package format."""
2
3from __future__ import annotations
4
5import collections
6import compileall
7import contextlib
8import csv
9import importlib
10import logging
11import os.path
12import re
13import shutil
14import sys
15import textwrap
16import warnings
17from base64 import urlsafe_b64encode
18from collections.abc import Callable, Generator, Iterable, Iterator, Sequence
19from email.message import Message
20from itertools import chain, filterfalse, starmap
21from pathlib import Path
22from typing import (
23 IO,
24 Any,
25 BinaryIO,
26 NewType,
27 Protocol,
28 cast,
29)
30from zipfile import ZipFile, ZipInfo
31
32from pip._vendor.distlib.scripts import ScriptMaker
33from pip._vendor.distlib.util import get_export_entry
34from pip._vendor.packaging.utils import canonicalize_name
35
36from pip._internal.exceptions import InstallationError
37from pip._internal.locations import get_major_minor_version
38from pip._internal.metadata import (
39 BaseDistribution,
40 FilesystemWheel,
41 get_wheel_distribution,
42)
43from pip._internal.models.direct_url import DIRECT_URL_METADATA_NAME, DirectUrl
44from pip._internal.models.scheme import SCHEME_KEYS, Scheme
45from pip._internal.utils.filesystem import adjacent_tmp_file, replace
46from pip._internal.utils.misc import StreamWrapper, ensure_dir, hash_file, partition
47from pip._internal.utils.unpacking import (
48 current_umask,
49 is_within_directory,
50 set_extracted_file_to_default_mode_plus_executable,
51 zip_item_is_executable,
52)
53from pip._internal.utils.wheel import parse_wheel
54
55
class File(Protocol):
    """Structural interface shared by the file objects installed from a wheel."""

    # Path of the file inside the wheel archive, as listed in RECORD.
    src_record_path: RecordPath
    # Filesystem location the file is written to.
    dest_path: str
    # Whether the file content was altered while it was being saved.
    changed: bool

    def save(self) -> None:
        """Write the file to ``dest_path``."""
        ...
63
64
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# A path as written in a RECORD file ("/"-separated, see _fs_to_record_path).
# Declared as a distinct type so it is not confused with filesystem paths.
RecordPath = NewType("RecordPath", str)
# One RECORD row: (path, hash, size). The size may be an int or a string.
InstalledCSVRow = tuple[RecordPath, str, int | str]
69
70
def rehash(path: str, blocksize: int = 1 << 20) -> tuple[str, str]:
    """Hash the file at *path* and return its RECORD ``(digest, length)`` pair.

    The digest is the URL-safe base64 encoding of the hash with the trailing
    ``=`` padding removed and a ``sha256=`` prefix; the length is returned as a
    string. The file is read in chunks of *blocksize* bytes.
    """
    hasher, size = hash_file(path, blocksize)
    encoded = urlsafe_b64encode(hasher.digest()).decode("latin1")
    return (f"sha256={encoded.rstrip('=')}", str(size))
76
77
def csv_io_kwargs(mode: str) -> dict[str, Any]:
    """Build the ``open()`` keyword arguments for a CSV file.

    The file is opened in *mode* as UTF-8 with newline translation disabled,
    which is what the ``csv`` module expects.
    """
    return dict(mode=mode, newline="", encoding="utf-8")
83
84
def fix_script(path: str) -> bool:
    """Rewrite a ``#!python`` shebang to point at the running interpreter.

    The first line of the file is replaced by ``#!`` followed by
    ``sys.executable``; the rest of the file is kept untouched.

    Return True if the file was rewritten, False if its first line does not
    start with ``#!python``.
    """
    # XXX RECORD hashes will need to be updated
    assert os.path.isfile(path)

    with open(path, "rb") as source:
        shebang = source.readline()
        if not shebang.startswith(b"#!python"):
            return False
        remainder = source.read()

    interpreter = sys.executable.encode(sys.getfilesystemencoding())
    new_shebang = b"".join((b"#!", interpreter, os.linesep.encode("ascii")))
    with open(path, "wb") as target:
        target.write(new_shebang)
        target.write(remainder)
    return True
103
104
def wheel_root_is_purelib(metadata: Message) -> bool:
    """Return True if the metadata's ``Root-Is-Purelib`` value is "true".

    The comparison is case-insensitive; a missing header counts as False.
    """
    flag = metadata.get("Root-Is-Purelib", "")
    return flag.lower() == "true"
107
108
def get_entrypoints(dist: BaseDistribution) -> tuple[dict[str, str], dict[str, str]]:
    """Collect the script entry points declared by *dist*.

    Returns ``(console_scripts, gui_scripts)``, each mapping an entry point
    name to its value. Entry points in any other group are ignored.
    """
    by_group: dict[str, dict[str, str]] = {"console_scripts": {}, "gui_scripts": {}}
    for ep in dist.iter_entry_points():
        bucket = by_group.get(ep.group)
        if bucket is not None:
            bucket[ep.name] = ep.value
    return by_group["console_scripts"], by_group["gui_scripts"]
118
119
def message_about_scripts_not_on_PATH(scripts: Sequence[str]) -> str | None:
    """Build a warning about scripts installed outside of PATH.

    :param scripts: Filesystem paths of the installed scripts.
    :return: A multi-line warning, or None when there are no scripts or all
        of them sit in a directory that is on PATH (or next to
        ``sys.executable``).
    """
    if not scripts:
        return None

    path_entries = os.environ.get("PATH", "").split(os.pathsep)

    # Bucket the script names by the (resolved) directory they live in.
    by_directory: dict[Path, set[str]] = collections.defaultdict(set)
    for script in scripts:
        script_path = Path(script)
        by_directory[script_path.parent.resolve()].add(script_path.name)

    # Directories on PATH never trigger a warning, and neither does the
    # directory of sys.executable: that covers running a venv's interpreter
    # without activating the venv.
    quiet_dirs = [Path(entry).resolve() for entry in path_entries]
    quiet_dirs.append(Path(sys.executable).parent.resolve())

    offenders: dict[Path, set[str]] = {
        directory: names
        for directory, names in by_directory.items()
        if directory not in quiet_dirs
    }
    if not offenders:
        return None

    # One line per offending directory.
    lines = []
    for directory, names in offenders.items():
        *leading, final = sorted(names)
        if leading:
            subject = "scripts {} are".format(", ".join(leading) + " and " + final)
        else:
            subject = f"script {final} is"
        lines.append(
            f"The {subject} installed in '{directory}' which is not on PATH."
        )

    advice = (
        "Consider adding {} to PATH or, if you prefer "
        "to suppress this warning, use --no-warn-script-location."
    )
    target = "this directory" if len(lines) == 1 else "these directories"
    lines.append(advice.format(target))

    # Mention PATH entries that start with ~.
    if any(entry[0] == "~" for entry in path_entries if entry):
        lines.append(
            "NOTE: The current PATH contains path(s) starting with `~`, "
            "which may not be expanded by all applications."
        )

    return "\n".join(lines)
188
189
190def _normalized_outrows(
191 outrows: Iterable[InstalledCSVRow],
192) -> list[tuple[str, str, str]]:
193 """Normalize the given rows of a RECORD file.
194
195 Items in each row are converted into str. Rows are then sorted to make
196 the value more predictable for tests.
197
198 Each row is a 3-tuple (path, hash, size) and corresponds to a record of
199 a RECORD file (see PEP 376 and PEP 427 for details). For the rows
200 passed to this function, the size can be an integer as an int or string,
201 or the empty string.
202 """
203 # Normally, there should only be one row per path, in which case the
204 # second and third elements don't come into play when sorting.
205 # However, in cases in the wild where a path might happen to occur twice,
206 # we don't want the sort operation to trigger an error (but still want
207 # determinism). Since the third element can be an int or string, we
208 # coerce each element to a string to avoid a TypeError in this case.
209 # For additional background, see--
210 # https://github.com/pypa/pip/issues/5868
211 return sorted(
212 (record_path, hash_, str(size)) for record_path, hash_, size in outrows
213 )
214
215
216def _record_to_fs_path(record_path: RecordPath, lib_dir: str) -> str:
217 return os.path.join(lib_dir, record_path)
218
219
220def _fs_to_record_path(path: str, lib_dir: str) -> RecordPath:
221 # On Windows, do not handle relative paths if they belong to different
222 # logical disks
223 if os.path.splitdrive(path)[0].lower() == os.path.splitdrive(lib_dir)[0].lower():
224 path = os.path.relpath(path, lib_dir)
225
226 path = path.replace(os.path.sep, "/")
227 return cast("RecordPath", path)
228
229
def get_csv_rows_for_installed(
    old_csv_rows: list[list[str]],
    installed: dict[RecordPath, RecordPath],
    changed: set[RecordPath],
    generated: list[str],
    lib_dir: str,
) -> list[InstalledCSVRow]:
    """Compute the RECORD rows describing the installed files.

    :param old_csv_rows: Rows parsed from the RECORD file of the archive.
        Empty rows (which ``csv.reader`` yields for blank lines) are skipped.
    :param installed: A map from archive RECORD path to installation RECORD
        path. Entries matched by a row of ``old_csv_rows`` are popped from
        this dict, so the caller's mapping is modified.
    :param changed: Installation RECORD paths whose digest and length must be
        recomputed from the file on disk instead of copied from the old row.
    :param generated: Filesystem paths of files created during installation;
        each one is hashed and given a row.
    :param lib_dir: Directory that RECORD paths are relative to.
    :return: One row per non-empty old row, then one per generated file, then
        one (with empty hash and size) per installed file that had no row in
        ``old_csv_rows``.
    """
    installed_rows: list[InstalledCSVRow] = []
    for row in old_csv_rows:
        if not row:
            # A blank line in RECORD carries no path; indexing it would raise
            # IndexError.
            continue
        if len(row) > 3:
            logger.warning("RECORD line has more than three elements: %s", row)
        old_record_path = cast("RecordPath", row[0])
        # Paths that were not installed from the archive keep their old name.
        new_record_path = installed.pop(old_record_path, old_record_path)
        if new_record_path in changed:
            digest, length = rehash(_record_to_fs_path(new_record_path, lib_dir))
        else:
            # Tolerate short rows: missing hash/size columns become "".
            digest = row[1] if len(row) > 1 else ""
            length = row[2] if len(row) > 2 else ""
        installed_rows.append((new_record_path, digest, length))
    for f in generated:
        path = _fs_to_record_path(f, lib_dir)
        digest, length = rehash(f)
        installed_rows.append((path, digest, length))
    # Whatever is left in ``installed`` had no row in the original RECORD.
    return installed_rows + [
        (installed_record_path, "", "") for installed_record_path in installed.values()
    ]
260
261
def get_console_script_specs(console: dict[str, str]) -> list[str]:
    """Turn console entry points into ``"name = callable"`` script specs.

    :param console: Mapping from entry point name to callable. It is not
        modified.
    :return: The specs to generate, with the ``pip`` and ``easy_install``
        entry points replaced by wrappers versioned for the running Python.
    """
    # Entries are removed below, so work on a private copy.
    remaining = dict(console)
    specs = []

    # pip and setuptools get special treatment.
    #
    # Both projects used setup.py code to create "versioned" entry points
    # (pip2.7 on Python 2.7, pip3.3 on Python 3.3, ...). Those names are baked
    # into the wheel metadata at build time, so installing the wheel with a
    # *different* Python yields the wrong names. The proper fix would be
    # metadata able to describe versioned entry points; until then such
    # projects either ship wrong versioned entry points or cannot ship
    # "universal" wheels (they need one wheel per Python version).
    #
    # setuptools and pip are bundled with ensurepip and virtualenv, so they
    # must be universal wheels. As a workaround the versioned entry points
    # from the wheel are dropped and the correct ones are generated here.
    #
    # On top of that, to support ensurepip, the ``ENSUREPIP_OPTIONS``
    # environment variable selects which versioned scripts are installed:
    #
    # ENSUREPIP_OPTIONS=altinstall
    #   - Only pipX.Y and easy_install-X.Y are generated.
    # ENSUREPIP_OPTIONS=install
    #   - pipX.Y, pipX and easy_install-X.Y are generated. Strictly speaking
    #     this applies whenever the variable is set to anything other than
    #     altinstall.
    # DEFAULT (variable not set)
    #   - pip, pipX, pipX.Y, easy_install and easy_install-X.Y are generated.
    pip_target = remaining.pop("pip", None)
    if pip_target:
        ensurepip_mode = os.environ.get("ENSUREPIP_OPTIONS")
        if ensurepip_mode is None:
            specs.append("pip = " + pip_target)
        if ensurepip_mode != "altinstall":
            specs.append(f"pip{sys.version_info[0]} = {pip_target}")
        specs.append(f"pip{get_major_minor_version()} = {pip_target}")

        # Drop every other (versioned) pip entry point shipped in the wheel.
        stale_pip = [
            key for key in remaining if re.match(r"pip(\d+(\.\d+)?)?$", key)
        ]
        for key in stale_pip:
            remaining.pop(key)

    easy_install_target = remaining.pop("easy_install", None)
    if easy_install_target:
        if "ENSUREPIP_OPTIONS" not in os.environ:
            specs.append("easy_install = " + easy_install_target)
        specs.append(
            f"easy_install-{get_major_minor_version()} = {easy_install_target}"
        )

        # Drop every other (versioned) easy_install entry point.
        stale_easy_install = [
            key for key in remaining if re.match(r"easy_install(-\d+\.\d+)?$", key)
        ]
        for key in stale_easy_install:
            remaining.pop(key)

    # Everything left is generated exactly as declared in the wheel.
    specs.extend(f"{name} = {target}" for name, target in remaining.items())

    return specs
335
336
class ZipBackedFile:
    """A file whose content is extracted from an open wheel archive."""

    def __init__(
        self, src_record_path: RecordPath, dest_path: str, zip_file: ZipFile
    ) -> None:
        self._zip_file = zip_file
        self.src_record_path = src_record_path
        self.dest_path = dest_path
        self.changed = False

    def _getinfo(self) -> ZipInfo:
        """Return the archive member this file is extracted from."""
        return self._zip_file.getinfo(self.src_record_path)

    def save(self) -> None:
        """Extract the archive member to ``dest_path``."""
        destination = self.dest_path

        # Opening the output file for writing truncates an existing file
        # in place. That is usually harmless, but if pip has loaded that file
        # as a shared object (e.g. from pyopenssl through its vendored
        # urllib3) it is mmap'd, and calling a symbol in it after the
        # truncation segfaults. Unlinking first lets the new content be
        # written while the process keeps using the old copy.
        if os.path.exists(destination):
            os.unlink(destination)

        info = self._getinfo()

        # open() already creates the file, so decompression is skipped
        # entirely for zero-byte members.
        with open(destination, "wb") as out:
            if info.file_size > 0:
                chunk_size = min(info.file_size, 1024 * 1024)
                with self._zip_file.open(info) as member:
                    shutil.copyfileobj(member, out, chunk_size)

        if zip_item_is_executable(info):
            set_extracted_file_to_default_mode_plus_executable(destination)
373
374
class ScriptFile:
    """Wrap a ``File`` so that its shebang is fixed up after it is saved."""

    def __init__(self, file: File) -> None:
        self._file = file
        self.src_record_path = file.src_record_path
        self.dest_path = file.dest_path
        self.changed = False

    def save(self) -> None:
        """Save the wrapped file, then rewrite its ``#!python`` line if any.

        ``changed`` records whether the script was rewritten.
        """
        self._file.save()
        was_rewritten = fix_script(self.dest_path)
        self.changed = was_rewritten
385
386
class MissingCallableSuffix(InstallationError):
    """Raised for a script entry point that does not name a callable."""

    def __init__(self, entry_point: str) -> None:
        message = (
            f"Invalid script entry point: {entry_point} - A callable "
            "suffix is required. See https://packaging.python.org/"
            "specifications/entry-points/#use-for-scripts for more "
            "information."
        )
        super().__init__(message)
395
396
def _raise_for_invalid_entrypoint(specification: str, scripts_dir: str) -> None:
    """Reject an entry point specification that must not be turned into a script.

    :param specification: The entry point specification to check.
    :param scripts_dir: Directory the script would be generated in.
    :raises MissingCallableSuffix: if the entry point has no callable suffix.
    :raises InstallationError: if the script would not land strictly inside
        ``scripts_dir``.
    """
    entry = get_export_entry(specification)
    if entry is None:
        return

    if entry.suffix is None:
        raise MissingCallableSuffix(str(entry))

    # distlib joins the entry point name onto the scripts directory, so a
    # name holding path separators or ``..`` components can end up somewhere
    # else. Only a path strictly inside the scripts directory is accepted.
    script_path = os.path.join(scripts_dir, entry.name)
    is_scripts_dir_itself = os.path.abspath(script_path) == os.path.abspath(
        scripts_dir
    )
    if not is_scripts_dir_itself and is_within_directory(scripts_dir, script_path):
        return

    raise InstallationError(
        f"Invalid script entry point name {entry.name!r}: the script "
        f"would be installed outside the scripts directory ({scripts_dir})."
    )
415
416
class PipScriptMaker(ScriptMaker):
    """``ScriptMaker`` that validates each entry point before generating it."""

    # Override distlib's default script template with one that
    # doesn't import `re` module, allowing scripts to load faster.
    # The generated script strips a trailing ".exe" from sys.argv[0] and exits
    # with the return value of the entry point callable.
    script_template = textwrap.dedent("""\
        import sys
        from %(module)s import %(import_name)s
        if __name__ == '__main__':
            sys.argv[0] = sys.argv[0].removesuffix('.exe')
            sys.exit(%(func)s())
""")

    def make(
        self, specification: str, options: dict[str, Any] | None = None
    ) -> list[str]:
        """Validate *specification*, then delegate to ``ScriptMaker.make``.

        Validation is done by ``_raise_for_invalid_entrypoint`` against
        ``self.target_dir``; the return value is whatever the parent
        implementation returns.
        """
        _raise_for_invalid_entrypoint(specification, self.target_dir)
        return super().make(specification, options)
433
434
def _install_wheel(  # noqa: C901, PLR0915 function is too long
    name: str,
    wheel_zip: ZipFile,
    wheel_path: str,
    scheme: Scheme,
    pycompile: bool = True,
    warn_script_location: bool = True,
    direct_url: DirectUrl | None = None,
    requested: bool = False,
) -> None:
    """Install a wheel.

    The files of the archive are written to the directories given by
    ``scheme``, Python sources are optionally byte-compiled, console and GUI
    script wrappers are generated, and the INSTALLER, direct URL, REQUESTED
    and RECORD files are written into the installed .dist-info directory.

    :param name: Name of the project to install
    :param wheel_zip: open ZipFile for wheel being installed
    :param wheel_path: Path of the wheel file; used to read the wheel's
        metadata and in error messages
    :param scheme: Distutils scheme dictating the install directories
    :param pycompile: Whether to byte-compile installed Python files
    :param warn_script_location: Whether to check that scripts are installed
        into a directory on PATH
    :param direct_url: If given, written as the PEP 610 direct URL reference
    :param requested: Whether to write the REQUESTED marker file
    :raises UnsupportedWheel:
        * when the directory holds an unpacked wheel with incompatible
          Wheel-Version
        * when the .dist-info dir does not match the wheel
    :raises InstallationError: when a file would be installed outside its
        target directory, or a .data entry is not under a valid scheme key
    """
    info_dir, metadata = parse_wheel(wheel_zip, name)

    # Files in the archive root go to purelib or platlib, depending on the
    # wheel's Root-Is-Purelib metadata.
    if wheel_root_is_purelib(metadata):
        lib_dir = scheme.purelib
    else:
        lib_dir = scheme.platlib

    # Record details of the files moved
    #   installed = files copied from the wheel to the destination
    #   changed = files changed while installing (scripts #! line typically)
    #   generated = files newly generated during the install (script wrappers)
    installed: dict[RecordPath, RecordPath] = {}
    changed: set[RecordPath] = set()
    generated: list[str] = []

    def record_installed(
        srcfile: RecordPath, destfile: str, modified: bool = False
    ) -> None:
        """Map archive RECORD paths to installation RECORD paths."""
        newpath = _fs_to_record_path(destfile, lib_dir)
        installed[srcfile] = newpath
        if modified:
            changed.add(newpath)

    def is_dir_path(path: RecordPath) -> bool:
        # Archive member names ending in "/" are treated as directories.
        return path.endswith("/")

    def assert_no_path_traversal(dest_dir_path: str, target_path: str) -> None:
        # Refuse any target that does not stay within its destination directory.
        if not is_within_directory(dest_dir_path, target_path):
            message = (
                "The wheel {!r} has a file {!r} trying to install"
                " outside the target directory {!r}"
            )
            raise InstallationError(
                message.format(wheel_path, target_path, dest_dir_path)
            )

    def root_scheme_file_maker(
        zip_file: ZipFile, dest: str
    ) -> Callable[[RecordPath], File]:
        # Build a factory for files that live in the archive root and are
        # installed below ``dest``.
        def make_root_scheme_file(record_path: RecordPath) -> File:
            normed_path = os.path.normpath(record_path)
            dest_path = os.path.join(dest, normed_path)
            assert_no_path_traversal(dest, dest_path)
            return ZipBackedFile(record_path, dest_path, zip_file)

        return make_root_scheme_file

    def data_scheme_file_maker(
        zip_file: ZipFile, scheme: Scheme
    ) -> Callable[[RecordPath], File]:
        # Build a factory for files below "<name>.data/<scheme key>/", which
        # are installed into the directory the scheme assigns to that key.
        scheme_paths = {key: getattr(scheme, key) for key in SCHEME_KEYS}

        def make_data_scheme_file(record_path: RecordPath) -> File:
            normed_path = os.path.normpath(record_path)
            try:
                # Expected layout: <.data dir>/<scheme key>/<path>
                _, scheme_key, dest_subpath = normed_path.split(os.path.sep, 2)
            except ValueError:
                message = (
                    f"Unexpected file in {wheel_path}: {record_path!r}. .data directory"
                    " contents should be named like: '<scheme key>/<path>'."
                )
                raise InstallationError(message)

            try:
                scheme_path = scheme_paths[scheme_key]
            except KeyError:
                valid_scheme_keys = ", ".join(sorted(scheme_paths))
                message = (
                    f"Unknown scheme key used in {wheel_path}: {scheme_key} "
                    f"(for file {record_path!r}). .data directory contents "
                    f"should be in subdirectories named with a valid scheme "
                    f"key ({valid_scheme_keys})"
                )
                raise InstallationError(message)

            dest_path = os.path.join(scheme_path, dest_subpath)
            assert_no_path_traversal(scheme_path, dest_path)
            return ZipBackedFile(record_path, dest_path, zip_file)

        return make_data_scheme_file

    def is_data_scheme_path(path: RecordPath) -> bool:
        # True when the first path component ends with ".data".
        return path.split("/", 1)[0].endswith(".data")

    # Split the archive members into root files and .data files. Everything
    # from here down to the installation loop is lazy: no file object is
    # created before the loop consumes ``files``.
    paths = cast(list[RecordPath], wheel_zip.namelist())
    file_paths = filterfalse(is_dir_path, paths)
    root_scheme_paths, data_scheme_paths = partition(is_data_scheme_path, file_paths)

    make_root_scheme_file = root_scheme_file_maker(wheel_zip, lib_dir)
    files: Iterator[File] = map(make_root_scheme_file, root_scheme_paths)

    def is_script_scheme_path(path: RecordPath) -> bool:
        # True for "<name>.data/scripts/<something>".
        parts = path.split("/", 2)
        return len(parts) > 2 and parts[0].endswith(".data") and parts[1] == "scripts"

    other_scheme_paths, script_scheme_paths = partition(
        is_script_scheme_path, data_scheme_paths
    )

    make_data_scheme_file = data_scheme_file_maker(wheel_zip, scheme)
    other_scheme_files = map(make_data_scheme_file, other_scheme_paths)
    files = chain(files, other_scheme_files)

    # Get the defined entry points
    distribution = get_wheel_distribution(
        FilesystemWheel(wheel_path),
        canonicalize_name(name),
    )
    console, gui = get_entrypoints(distribution)

    def is_entrypoint_wrapper(file: File) -> bool:
        # EP, EP.exe and EP-script.py are scripts generated for
        # entry point EP by setuptools
        path = file.dest_path
        name = os.path.basename(path)
        if name.lower().endswith(".exe"):
            matchname = name[:-4]
        elif name.lower().endswith("-script.py"):
            matchname = name[:-10]
        elif name.lower().endswith(".pya"):
            matchname = name[:-4]
        else:
            matchname = name
        # Ignore setuptools-generated scripts
        return matchname in console or matchname in gui

    # Scripts shipped in the wheel are installed through ScriptFile so their
    # "#!python" line gets rewritten; wrappers for declared entry points are
    # dropped because they are regenerated further down.
    script_scheme_files: Iterator[File] = map(
        make_data_scheme_file, script_scheme_paths
    )
    script_scheme_files = filterfalse(is_entrypoint_wrapper, script_scheme_files)
    script_scheme_files = map(ScriptFile, script_scheme_files)
    files = chain(files, script_scheme_files)

    # Remember the parent directories already created to avoid calling
    # ensure_dir() once per file.
    existing_parents = set()
    for file in files:
        # directory creation is lazy and after file filtering
        # to ensure we don't install empty dirs; empty dirs can't be
        # uninstalled.
        parent_dir = os.path.dirname(file.dest_path)
        if parent_dir not in existing_parents:
            ensure_dir(parent_dir)
            existing_parents.add(parent_dir)
        file.save()
        record_installed(file.src_record_path, file.dest_path, file.changed)

    def pyc_source_file_paths() -> Generator[str, None, None]:
        # We de-duplicate installation paths, since there can be overlap (e.g.
        # file in .data maps to same location as file in wheel root).
        # Sorting installation paths makes it easier to reproduce and debug
        # issues related to permissions on existing files.
        for installed_path in sorted(set(installed.values())):
            full_installed_path = os.path.join(lib_dir, installed_path)
            if not os.path.isfile(full_installed_path):
                continue
            if not full_installed_path.endswith(".py"):
                continue
            yield full_installed_path

    def pyc_output_path(path: str) -> str:
        """Return the path the pyc file would have been written to."""
        # NOTE(review): only "import importlib" is visible at the top of this
        # file; this relies on importlib.util already being loaded - confirm.
        return importlib.util.cache_from_source(path)

    # Compile all of the pyc files for the installed files
    if pycompile:
        # Anything printed during compilation is captured and only logged at
        # debug level; warnings raised while compiling are ignored.
        with contextlib.redirect_stdout(
            StreamWrapper.from_stream(sys.stdout)
        ) as stdout:
            with warnings.catch_warnings():
                warnings.filterwarnings("ignore")
                for path in pyc_source_file_paths():
                    success = compileall.compile_file(path, force=True, quiet=True)
                    if success:
                        pyc_path = pyc_output_path(path)
                        assert os.path.exists(pyc_path)
                        pyc_record_path = cast(
                            "RecordPath", pyc_path.replace(os.path.sep, "/")
                        )
                        record_installed(pyc_record_path, pyc_path)
        logger.debug(stdout.getvalue())

    maker = PipScriptMaker(None, scheme.scripts)

    # Ensure old scripts are overwritten.
    # See https://github.com/pypa/pip/issues/1800
    maker.clobber = True

    # Ensure we don't generate any variants for scripts because this is almost
    # never what somebody wants.
    # See https://bitbucket.org/pypa/distlib/issue/35/
    maker.variants = {""}

    # This is required because otherwise distlib creates scripts that are not
    # executable.
    # See https://bitbucket.org/pypa/distlib/issue/32/
    maker.set_mode = True

    # Generate the console and GUI entry points specified in the wheel
    scripts_to_generate = get_console_script_specs(console)

    gui_scripts_to_generate = list(starmap("{} = {}".format, gui.items()))

    generated_console_scripts = maker.make_multiple(scripts_to_generate)
    generated.extend(generated_console_scripts)

    generated.extend(maker.make_multiple(gui_scripts_to_generate, {"gui": True}))

    # Only the console scripts are checked against PATH.
    if warn_script_location:
        msg = message_about_scripts_not_on_PATH(generated_console_scripts)
        if msg is not None:
            logger.warning(msg)

    # Mode for the metadata files generated below: rw for everyone, minus
    # whatever the current umask removes.
    generated_file_mode = 0o666 & ~current_umask()

    @contextlib.contextmanager
    def _generate_file(path: str, **kwargs: Any) -> Generator[BinaryIO, None, None]:
        # Write to a temporary file next to ``path``; once the caller's block
        # has finished, set the mode and move the file into place.
        with adjacent_tmp_file(path, **kwargs) as f:
            yield f
        os.chmod(f.name, generated_file_mode)
        replace(f.name, path)

    dest_info_dir = os.path.join(lib_dir, info_dir)

    # Record pip as the installer
    installer_path = os.path.join(dest_info_dir, "INSTALLER")
    with _generate_file(installer_path) as installer_file:
        installer_file.write(b"pip\n")
    generated.append(installer_path)

    # Record the PEP 610 direct URL reference
    if direct_url is not None:
        direct_url_path = os.path.join(dest_info_dir, DIRECT_URL_METADATA_NAME)
        with _generate_file(direct_url_path) as direct_url_file:
            direct_url_file.write(direct_url.to_json().encode("utf-8"))
        generated.append(direct_url_path)

    # Record the REQUESTED file
    if requested:
        requested_path = os.path.join(dest_info_dir, "REQUESTED")
        # The file is an empty marker; opening it for writing creates it.
        with open(requested_path, "wb"):
            pass
        generated.append(requested_path)

    # Start from the RECORD shipped in the wheel and update it to describe
    # what was actually installed.
    record_text = distribution.read_text("RECORD")
    record_rows = list(csv.reader(record_text.splitlines()))

    rows = get_csv_rows_for_installed(
        record_rows,
        installed=installed,
        changed=changed,
        generated=generated,
        lib_dir=lib_dir,
    )

    # Record details of all files installed
    record_path = os.path.join(dest_info_dir, "RECORD")

    with _generate_file(record_path, **csv_io_kwargs("w")) as record_file:
        # Explicitly cast to typing.IO[str] as a workaround for the mypy error:
        # "writer" has incompatible type "BinaryIO"; expected "_Writer"
        writer = csv.writer(cast("IO[str]", record_file))
        writer.writerows(_normalized_outrows(rows))
722
723
@contextlib.contextmanager
def req_error_context(req_description: str) -> Generator[None, None, None]:
    """Prefix any ``InstallationError`` raised in the block with the requirement.

    The error is re-raised as a new ``InstallationError`` whose message starts
    with ``For req: <req_description>.``; the original error is chained as its
    cause.
    """
    try:
        yield
    except InstallationError as original:
        raise InstallationError(
            f"For req: {req_description}. {original.args[0]}"
        ) from original
731
732
def install_wheel(
    name: str,
    wheel_path: str,
    scheme: Scheme,
    req_description: str,
    pycompile: bool = True,
    warn_script_location: bool = True,
    direct_url: DirectUrl | None = None,
    requested: bool = False,
) -> None:
    """Open the wheel at *wheel_path* and install it.

    All the work is done by ``_install_wheel``; this wrapper opens the
    archive and makes sure any ``InstallationError`` mentions
    *req_description*.
    """
    with ZipFile(wheel_path, allowZip64=True) as archive, req_error_context(
        req_description
    ):
        _install_wheel(
            name=name,
            wheel_zip=archive,
            wheel_path=wheel_path,
            scheme=scheme,
            pycompile=pycompile,
            warn_script_location=warn_script_location,
            direct_url=direct_url,
            requested=requested,
        )