1"""Support for installing and building the "wheel" binary package format."""
2
3from __future__ import annotations
4
5import collections
6import compileall
7import contextlib
8import csv
9import importlib
10import logging
11import os.path
12import re
13import shutil
14import sys
15import textwrap
16import warnings
17from base64 import urlsafe_b64encode
18from collections.abc import Generator, Iterable, Iterator, Sequence
19from email.message import Message
20from itertools import chain, filterfalse, starmap
21from typing import (
22 IO,
23 Any,
24 BinaryIO,
25 Callable,
26 NewType,
27 Protocol,
28 Union,
29 cast,
30)
31from zipfile import ZipFile, ZipInfo
32
33from pip._vendor.distlib.scripts import ScriptMaker
34from pip._vendor.distlib.util import get_export_entry
35from pip._vendor.packaging.utils import canonicalize_name
36
37from pip._internal.exceptions import InstallationError
38from pip._internal.locations import get_major_minor_version
39from pip._internal.metadata import (
40 BaseDistribution,
41 FilesystemWheel,
42 get_wheel_distribution,
43)
44from pip._internal.models.direct_url import DIRECT_URL_METADATA_NAME, DirectUrl
45from pip._internal.models.scheme import SCHEME_KEYS, Scheme
46from pip._internal.utils.filesystem import adjacent_tmp_file, replace
47from pip._internal.utils.misc import StreamWrapper, ensure_dir, hash_file, partition
48from pip._internal.utils.unpacking import (
49 current_umask,
50 is_within_directory,
51 set_extracted_file_to_default_mode_plus_executable,
52 zip_item_is_executable,
53)
54from pip._internal.utils.wheel import parse_wheel
55
56
class File(Protocol):
    """Structural interface shared by the file objects this module installs."""

    # Name of the entry as it appears in the wheel archive.
    src_record_path: RecordPath
    # Filesystem location the entry is written to.
    dest_path: str
    # Set by implementations once ``save`` has run; the installer uses it to
    # decide whether the file must be re-hashed for RECORD.
    changed: bool

    def save(self) -> None:
        """Write the file to ``dest_path``."""
        ...
64
65
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# A path as written in a RECORD file; a distinct static type layered on str.
RecordPath = NewType("RecordPath", str)
# One RECORD row as (path, hash, size); the size may be an int or a string.
InstalledCSVRow = tuple[RecordPath, str, Union[int, str]]
70
71
def rehash(path: str, blocksize: int = 1 << 20) -> tuple[str, str]:
    """Hash the file at *path* and describe it in RECORD form.

    :param path: File to hash.
    :param blocksize: Passed through unchanged to ``hash_file``.
    :return: ``(digest, length)``. ``digest`` is ``"sha256="`` followed by
        the URL-safe base64 encoding of the hash digest with the trailing
        ``=`` padding removed; ``length`` is the length reported by
        ``hash_file``, converted to a string.
    """
    hasher, size = hash_file(path, blocksize)
    encoded = urlsafe_b64encode(hasher.digest()).decode("latin1")
    unpadded = encoded.rstrip("=")
    return ("sha256=" + unpadded, str(size))
77
78
def csv_io_kwargs(mode: str) -> dict[str, Any]:
    """Build the keyword arguments used to open a CSV file.

    :param mode: File mode, passed through unchanged.
    :return: The mode together with ``newline=""`` and ``encoding="utf-8"``.
    """
    kwargs: dict[str, Any] = {"mode": mode}
    kwargs.update(newline="", encoding="utf-8")
    return kwargs
84
85
def fix_script(path: str) -> bool:
    """Point a ``#!python`` shebang at the running interpreter.

    The first line of the file is replaced by ``#!`` followed by
    ``sys.executable`` and the platform line separator; the remainder of the
    file is written back untouched.

    :param path: Script to rewrite in place; must be an existing file.
    :return: True if the file was rewritten, False if its first line does
        not start with ``#!python``.
    """
    # XXX RECORD hashes will need to be updated
    assert os.path.isfile(path)

    with open(path, "rb") as reader:
        shebang = reader.readline()
        if not shebang.startswith(b"#!python"):
            return False
        interpreter = sys.executable.encode(sys.getfilesystemencoding())
        remainder = reader.read()

    new_shebang = b"".join((b"#!", interpreter, os.linesep.encode("ascii")))
    with open(path, "wb") as writer:
        writer.write(new_shebang)
        writer.write(remainder)
    return True
