Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pip/_internal/operations/install/wheel.py: 20%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

348 statements  

1"""Support for installing and building the "wheel" binary package format.""" 

2 

3from __future__ import annotations 

4 

5import collections 

6import compileall 

7import contextlib 

8import csv 

9import importlib 

10import logging 

11import os.path 

12import re 

13import shutil 

14import sys 

15import textwrap 

16import warnings 

17from base64 import urlsafe_b64encode 

18from collections.abc import Generator, Iterable, Iterator, Sequence 

19from email.message import Message 

20from itertools import chain, filterfalse, starmap 

21from typing import ( 

22 IO, 

23 Any, 

24 BinaryIO, 

25 Callable, 

26 NewType, 

27 Protocol, 

28 Union, 

29 cast, 

30) 

31from zipfile import ZipFile, ZipInfo 

32 

33from pip._vendor.distlib.scripts import ScriptMaker 

34from pip._vendor.distlib.util import get_export_entry 

35from pip._vendor.packaging.utils import canonicalize_name 

36 

37from pip._internal.exceptions import InstallationError 

38from pip._internal.locations import get_major_minor_version 

39from pip._internal.metadata import ( 

40 BaseDistribution, 

41 FilesystemWheel, 

42 get_wheel_distribution, 

43) 

44from pip._internal.models.direct_url import DIRECT_URL_METADATA_NAME, DirectUrl 

45from pip._internal.models.scheme import SCHEME_KEYS, Scheme 

46from pip._internal.utils.filesystem import adjacent_tmp_file, replace 

47from pip._internal.utils.misc import StreamWrapper, ensure_dir, hash_file, partition 

48from pip._internal.utils.unpacking import ( 

49 current_umask, 

50 is_within_directory, 

51 set_extracted_file_to_default_mode_plus_executable, 

52 zip_item_is_executable, 

53) 

54from pip._internal.utils.wheel import parse_wheel 

55 

56 

class File(Protocol):
    """Structural interface for a single file written during installation."""

    # Path of the file as recorded for the wheel archive.
    src_record_path: RecordPath
    # Filesystem location the file is saved to.
    dest_path: str
    # Whether the saved file was modified while being installed.
    changed: bool

    def save(self) -> None:
        """Write the file to ``dest_path``."""
        ...

64 

65 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Distinct ``str`` type (for type checkers only) marking a path as written in
# a RECORD file; this module produces them with "/" as the separator.
RecordPath = NewType("RecordPath", str)
# One RECORD row: (path, hash, size). The size may be an int or a str.
InstalledCSVRow = tuple[RecordPath, str, Union[int, str]]

70 

71 

def rehash(path: str, blocksize: int = 1 << 20) -> tuple[str, str]:
    """Hash the file at *path* and return ``(encoded_digest, length)``.

    The digest is rendered as ``sha256=`` followed by the URL-safe base64
    encoding of the raw digest with the ``=`` padding removed. The length
    is returned as a string.
    """
    hasher, size = hash_file(path, blocksize)
    encoded = urlsafe_b64encode(hasher.digest()).decode("latin1").rstrip("=")
    return ("sha256=" + encoded, str(size))

77 

78 

def csv_io_kwargs(mode: str) -> dict[str, Any]:
    """Build the keyword arguments used to open a CSV file in *mode*.

    The file is opened as UTF-8 text with newline translation disabled.
    """
    return dict(mode=mode, newline="", encoding="utf-8")

84 

85 

def fix_script(path: str) -> bool:
    """Rewrite a ``#!python`` shebang to point at the running interpreter.

    The first line of the file is replaced with ``#!`` followed by
    ``sys.executable`` when it starts with ``#!python``; the rest of the
    file is written back unchanged.

    Return True if the file was rewritten, False if it was left alone.
    """
    # XXX RECORD hashes will need to be updated
    assert os.path.isfile(path)

    # Read phase: inspect the first line and, only when a rewrite is
    # needed, build the new shebang and pull in the remainder of the file.
    with open(path, "rb") as source:
        shebang = source.readline()
        needs_rewrite = shebang.startswith(b"#!python")
        if needs_rewrite:
            interpreter = sys.executable.encode(sys.getfilesystemencoding())
            shebang = b"#!" + interpreter + os.linesep.encode("ascii")
            remainder = source.read()

    if not needs_rewrite:
        return False

    # Write phase: replace the file with the new shebang plus old body.
    with open(path, "wb") as target:
        target.write(shebang)
        target.write(remainder)
    return True