104
105
def wheel_root_is_purelib(metadata: Message) -> bool:
    """Tell whether the ``Root-Is-Purelib`` header equals "true".

    The comparison ignores case; a missing header counts as not purelib.
    """
    flag = metadata.get("Root-Is-Purelib", "")
    return flag.lower() == "true"
108
109
def get_entrypoints(dist: BaseDistribution) -> tuple[dict[str, str], dict[str, str]]:
    """Collect the script entry points declared by *dist*.

    :return: ``(console_scripts, gui_scripts)``, each mapping an entry point
        name to its value. Entry points in any other group are ignored; when
        a name repeats within a group the last value wins.
    """
    by_group: dict[str, dict[str, str]] = {
        "console_scripts": {},
        "gui_scripts": {},
    }
    for ep in dist.iter_entry_points():
        bucket = by_group.get(ep.group)
        if bucket is not None:
            bucket[ep.name] = ep.value
    return by_group["console_scripts"], by_group["gui_scripts"]
119
120
def message_about_scripts_not_on_PATH(scripts: Sequence[str]) -> str | None:
    """Determine if any scripts are not on PATH and format a warning.

    Directories are compared after ``os.path.normpath`` and
    ``os.path.normcase``. The directory holding ``sys.executable`` is treated
    as if it were on PATH.

    :param scripts: Paths of the installed scripts.
    :return: A warning message if one or more scripts are not on PATH,
        otherwise None.
    """
    if not scripts:
        return None

    def _normalize(directory: str) -> str:
        # normpath already drops trailing separators, except on a filesystem
        # root where the separator is the path itself and must be kept.
        return os.path.normcase(os.path.normpath(directory))

    # Group scripts by the path they were installed in
    grouped_by_dir: dict[str, set[str]] = collections.defaultdict(set)
    for destfile in scripts:
        parent_dir, script_name = os.path.split(destfile)
        grouped_by_dir[parent_dir].add(script_name)

    path_entries = os.environ.get("PATH", "").split(os.pathsep)

    # We don't want to warn for directories that are on PATH.
    not_warn_dirs = {_normalize(entry) for entry in path_entries}
    # If an executable sits with sys.executable, we don't warn for it.
    # This covers the case of venv invocations without activating the venv.
    not_warn_dirs.add(_normalize(os.path.dirname(sys.executable)))

    warn_for: dict[str, set[str]] = {
        parent_dir: dir_scripts
        for parent_dir, dir_scripts in grouped_by_dir.items()
        if _normalize(parent_dir) not in not_warn_dirs
    }
    if not warn_for:
        return None

    # Format a message
    msg_lines = []
    for parent_dir, dir_scripts in warn_for.items():
        sorted_scripts: list[str] = sorted(dir_scripts)
        if len(sorted_scripts) == 1:
            start_text = f"script {sorted_scripts[0]} is"
        else:
            start_text = "scripts {} are".format(
                ", ".join(sorted_scripts[:-1]) + " and " + sorted_scripts[-1]
            )

        msg_lines.append(
            f"The {start_text} installed in '{parent_dir}' which is not on PATH."
        )

    last_line_fmt = (
        "Consider adding {} to PATH or, if you prefer "
        "to suppress this warning, use --no-warn-script-location."
    )
    if len(msg_lines) == 1:
        msg_lines.append(last_line_fmt.format("this directory"))
    else:
        msg_lines.append(last_line_fmt.format("these directories"))

    # Add a note if any directory starts with ~
    if any(entry.startswith("~") for entry in path_entries):
        msg_lines.append(
            "NOTE: The current PATH contains path(s) starting with `~`, "
            "which may not be expanded by all applications."
        )

    # Returns the formatted multiline message
    return "\n".join(msg_lines)
191
192
193def _normalized_outrows(
194 outrows: Iterable[InstalledCSVRow],
195) -> list[tuple[str, str, str]]:
196 """Normalize the given rows of a RECORD file.
197
198 Items in each row are converted into str. Rows are then sorted to make
199 the value more predictable for tests.
200
201 Each row is a 3-tuple (path, hash, size) and corresponds to a record of
202 a RECORD file (see PEP 376 and PEP 427 for details). For the rows
203 passed to this function, the size can be an integer as an int or string,
204 or the empty string.
205 """
206 # Normally, there should only be one row per path, in which case the
207 # second and third elements don't come into play when sorting.
208 # However, in cases in the wild where a path might happen to occur twice,
209 # we don't want the sort operation to trigger an error (but still want
210 # determinism). Since the third element can be an int or string, we
211 # coerce each element to a string to avoid a TypeError in this case.
212 # For additional background, see--
213 # https://github.com/pypa/pip/issues/5868
214 return sorted(
215 (record_path, hash_, str(size)) for record_path, hash_, size in outrows
216 )
217
218
219def _record_to_fs_path(record_path: RecordPath, lib_dir: str) -> str:
220 return os.path.join(lib_dir, record_path)
221
222
223def _fs_to_record_path(path: str, lib_dir: str) -> RecordPath:
224 # On Windows, do not handle relative paths if they belong to different
225 # logical disks
226 if os.path.splitdrive(path)[0].lower() == os.path.splitdrive(lib_dir)[0].lower():
227 path = os.path.relpath(path, lib_dir)
228
229 path = path.replace(os.path.sep, "/")
230 return cast("RecordPath", path)
231
232
def get_csv_rows_for_installed(
    old_csv_rows: list[list[str]],
    installed: dict[RecordPath, RecordPath],
    changed: set[RecordPath],
    generated: list[str],
    lib_dir: str,
) -> list[InstalledCSVRow]:
    """
    Build the RECORD rows describing an installed wheel.

    :param old_csv_rows: Rows parsed from the wheel's own RECORD file. Empty
        rows, which ``csv.reader`` yields for blank lines, are skipped.
    :param installed: A map from archive RECORD path to installation RECORD
        path. Entries matched by a row of ``old_csv_rows`` are popped from
        this mapping as they are consumed.
    :param changed: Installation RECORD paths whose files were modified
        during installation and therefore need a fresh hash and size.
    :param generated: Filesystem paths of files created by the installer;
        each is hashed and recorded relative to ``lib_dir``.
    :param lib_dir: Directory that RECORD paths are relative to.
    :return: Rows for the archive's records (in their original order), then
        rows for generated files, then rows with an empty hash and size for
        installed files the archive's RECORD did not mention.
    """
    installed_rows: list[InstalledCSVRow] = []
    for row in old_csv_rows:
        if not row:
            # A blank line carries no record; indexing it would raise
            # IndexError and abort the whole installation.
            continue
        if len(row) > 3:
            logger.warning("RECORD line has more than three elements: %s", row)
        old_record_path = cast("RecordPath", row[0])
        # Paths that were not installed keep their archive name.
        new_record_path = installed.pop(old_record_path, old_record_path)
        if new_record_path in changed:
            digest, length = rehash(_record_to_fs_path(new_record_path, lib_dir))
        else:
            # Tolerate short rows by treating missing fields as empty.
            digest = row[1] if len(row) > 1 else ""
            length = row[2] if len(row) > 2 else ""
        installed_rows.append((new_record_path, digest, length))
    for f in generated:
        path = _fs_to_record_path(f, lib_dir)
        digest, length = rehash(f)
        installed_rows.append((path, digest, length))
    # Whatever is left in ``installed`` had no row in the archive's RECORD.
    installed_rows.extend(
        (installed_record_path, "", "") for installed_record_path in installed.values()
    )
    return installed_rows