104 

105 

def wheel_root_is_purelib(metadata: Message) -> bool:
    """Return True if the ``Root-Is-Purelib`` header equals "true".

    The comparison ignores case; a missing header counts as False.
    """
    flag = metadata.get("Root-Is-Purelib", "")
    return flag.lower() == "true"

108 

109 

def get_entrypoints(dist: BaseDistribution) -> tuple[dict[str, str], dict[str, str]]:
    """Collect the console and GUI script entry points of *dist*.

    Return a ``(console_scripts, gui_scripts)`` pair of dicts mapping each
    entry point name to its value. Entry points in other groups are ignored.
    """
    console_scripts: dict[str, str] = {}
    gui_scripts: dict[str, str] = {}
    # Route each entry point to the dict for its group, if it has one.
    buckets = {"console_scripts": console_scripts, "gui_scripts": gui_scripts}
    for ep in dist.iter_entry_points():
        bucket = buckets.get(ep.group)
        if bucket is not None:
            bucket[ep.name] = ep.value
    return console_scripts, gui_scripts

119 

120 

def message_about_scripts_not_on_PATH(scripts: Sequence[str]) -> str | None:
    """Build a warning about scripts installed outside of PATH.

    Return a multi-line message when at least one of *scripts* lives in a
    directory that is neither on PATH nor next to ``sys.executable``;
    otherwise return None.
    """
    if not scripts:
        return None

    def _canonical(directory: str) -> str:
        return os.path.normcase(os.path.normpath(directory))

    # Bucket the script names under the directory each was installed in.
    names_by_dir: dict[str, set[str]] = collections.defaultdict(set)
    for script_path in scripts:
        directory, filename = os.path.split(script_path)
        names_by_dir[directory].add(filename)

    # Directories on PATH never trigger the warning.
    path_entries = os.environ.get("PATH", "").split(os.pathsep)
    silenced_dirs = [_canonical(entry).rstrip(os.sep) for entry in path_entries]
    # Neither does the directory holding sys.executable; this covers
    # running a venv's interpreter without activating the venv.
    silenced_dirs.append(_canonical(os.path.dirname(sys.executable)))

    offending: dict[str, set[str]] = {
        directory: names
        for directory, names in names_by_dir.items()
        if _canonical(directory) not in silenced_dirs
    }
    if not offending:
        return None

    # One line per offending directory.
    lines = []
    for directory, names in offending.items():
        *leading, final = sorted(names)
        if leading:
            subject = "scripts {} are".format(", ".join(leading) + " and " + final)
        else:
            subject = f"script {final} is"
        lines.append(
            f"The {subject} installed in '{directory}' which is not on PATH."
        )

    # Closing advice, singular or plural depending on the directory count.
    advice = (
        "Consider adding {} to PATH or, if you prefer "
        "to suppress this warning, use --no-warn-script-location."
    )
    target = "this directory" if len(lines) == 1 else "these directories"
    lines.append(advice.format(target))

    # Extra note when any non-empty PATH entry begins with ~
    if any(entry[0] == "~" for entry in path_entries if entry):
        lines.append(
            "NOTE: The current PATH contains path(s) starting with `~`, "
            "which may not be expanded by all applications."
        )

    return "\n".join(lines)

191 

192 

193def _normalized_outrows( 

194 outrows: Iterable[InstalledCSVRow], 

195) -> list[tuple[str, str, str]]: 

196 """Normalize the given rows of a RECORD file. 

197 

198 Items in each row are converted into str. Rows are then sorted to make 

199 the value more predictable for tests. 

200 

201 Each row is a 3-tuple (path, hash, size) and corresponds to a record of 

202 a RECORD file (see PEP 376 and PEP 427 for details). For the rows 

203 passed to this function, the size can be an integer as an int or string, 

204 or the empty string. 

205 """ 

206 # Normally, there should only be one row per path, in which case the 

207 # second and third elements don't come into play when sorting. 

208 # However, in cases in the wild where a path might happen to occur twice, 

209 # we don't want the sort operation to trigger an error (but still want 

210 # determinism). Since the third element can be an int or string, we 