263
264
def get_console_script_specs(console: dict[str, str]) -> list[str]:
    """
    Turn a mapping of console entry point names to callables into the list
    of ``"name = callable"`` script specs to generate.

    pip and setuptools are special-cased. Both projects create "versioned"
    entry points in setup.py (pip2.7 on Python 2.7, pip3.3 on Python 3.3,
    and so on), but those names are baked into the wheel metadata at build
    time and are wrong whenever the wheel is installed with a *different*
    Python. The proper fix would be metadata able to describe versioned
    entry points; until then such projects either ship incorrect versioned
    entry points or cannot ship "universal" wheels. Because setuptools and
    pip are bundled with _ensurepip and virtualenv, universal wheels are
    required, so the versioned entry points from the wheel are discarded
    here and the correct ones generated instead.

    To support ensurepip, the ``ENSUREPIP_OPTIONS`` environment variable
    controls which versioned scripts are produced:

    ENSUREPIP_OPTIONS=altinstall
      - Only pipX.Y and easy_install-X.Y are generated.
    ENSUREPIP_OPTIONS=install
      - pipX.Y, pipX and easy_install-X.Y are generated. Strictly, this
        applies whenever the variable is set to anything but altinstall.
    unset
      - pip, pipX, pipX.Y, easy_install and easy_install-X.Y are generated.
    """
    # Work on a private copy so the caller's mapping is left untouched.
    remaining = dict(console)
    specs: list[str] = []

    # None when the variable is not set at all.
    ensurepip_mode = os.environ.get("ENSUREPIP_OPTIONS")

    pip_target = remaining.pop("pip", None)
    if pip_target:
        if ensurepip_mode is None:
            specs.append("pip = " + pip_target)
        if ensurepip_mode != "altinstall":
            specs.append(f"pip{sys.version_info[0]} = {pip_target}")
        specs.append(f"pip{get_major_minor_version()} = {pip_target}")
        # Drop any other versioned pip entry points shipped in the wheel.
        remaining = {
            key: target
            for key, target in remaining.items()
            if not re.match(r"pip(\d+(\.\d+)?)?$", key)
        }

    easy_install_target = remaining.pop("easy_install", None)
    if easy_install_target:
        if ensurepip_mode is None:
            specs.append("easy_install = " + easy_install_target)
        specs.append(
            f"easy_install-{get_major_minor_version()} = {easy_install_target}"
        )
        # Drop any other versioned easy_install entry points.
        remaining = {
            key: target
            for key, target in remaining.items()
            if not re.match(r"easy_install(-\d+\.\d+)?$", key)
        }

    # Everything else is generated exactly as the wheel declares it.
    for key, target in remaining.items():
        specs.append("{} = {}".format(key, target))

    return specs
338
339
class ZipBackedFile:
    """A file to install whose content is read from a member of a zip archive."""

    def __init__(
        self, src_record_path: RecordPath, dest_path: str, zip_file: ZipFile
    ) -> None:
        self._zip_file = zip_file
        self.src_record_path = src_record_path
        self.dest_path = dest_path
        # Plain extraction never alters the content.
        self.changed = False

    def _getinfo(self) -> ZipInfo:
        """Look up the archive entry named by ``src_record_path``."""
        archive = self._zip_file
        return archive.getinfo(self.src_record_path)

    def save(self) -> None:
        """Extract the archive member to ``dest_path``, replacing any file there."""
        target = self.dest_path

        # Opening the output file would truncate an existing file in place
        # before the new contents are written. That is usually harmless, but
        # it can segfault the process when pip itself has loaded that file
        # as a shared object (e.g. from pyopenssl through its vendored
        # urllib3): the object is mmap'd, so calling a symbol in it after
        # truncation crashes. Unlinking first lets the new contents be
        # written while the process keeps using the old copy.
        if os.path.exists(target):
            os.unlink(target)

        info = self._getinfo()

        # open() alone creates the file, so decompression is skipped
        # entirely when the member holds no bytes.
        with open(target, "wb") as out:
            if info.file_size > 0:
                chunk_size = min(info.file_size, 1024 * 1024)
                with self._zip_file.open(info) as member:
                    shutil.copyfileobj(member, out, chunk_size)

        if zip_item_is_executable(info):
            set_extracted_file_to_default_mode_plus_executable(target)
376
377
class ScriptFile:
    """Wrap another file so that its ``#!python`` shebang is fixed after saving."""

    def __init__(self, file: File) -> None:
        self._file = file
        # Mirror the wrapped file's locations so both share one interface.
        self.src_record_path = file.src_record_path
        self.dest_path = file.dest_path
        self.changed = False

    def save(self) -> None:
        """Save the wrapped file, then rewrite its shebang if it has one."""
        wrapped = self._file
        wrapped.save()
        # ``changed`` reports whether the shebang rewrite touched the file.
        self.changed = fix_script(self.dest_path)
388
389
class MissingCallableSuffix(InstallationError):
    """Raised for a script entry point that does not name a callable."""

    def __init__(self, entry_point: str) -> None:
        message = (
            f"Invalid script entry point: {entry_point} - A callable "
            "suffix is required. See https://packaging.python.org/"
            "specifications/entry-points/#use-for-scripts for more "
            "information."
        )
        super().__init__(message)
398
399
def _raise_for_invalid_entrypoint(specification: str) -> None:
    """Reject a script specification whose parsed entry has no suffix.

    :raises MissingCallableSuffix: when ``get_export_entry`` returns an
        entry whose ``suffix`` is None.
    """
    parsed = get_export_entry(specification)
    if parsed is None:
        return
    if parsed.suffix is None:
        raise MissingCallableSuffix(str(parsed))
404
405
class PipScriptMaker(ScriptMaker):
    """ScriptMaker that validates each specification before generating it."""

    # Override distlib's default script template with one that
    # doesn't import `re` module, allowing scripts to load faster.
    # The generated script strips a trailing '.exe' from sys.argv[0] and
    # exits with the return value of the entry point callable.
    script_template = textwrap.dedent(
        """\
        import sys
        from %(module)s import %(import_name)s
        if __name__ == '__main__':
            sys.argv[0] = sys.argv[0].removesuffix('.exe')
            sys.exit(%(func)s())
"""
    )

    def make(
        self, specification: str, options: dict[str, Any] | None = None
    ) -> list[str]:
        """Validate *specification*, then delegate to ``ScriptMaker.make``.

        :raises MissingCallableSuffix: when the specification parses to an
            entry point without a callable suffix.
        """
        _raise_for_invalid_entrypoint(specification)
        return super().make(specification, options)