211 # coerce each element to a string to avoid a TypeError in this case. 

212 # For additional background, see-- 

213 # https://github.com/pypa/pip/issues/5868 

214 return sorted( 

215 (record_path, hash_, str(size)) for record_path, hash_, size in outrows 

216 ) 

217 

218 

219def _record_to_fs_path(record_path: RecordPath, lib_dir: str) -> str: 

220 return os.path.join(lib_dir, record_path) 

221 

222 

223def _fs_to_record_path(path: str, lib_dir: str) -> RecordPath: 

224 # On Windows, do not handle relative paths if they belong to different 

225 # logical disks 

226 if os.path.splitdrive(path)[0].lower() == os.path.splitdrive(lib_dir)[0].lower(): 

227 path = os.path.relpath(path, lib_dir) 

228 

229 path = path.replace(os.path.sep, "/") 

230 return cast("RecordPath", path) 

231 

232 

def get_csv_rows_for_installed(
    old_csv_rows: list[list[str]],
    installed: dict[RecordPath, RecordPath],
    changed: set[RecordPath],
    generated: list[str],
    lib_dir: str,
) -> list[InstalledCSVRow]:
    """Compute the RECORD rows describing the installed files.

    :param old_csv_rows: Rows parsed from the RECORD file shipped in the
        wheel. Empty rows (as produced by blank lines) are skipped.
    :param installed: A map from archive RECORD path to installation RECORD
        path. Entries matched by a row of ``old_csv_rows`` are removed from
        this dict in place; the ones left over are emitted at the end with
        an empty hash and size.
    :param changed: Installation RECORD paths whose content was modified
        during installation; their hash and size are recomputed from disk.
    :param generated: Filesystem paths of files created during
        installation; each gets a freshly computed hash and size.
    :param lib_dir: Directory that RECORD paths are relative to.
    :return: Rows for rewritten archive entries, then generated files, then
        installed files that had no row in ``old_csv_rows``.
    """
    installed_rows: list[InstalledCSVRow] = []
    for row in old_csv_rows:
        if not row:
            # csv.reader yields an empty list for a blank line; there is no
            # path to record, and indexing row[0] would raise IndexError.
            continue
        if len(row) > 3:
            logger.warning("RECORD line has more than three elements: %s", row)
        old_record_path = cast("RecordPath", row[0])
        # Paths that were not installed by us keep their original name.
        new_record_path = installed.pop(old_record_path, old_record_path)
        if new_record_path in changed:
            digest, length = rehash(_record_to_fs_path(new_record_path, lib_dir))
        else:
            # Short rows are tolerated: missing fields become empty.
            digest = row[1] if len(row) > 1 else ""
            length = row[2] if len(row) > 2 else ""
        installed_rows.append((new_record_path, digest, length))
    for f in generated:
        path = _fs_to_record_path(f, lib_dir)
        digest, length = rehash(f)
        installed_rows.append((path, digest, length))
    # Whatever is still in ``installed`` had no RECORD row in the archive.
    return installed_rows + [
        (installed_record_path, "", "") for installed_record_path in installed.values()
    ]

263 

264 

def get_console_script_specs(console: dict[str, str]) -> list[str]:
    """Turn console entry points into ``name = callable`` script specs.

    *console* maps entry point names to callables. It is not modified.
    The ``pip`` and ``easy_install`` entry points are replaced by wrappers
    named for the running Python version, as described below.
    """
    # Work on a copy so the caller's mapping stays intact.
    remaining = console.copy()
    specs: list[str] = []

    # pip and setuptools get special treatment: versioned wrappers.
    #
    # Some projects (specifically pip and setuptools) use code in setup.py
    # to create "versioned" entry points - pip2.7 on Python 2.7, pip3.3 on
    # Python 3.3, etc. Those names are baked into the wheel metadata at
    # build time, so installing the wheel with a *different* Python yields
    # wrong entry points. The proper fix is to enhance the metadata so it
    # can describe versioned entry points. Until then, such projects either
    # ship incorrect versioned entry points or cannot distribute
    # "universal" wheels (they need one wheel per Python version).
    #
    # setuptools and pip are bundled with _ensurepip and virtualenv, so
    # they must be universal wheels. As a workaround, the versioned entry
    # points found in the wheel are discarded and correct ones generated.
    #
    # Adding to the hackiness, ensurepip is supported through the
    # ``ENSUREPIP_OPTIONS`` environment variable, which controls which
    # versioned scripts get installed:
    #
    # ENSUREPIP_OPTIONS=altinstall
    #   - Only pipX.Y and easy_install-X.Y are generated and installed.
    # ENSUREPIP_OPTIONS=install
    #   - pipX.Y, pipX and easy_install-X.Y are generated and installed.
    #     Technically this applies whenever ENSUREPIP_OPTIONS is set to
    #     anything other than altinstall.
    # DEFAULT
    #   - pip, pipX, pipX.Y, easy_install and easy_install-X.Y are all
    #     installed.
    ensurepip_options = os.environ.get("ENSUREPIP_OPTIONS")
    ensurepip_unset = ensurepip_options is None
    altinstall = ensurepip_options == "altinstall"

    pip_target = remaining.pop("pip", None)
    if pip_target:
        if ensurepip_unset:
            specs.append("pip = " + pip_target)
        if not altinstall:
            specs.append(f"pip{sys.version_info[0]} = {pip_target}")
        specs.append(f"pip{get_major_minor_version()} = {pip_target}")

        # Drop any other versioned pip entry points shipped in the wheel.
        versioned_pip = re.compile(r"pip(\d+(\.\d+)?)?$")
        for stale in [key for key in remaining if versioned_pip.match(key)]:
            del remaining[stale]

    easy_install_target = remaining.pop("easy_install", None)
    if easy_install_target:
        if ensurepip_unset:
            specs.append("easy_install = " + easy_install_target)
        specs.append(
            f"easy_install-{get_major_minor_version()} = {easy_install_target}"
        )

        # Drop any other versioned easy_install entry points.
        versioned_easy_install = re.compile(r"easy_install(-\d+\.\d+)?$")
        for stale in [
            key for key in remaining if versioned_easy_install.match(key)
        ]:
            del remaining[stale]

    # Everything left is emitted exactly as the wheel declared it.
    specs.extend(
        "{} = {}".format(script_name, target)
        for script_name, target in remaining.items()
    )
    return specs

338 

339 

class ZipBackedFile:
    """A file to install whose content is read from a zip archive member."""

    def __init__(
        self, src_record_path: RecordPath, dest_path: str, zip_file: ZipFile
    ) -> None:
        """
        :param src_record_path: Name of the member inside ``zip_file``.
        :param dest_path: Filesystem path the member is extracted to.
        :param zip_file: Open archive the member is read from.
        """
        self.src_record_path = src_record_path
        self.dest_path = dest_path
        self._zip_file = zip_file
        # Extraction never alters the content, so this stays False.
        self.changed = False

    def _getinfo(self) -> ZipInfo:
        """Return the archive's ``ZipInfo`` for this member."""
        return self._zip_file.getinfo(self.src_record_path)

    def save(self) -> None:
        """Extract the member to ``dest_path``, replacing what is there.

        The executable bits are set on the result when the archive marks
        the member as executable.
        """
        # When we open the output file below, any existing file is truncated
        # before we start writing the new contents. This is fine in most
        # cases, but can cause a segfault if pip has loaded a shared
        # object (e.g. from pyopenssl through its vendored urllib3)
        # Since the shared object is mmap'd an attempt to call a
        # symbol in it will then cause a segfault. Unlinking the file
        # allows writing of new contents while allowing the process to
        # continue to use the old copy.
        #
        # Unlink unconditionally instead of testing os.path.exists() first:
        # exists() reports False for a dangling symlink, which would leave
        # the link in place and make open() below write through it to the
        # link's target. It also avoids a check-then-act race.
        try:
            os.unlink(self.dest_path)
        except FileNotFoundError:
            # Nothing to replace.
            pass

        zipinfo = self._getinfo()

        # optimization: the file is created by open(),
        # skip the decompression when there is 0 bytes to decompress.
        with open(self.dest_path, "wb") as dest:
            if zipinfo.file_size > 0:
                with self._zip_file.open(zipinfo) as f:
                    # Copy in chunks of at most 1 MiB.
                    blocksize = min(zipinfo.file_size, 1024 * 1024)
                    shutil.copyfileobj(f, dest, blocksize)

        if zip_item_is_executable(zipinfo):
            set_extracted_file_to_default_mode_plus_executable(self.dest_path)