424
425
def _install_wheel(  # noqa: C901, PLR0915 function is too long
    name: str,
    wheel_zip: ZipFile,
    wheel_path: str,
    scheme: Scheme,
    pycompile: bool = True,
    warn_script_location: bool = True,
    direct_url: DirectUrl | None = None,
    requested: bool = False,
) -> None:
    """Install a wheel.

    :param name: Name of the project to install
    :param wheel_zip: open ZipFile for wheel being installed
    :param wheel_path: Path of the wheel file; used to load the wheel's
        distribution metadata and quoted in error messages
    :param scheme: Distutils scheme dictating the install directories
    :param pycompile: Whether to byte-compile installed Python files
    :param warn_script_location: Whether to check that scripts are installed
        into a directory on PATH
    :param direct_url: When given, written as JSON to the
        ``DIRECT_URL_METADATA_NAME`` file in the installed .dist-info dir
    :param requested: When true, an empty ``REQUESTED`` file is created in
        the installed .dist-info dir
    :raises UnsupportedWheel:
        * when the directory holds an unpacked wheel with incompatible
          Wheel-Version
        * when the .dist-info dir does not match the wheel
    :raises InstallationError: when a file would be installed outside its
        target directory, or when a ``.data`` entry is not laid out as
        ``<scheme key>/<path>`` with a known scheme key
    """
    info_dir, metadata = parse_wheel(wheel_zip, name)

    # Files at the wheel root go to purelib or platlib depending on the
    # wheel's Root-Is-Purelib header.
    if wheel_root_is_purelib(metadata):
        lib_dir = scheme.purelib
    else:
        lib_dir = scheme.platlib

    # Record details of the files moved
    #   installed = files copied from the wheel to the destination
    #   changed = files changed while installing (scripts #! line typically)
    #   generated = files newly generated during the install (script wrappers)
    installed: dict[RecordPath, RecordPath] = {}
    changed: set[RecordPath] = set()
    generated: list[str] = []

    def record_installed(
        srcfile: RecordPath, destfile: str, modified: bool = False
    ) -> None:
        """Map archive RECORD paths to installation RECORD paths."""
        newpath = _fs_to_record_path(destfile, lib_dir)
        installed[srcfile] = newpath
        if modified:
            changed.add(newpath)

    def is_dir_path(path: RecordPath) -> bool:
        # Directory entries in the archive listing end with a slash.
        return path.endswith("/")

    def assert_no_path_traversal(dest_dir_path: str, target_path: str) -> None:
        # Refuse archive entries that would land outside the directory they
        # are supposed to be installed into.
        if not is_within_directory(dest_dir_path, target_path):
            message = (
                "The wheel {!r} has a file {!r} trying to install"
                " outside the target directory {!r}"
            )
            raise InstallationError(
                message.format(wheel_path, target_path, dest_dir_path)
            )

    def root_scheme_file_maker(
        zip_file: ZipFile, dest: str
    ) -> Callable[[RecordPath], File]:
        # Returns a factory for files that live at the wheel root and are
        # installed relative to ``dest``.
        def make_root_scheme_file(record_path: RecordPath) -> File:
            normed_path = os.path.normpath(record_path)
            dest_path = os.path.join(dest, normed_path)
            assert_no_path_traversal(dest, dest_path)
            return ZipBackedFile(record_path, dest_path, zip_file)

        return make_root_scheme_file

    def data_scheme_file_maker(
        zip_file: ZipFile, scheme: Scheme
    ) -> Callable[[RecordPath], File]:
        # Returns a factory for files under ``<name>.data/<scheme key>/``,
        # installed relative to the directory the scheme assigns to that key.
        scheme_paths = {key: getattr(scheme, key) for key in SCHEME_KEYS}

        def make_data_scheme_file(record_path: RecordPath) -> File:
            normed_path = os.path.normpath(record_path)
            try:
                # Expected shape: <.data dir>/<scheme key>/<path within scheme>
                _, scheme_key, dest_subpath = normed_path.split(os.path.sep, 2)
            except ValueError:
                message = (
                    f"Unexpected file in {wheel_path}: {record_path!r}. .data directory"
                    " contents should be named like: '<scheme key>/<path>'."
                )
                raise InstallationError(message)

            try:
                scheme_path = scheme_paths[scheme_key]
            except KeyError:
                valid_scheme_keys = ", ".join(sorted(scheme_paths))
                message = (
                    f"Unknown scheme key used in {wheel_path}: {scheme_key} "
                    f"(for file {record_path!r}). .data directory contents "
                    f"should be in subdirectories named with a valid scheme "
                    f"key ({valid_scheme_keys})"
                )
                raise InstallationError(message)

            dest_path = os.path.join(scheme_path, dest_subpath)
            assert_no_path_traversal(scheme_path, dest_path)
            return ZipBackedFile(record_path, dest_path, zip_file)

        return make_data_scheme_file

    def is_data_scheme_path(path: RecordPath) -> bool:
        # True when the first path component ends with ".data".
        return path.split("/", 1)[0].endswith(".data")

    paths = cast(list[RecordPath], wheel_zip.namelist())
    file_paths = filterfalse(is_dir_path, paths)
    # NOTE(review): partition() is assumed to return (non-matching, matching),
    # judging by how its results are named here and below - confirm.
    root_scheme_paths, data_scheme_paths = partition(is_data_scheme_path, file_paths)

    make_root_scheme_file = root_scheme_file_maker(wheel_zip, lib_dir)
    files: Iterator[File] = map(make_root_scheme_file, root_scheme_paths)

    def is_script_scheme_path(path: RecordPath) -> bool:
        # True for entries of the form <name>.data/scripts/<something>.
        parts = path.split("/", 2)
        return len(parts) > 2 and parts[0].endswith(".data") and parts[1] == "scripts"

    other_scheme_paths, script_scheme_paths = partition(
        is_script_scheme_path, data_scheme_paths
    )

    make_data_scheme_file = data_scheme_file_maker(wheel_zip, scheme)
    other_scheme_files = map(make_data_scheme_file, other_scheme_paths)
    files = chain(files, other_scheme_files)

    # Get the defined entry points
    distribution = get_wheel_distribution(
        FilesystemWheel(wheel_path),
        canonicalize_name(name),
    )
    console, gui = get_entrypoints(distribution)

    def is_entrypoint_wrapper(file: File) -> bool:
        # EP, EP.exe and EP-script.py are scripts generated for
        # entry point EP by setuptools
        path = file.dest_path
        name = os.path.basename(path)
        if name.lower().endswith(".exe"):
            matchname = name[:-4]
        elif name.lower().endswith("-script.py"):
            matchname = name[:-10]
        elif name.lower().endswith(".pya"):
            matchname = name[:-4]
        else:
            matchname = name
        # Ignore setuptools-generated scripts
        return matchname in console or matchname in gui

    script_scheme_files: Iterator[File] = map(
        make_data_scheme_file, script_scheme_paths
    )
    # Wrappers shipped in the wheel are dropped; fresh ones are generated
    # from the entry points further down.
    script_scheme_files = filterfalse(is_entrypoint_wrapper, script_scheme_files)
    # Remaining scripts get their "#!python" line rewritten when saved.
    script_scheme_files = map(ScriptFile, script_scheme_files)
    files = chain(files, script_scheme_files)

    existing_parents = set()
    for file in files:
        # directory creation is lazy and after file filtering
        # to ensure we don't install empty dirs; empty dirs can't be
        # uninstalled.
        parent_dir = os.path.dirname(file.dest_path)
        if parent_dir not in existing_parents:
            ensure_dir(parent_dir)
            existing_parents.add(parent_dir)
        file.save()
        record_installed(file.src_record_path, file.dest_path, file.changed)

    def pyc_source_file_paths() -> Generator[str, None, None]:
        # We de-duplicate installation paths, since there can be overlap (e.g.
        # file in .data maps to same location as file in wheel root).
        # Sorting installation paths makes it easier to reproduce and debug
        # issues related to permissions on existing files.
        for installed_path in sorted(set(installed.values())):
            full_installed_path = os.path.join(lib_dir, installed_path)
            if not os.path.isfile(full_installed_path):
                continue
            if not full_installed_path.endswith(".py"):
                continue
            yield full_installed_path

    def pyc_output_path(path: str) -> str:
        """Return the path the pyc file would have been written to."""
        # NOTE(review): only ``import importlib`` is visible at the top of
        # this file; this relies on ``importlib.util`` having been imported
        # somewhere else by the time it runs - confirm.
        return importlib.util.cache_from_source(path)

    # Compile all of the pyc files for the installed files
    if pycompile:
        # Anything the compiler prints is captured and logged at debug level;
        # warnings raised while compiling are suppressed.
        with contextlib.redirect_stdout(
            StreamWrapper.from_stream(sys.stdout)
        ) as stdout:
            with warnings.catch_warnings():
                warnings.filterwarnings("ignore")
                for path in pyc_source_file_paths():
                    success = compileall.compile_file(path, force=True, quiet=True)
                    if success:
                        pyc_path = pyc_output_path(path)
                        assert os.path.exists(pyc_path)
                        pyc_record_path = cast(
                            "RecordPath", pyc_path.replace(os.path.sep, "/")
                        )
                        record_installed(pyc_record_path, pyc_path)
        logger.debug(stdout.getvalue())

    maker = PipScriptMaker(None, scheme.scripts)

    # Ensure old scripts are overwritten.
    # See https://github.com/pypa/pip/issues/1800
    maker.clobber = True

    # Ensure we don't generate any variants for scripts because this is almost
    # never what somebody wants.
    # See https://bitbucket.org/pypa/distlib/issue/35/
    maker.variants = {""}

    # This is required because otherwise distlib creates scripts that are not
    # executable.
    # See https://bitbucket.org/pypa/distlib/issue/32/
    maker.set_mode = True

    # Generate the console and GUI entry points specified in the wheel
    scripts_to_generate = get_console_script_specs(console)

    gui_scripts_to_generate = list(starmap("{} = {}".format, gui.items()))

    generated_console_scripts = maker.make_multiple(scripts_to_generate)
    generated.extend(generated_console_scripts)

    generated.extend(maker.make_multiple(gui_scripts_to_generate, {"gui": True}))

    # Only the console scripts are checked against PATH.
    if warn_script_location:
        msg = message_about_scripts_not_on_PATH(generated_console_scripts)
        if msg is not None:
            logger.warning(msg)

    # Mode for generated metadata files: rw for everyone, minus the umask.
    generated_file_mode = 0o666 & ~current_umask()

    @contextlib.contextmanager
    def _generate_file(path: str, **kwargs: Any) -> Generator[BinaryIO, None, None]:
        # Write to a temporary file next to ``path``; once the caller is done
        # and the file is closed, fix its mode and move it into place.
        with adjacent_tmp_file(path, **kwargs) as f:
            yield f
        os.chmod(f.name, generated_file_mode)
        replace(f.name, path)

    dest_info_dir = os.path.join(lib_dir, info_dir)

    # Record pip as the installer
    installer_path = os.path.join(dest_info_dir, "INSTALLER")
    with _generate_file(installer_path) as installer_file:
        installer_file.write(b"pip\n")
    generated.append(installer_path)

    # Record the PEP 610 direct URL reference
    if direct_url is not None:
        direct_url_path = os.path.join(dest_info_dir, DIRECT_URL_METADATA_NAME)
        with _generate_file(direct_url_path) as direct_url_file:
            direct_url_file.write(direct_url.to_json().encode("utf-8"))
        generated.append(direct_url_path)

    # Record the REQUESTED file
    if requested:
        requested_path = os.path.join(dest_info_dir, "REQUESTED")
        # The file is a marker only; it is created empty.
        with open(requested_path, "wb"):
            pass
        generated.append(requested_path)

    # Start from the RECORD shipped in the wheel and rewrite it to describe
    # what was actually installed.
    record_text = distribution.read_text("RECORD")
    record_rows = list(csv.reader(record_text.splitlines()))

    rows = get_csv_rows_for_installed(
        record_rows,
        installed=installed,
        changed=changed,
        generated=generated,
        lib_dir=lib_dir,
    )

    # Record details of all files installed
    record_path = os.path.join(dest_info_dir, "RECORD")

    with _generate_file(record_path, **csv_io_kwargs("w")) as record_file:
        # Explicitly cast to typing.IO[str] as a workaround for the mypy error:
        # "writer" has incompatible type "BinaryIO"; expected "_Writer"
        writer = csv.writer(cast("IO[str]", record_file))
        writer.writerows(_normalized_outrows(rows))