376 

377 

class ScriptFile:
    """Wrap another file so that its shebang is fixed up after saving."""

    def __init__(self, file: File) -> None:
        """Mirror the paths of *file*; ``changed`` starts out False."""
        self._file = file
        self.src_record_path = file.src_record_path
        self.dest_path = file.dest_path
        self.changed = False

    def save(self) -> None:
        """Save the wrapped file, then rewrite its ``#!python`` line.

        ``changed`` records whether the script was actually rewritten.
        """
        wrapped = self._file
        wrapped.save()
        self.changed = fix_script(self.dest_path)

388 

389 

class MissingCallableSuffix(InstallationError):
    """Raised for a script entry point that does not name a callable."""

    def __init__(self, entry_point: str) -> None:
        """Build the error message for the offending *entry_point*."""
        message = (
            f"Invalid script entry point: {entry_point} - A callable "
            "suffix is required. See https://packaging.python.org/"
            "specifications/entry-points/#use-for-scripts for more "
            "information."
        )
        super().__init__(message)

398 

399 

def _raise_for_invalid_entrypoint(specification: str) -> None:
    """Reject an entry point specification that has no callable suffix.

    :raises MissingCallableSuffix: when the specification parses to an
        entry whose ``suffix`` is None.
    """
    entry = get_export_entry(specification)
    # Nothing to check when the specification did not parse to an entry,
    # or when it does carry a suffix.
    if entry is None or entry.suffix is not None:
        return
    raise MissingCallableSuffix(str(entry))

404 

405 

class PipScriptMaker(ScriptMaker):
    """``ScriptMaker`` that validates each specification before generating it."""

    # Override distlib's default script template with one that
    # doesn't import `re` module, allowing scripts to load faster.
    # The %(module)s, %(import_name)s and %(func)s placeholders are left
    # for the base class to fill in.
    script_template = textwrap.dedent(
        """\
        import sys
        from %(module)s import %(import_name)s
        if __name__ == '__main__':
            sys.argv[0] = sys.argv[0].removesuffix('.exe')
            sys.exit(%(func)s())
"""
    )

    def make(
        self, specification: str, options: dict[str, Any] | None = None
    ) -> list[str]:
        """Validate *specification*, then delegate to ``ScriptMaker.make``.

        :raises MissingCallableSuffix: when the specification parses to an
            entry point without a callable suffix.
        """
        _raise_for_invalid_entrypoint(specification)
        return super().make(specification, options)

424 

425 

def _install_wheel(  # noqa: C901, PLR0915 function is too long
    name: str,
    wheel_zip: ZipFile,
    wheel_path: str,
    scheme: Scheme,
    pycompile: bool = True,
    warn_script_location: bool = True,
    direct_url: DirectUrl | None = None,
    requested: bool = False,
) -> None:
    """Install a wheel.

    The steps, in order: extract the archive members into the scheme
    directories, optionally byte-compile the installed ``.py`` files,
    generate the console and GUI script wrappers, write INSTALLER (and
    optionally the direct URL file and REQUESTED) into the .dist-info
    directory, and finally rewrite RECORD to list everything installed.

    :param name: Name of the project to install
    :param wheel_zip: open ZipFile for wheel being installed
    :param wheel_path: Path of the wheel file; used to load the wheel's
        distribution metadata and quoted in error messages
    :param scheme: Distutils scheme dictating the install directories
    :param pycompile: Whether to byte-compile installed Python files
    :param warn_script_location: Whether to check that scripts are installed
        into a directory on PATH
    :param direct_url: If not None, its JSON form is written into the
        .dist-info directory (PEP 610 direct URL reference)
    :param requested: Whether to create an empty REQUESTED file in the
        .dist-info directory
    :raises UnsupportedWheel:
        * when the directory holds an unpacked wheel with incompatible
          Wheel-Version
        * when the .dist-info dir does not match the wheel
    :raises InstallationError:
        * when an archive member would be installed outside its target
          directory
        * when a file under a .data directory is not laid out as
          '<scheme key>/<path>' or uses an unknown scheme key
    """
    info_dir, metadata = parse_wheel(wheel_zip, name)

    # Files at the archive root go to purelib or platlib depending on the
    # wheel's Root-Is-Purelib flag.
    if wheel_root_is_purelib(metadata):
        lib_dir = scheme.purelib
    else:
        lib_dir = scheme.platlib

    # Record details of the files moved
    #   installed = files copied from the wheel to the destination
    #   changed = files changed while installing (scripts #! line typically)
    #   generated = files newly generated during the install (script wrappers)
    installed: dict[RecordPath, RecordPath] = {}
    changed: set[RecordPath] = set()
    generated: list[str] = []

    def record_installed(
        srcfile: RecordPath, destfile: str, modified: bool = False
    ) -> None:
        """Map archive RECORD paths to installation RECORD paths."""
        newpath = _fs_to_record_path(destfile, lib_dir)
        installed[srcfile] = newpath
        if modified:
            changed.add(newpath)

    def is_dir_path(path: RecordPath) -> bool:
        """Return True for archive names that end in "/"."""
        return path.endswith("/")

    def assert_no_path_traversal(dest_dir_path: str, target_path: str) -> None:
        """Raise InstallationError if target_path is outside dest_dir_path."""
        if not is_within_directory(dest_dir_path, target_path):
            message = (
                "The wheel {!r} has a file {!r} trying to install"
                " outside the target directory {!r}"
            )
            raise InstallationError(
                message.format(wheel_path, target_path, dest_dir_path)
            )

    def root_scheme_file_maker(
        zip_file: ZipFile, dest: str
    ) -> Callable[[RecordPath], File]:
        """Return a factory for files installed relative to ``dest``."""

        def make_root_scheme_file(record_path: RecordPath) -> File:
            normed_path = os.path.normpath(record_path)
            dest_path = os.path.join(dest, normed_path)
            assert_no_path_traversal(dest, dest_path)
            return ZipBackedFile(record_path, dest_path, zip_file)

        return make_root_scheme_file

    def data_scheme_file_maker(
        zip_file: ZipFile, scheme: Scheme
    ) -> Callable[[RecordPath], File]:
        """Return a factory for files under '<name>.data/<scheme key>/'."""
        scheme_paths = {key: getattr(scheme, key) for key in SCHEME_KEYS}

        def make_data_scheme_file(record_path: RecordPath) -> File:
            normed_path = os.path.normpath(record_path)
            try:
                # Expected shape: <.data dir>/<scheme key>/<path>
                _, scheme_key, dest_subpath = normed_path.split(os.path.sep, 2)
            except ValueError:
                message = (
                    f"Unexpected file in {wheel_path}: {record_path!r}. .data directory"
                    " contents should be named like: '<scheme key>/<path>'."
                )
                raise InstallationError(message)

            try:
                scheme_path = scheme_paths[scheme_key]
            except KeyError:
                valid_scheme_keys = ", ".join(sorted(scheme_paths))
                message = (
                    f"Unknown scheme key used in {wheel_path}: {scheme_key} "
                    f"(for file {record_path!r}). .data directory contents "
                    f"should be in subdirectories named with a valid scheme "
                    f"key ({valid_scheme_keys})"
                )
                raise InstallationError(message)

            dest_path = os.path.join(scheme_path, dest_subpath)
            assert_no_path_traversal(scheme_path, dest_path)
            return ZipBackedFile(record_path, dest_path, zip_file)

        return make_data_scheme_file

    def is_data_scheme_path(path: RecordPath) -> bool:
        """Return True if the first path component ends with ".data"."""
        return path.split("/", 1)[0].endswith(".data")

    # Split the archive's file entries into root-level files and .data files.
    paths = cast(list[RecordPath], wheel_zip.namelist())
    file_paths = filterfalse(is_dir_path, paths)
    root_scheme_paths, data_scheme_paths = partition(is_data_scheme_path, file_paths)

    make_root_scheme_file = root_scheme_file_maker(wheel_zip, lib_dir)
    files: Iterator[File] = map(make_root_scheme_file, root_scheme_paths)

    def is_script_scheme_path(path: RecordPath) -> bool:
        """Return True for paths of the form '<x>.data/scripts/<path>'."""
        parts = path.split("/", 2)
        return len(parts) > 2 and parts[0].endswith(".data") and parts[1] == "scripts"

    other_scheme_paths, script_scheme_paths = partition(
        is_script_scheme_path, data_scheme_paths
    )

    make_data_scheme_file = data_scheme_file_maker(wheel_zip, scheme)
    other_scheme_files = map(make_data_scheme_file, other_scheme_paths)
    files = chain(files, other_scheme_files)

    # Get the defined entry points
    distribution = get_wheel_distribution(
        FilesystemWheel(wheel_path),
        canonicalize_name(name),
    )
    console, gui = get_entrypoints(distribution)

    def is_entrypoint_wrapper(file: File) -> bool:
        # EP, EP.exe and EP-script.py are scripts generated for
        # entry point EP by setuptools
        path = file.dest_path
        # Note: this local ``name`` shadows the function's ``name`` parameter.
        name = os.path.basename(path)
        if name.lower().endswith(".exe"):
            matchname = name[:-4]
        elif name.lower().endswith("-script.py"):
            matchname = name[:-10]
        elif name.lower().endswith(".pya"):
            matchname = name[:-4]
        else:
            matchname = name
        # Ignore setuptools-generated scripts
        return matchname in console or matchname in gui

    # Scripts shipped in the wheel: skip the wrappers that will be
    # regenerated below, and wrap the rest so their shebang gets fixed.
    script_scheme_files: Iterator[File] = map(
        make_data_scheme_file, script_scheme_paths
    )
    script_scheme_files = filterfalse(is_entrypoint_wrapper, script_scheme_files)
    script_scheme_files = map(ScriptFile, script_scheme_files)
    files = chain(files, script_scheme_files)

    existing_parents = set()
    for file in files:
        # directory creation is lazy and after file filtering
        # to ensure we don't install empty dirs; empty dirs can't be
        # uninstalled.
        parent_dir = os.path.dirname(file.dest_path)
        if parent_dir not in existing_parents:
            ensure_dir(parent_dir)
            existing_parents.add(parent_dir)
        file.save()
        record_installed(file.src_record_path, file.dest_path, file.changed)

    def pyc_source_file_paths() -> Generator[str, None, None]:
        """Yield the installed ``.py`` files, de-duplicated and sorted."""
        # We de-duplicate installation paths, since there can be overlap (e.g.
        # file in .data maps to same location as file in wheel root).
        # Sorting installation paths makes it easier to reproduce and debug
        # issues related to permissions on existing files.
        for installed_path in sorted(set(installed.values())):
            full_installed_path = os.path.join(lib_dir, installed_path)
            if not os.path.isfile(full_installed_path):
                continue
            if not full_installed_path.endswith(".py"):
                continue
            yield full_installed_path

    def pyc_output_path(path: str) -> str:
        """Return the path the pyc file would have been written to."""
        # NOTE(review): only ``importlib`` is imported at the top of this
        # file; this relies on the ``importlib.util`` submodule having been
        # loaded elsewhere -- confirm.
        return importlib.util.cache_from_source(path)

    # Compile all of the pyc files for the installed files
    if pycompile:
        # Anything compileall prints is captured and logged at debug level.
        with contextlib.redirect_stdout(
            StreamWrapper.from_stream(sys.stdout)
        ) as stdout:
            with warnings.catch_warnings():
                warnings.filterwarnings("ignore")
                for path in pyc_source_file_paths():
                    success = compileall.compile_file(path, force=True, quiet=True)
                    if success:
                        pyc_path = pyc_output_path(path)
                        assert os.path.exists(pyc_path)
                        pyc_record_path = cast(
                            "RecordPath", pyc_path.replace(os.path.sep, "/")
                        )
                        record_installed(pyc_record_path, pyc_path)
        logger.debug(stdout.getvalue())

    maker = PipScriptMaker(None, scheme.scripts)

    # Ensure old scripts are overwritten.
    # See https://github.com/pypa/pip/issues/1800
    maker.clobber = True

    # Ensure we don't generate any variants for scripts because this is almost
    # never what somebody wants.
    # See https://bitbucket.org/pypa/distlib/issue/35/
    maker.variants = {""}

    # This is required because otherwise distlib creates scripts that are not
    # executable.
    # See https://bitbucket.org/pypa/distlib/issue/32/
    maker.set_mode = True

    # Generate the console and GUI entry points specified in the wheel
    scripts_to_generate = get_console_script_specs(console)

    gui_scripts_to_generate = list(starmap("{} = {}".format, gui.items()))

    generated_console_scripts = maker.make_multiple(scripts_to_generate)
    generated.extend(generated_console_scripts)

    generated.extend(maker.make_multiple(gui_scripts_to_generate, {"gui": True}))

    # Only the console scripts are checked against PATH.
    if warn_script_location:
        msg = message_about_scripts_not_on_PATH(generated_console_scripts)
        if msg is not None:
            logger.warning(msg)

    # Permission bits for generated metadata files: rw for all, minus umask.
    generated_file_mode = 0o666 & ~current_umask()

    @contextlib.contextmanager
    def _generate_file(path: str, **kwargs: Any) -> Generator[BinaryIO, None, None]:
        """Yield a temp file next to ``path``; move it to ``path`` afterwards."""
        with adjacent_tmp_file(path, **kwargs) as f:
            yield f
        # Runs after the temp file's context has exited.
        os.chmod(f.name, generated_file_mode)
        replace(f.name, path)

    dest_info_dir = os.path.join(lib_dir, info_dir)

    # Record pip as the installer
    installer_path = os.path.join(dest_info_dir, "INSTALLER")
    with _generate_file(installer_path) as installer_file:
        installer_file.write(b"pip\n")
    generated.append(installer_path)

    # Record the PEP 610 direct URL reference
    if direct_url is not None:
        direct_url_path = os.path.join(dest_info_dir, DIRECT_URL_METADATA_NAME)
        with _generate_file(direct_url_path) as direct_url_file:
            direct_url_file.write(direct_url.to_json().encode("utf-8"))
        generated.append(direct_url_path)

    # Record the REQUESTED file
    if requested:
        requested_path = os.path.join(dest_info_dir, "REQUESTED")
        # Opening for writing is enough to create the empty marker file.
        with open(requested_path, "wb"):
            pass
        generated.append(requested_path)

    record_text = distribution.read_text("RECORD")
    record_rows = list(csv.reader(record_text.splitlines()))

    rows = get_csv_rows_for_installed(
        record_rows,
        installed=installed,
        changed=changed,
        generated=generated,
        lib_dir=lib_dir,
    )

    # Record details of all files installed
    record_path = os.path.join(dest_info_dir, "RECORD")

    with _generate_file(record_path, **csv_io_kwargs("w")) as record_file:
        # Explicitly cast to typing.IO[str] as a workaround for the mypy error:
        # "writer" has incompatible type "BinaryIO"; expected "_Writer"
        writer = csv.writer(cast("IO[str]", record_file))
        writer.writerows(_normalized_outrows(rows))