713
714
@contextlib.contextmanager
def req_error_context(req_description: str) -> Generator[None, None, None]:
    """Prefix any ``InstallationError`` raised in the block with the requirement.

    :param req_description: Text identifying the requirement being installed.
    :raises InstallationError: a new error whose message is
        ``"For req: <req_description>. <original message>"``, chained to the
        original error.
    """
    try:
        yield
    except InstallationError as e:
        # ``args`` is empty when the error was raised without a message.
        # Indexing it would raise IndexError and hide the real failure, so
        # fall back to the error's string form in that case.
        detail = e.args[0] if e.args else str(e)
        message = f"For req: {req_description}. {detail}"
        raise InstallationError(message) from e
722
723
def install_wheel(
    name: str,
    wheel_path: str,
    scheme: Scheme,
    req_description: str,
    pycompile: bool = True,
    warn_script_location: bool = True,
    direct_url: DirectUrl | None = None,
    requested: bool = False,
) -> None:
    """Open the wheel at *wheel_path* and install it.

    The archive is opened with Zip64 support and closed again when the
    installation finishes or fails. Any ``InstallationError`` raised while
    installing is re-raised with *req_description* prefixed to its message.
    All other arguments are forwarded to ``_install_wheel``.
    """
    with ZipFile(wheel_path, allowZip64=True) as archive, req_error_context(
        req_description
    ):
        _install_wheel(
            name=name,
            wheel_zip=archive,
            wheel_path=wheel_path,
            scheme=scheme,
            pycompile=pycompile,
            warn_script_location=warn_script_location,
            direct_url=direct_url,
            requested=requested,
        )