713 

714 

@contextlib.contextmanager
def req_error_context(req_description: str) -> Generator[None, None, None]:
    """Prefix any InstallationError raised in the block with the requirement.

    :param req_description: Text identifying the requirement; it is placed
        at the start of the re-raised error's message.
    :raises InstallationError: a new error chained to the original, with
        the message ``For req: <req_description>. <original message>``.
    """
    try:
        yield
    except InstallationError as e:
        # An exception raised without arguments has an empty ``args``
        # tuple; indexing it would raise IndexError and hide the real
        # error. Fall back to the exception itself in that case.
        detail = e.args[0] if e.args else e
        message = f"For req: {req_description}. {detail}"
        raise InstallationError(message) from e

722 

723 

def install_wheel(
    name: str,
    wheel_path: str,
    scheme: Scheme,
    req_description: str,
    pycompile: bool = True,
    warn_script_location: bool = True,
    direct_url: DirectUrl | None = None,
    requested: bool = False,
) -> None:
    """Open the wheel at *wheel_path* and install it.

    The archive is opened with Zip64 support and handed to
    ``_install_wheel`` together with the remaining arguments. Any
    InstallationError raised while installing is re-raised with
    *req_description* prefixed to its message.
    """
    # The archive is opened first, so errors from opening it are not
    # rewritten by the error context.
    with ZipFile(wheel_path, allowZip64=True) as archive, req_error_context(
        req_description
    ):
        _install_wheel(
            name=name,
            wheel_zip=archive,
            wheel_path=wheel_path,
            scheme=scheme,
            pycompile=pycompile,
            warn_script_location=warn_script_location,
            direct_url=direct_url,
            requested=requested,
        )