Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pip/_internal/operations/install/wheel.py: 19%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

356 statements  

1"""Support for installing and building the "wheel" binary package format.""" 

2 

3from __future__ import annotations 

4 

5import collections 

6import compileall 

7import contextlib 

8import csv 

9import importlib 

10import logging 

11import os.path 

12import re 

13import shutil 

14import sys 

15import textwrap 

16import warnings 

17from base64 import urlsafe_b64encode 

18from collections.abc import Callable, Generator, Iterable, Iterator, Sequence 

19from email.message import Message 

20from itertools import chain, filterfalse, starmap 

21from pathlib import Path 

22from typing import ( 

23 IO, 

24 Any, 

25 BinaryIO, 

26 NewType, 

27 Protocol, 

28 cast, 

29) 

30from zipfile import ZipFile, ZipInfo 

31 

32from pip._vendor.distlib.scripts import ScriptMaker 

33from pip._vendor.distlib.util import get_export_entry 

34from pip._vendor.packaging.utils import canonicalize_name 

35 

36from pip._internal.exceptions import InstallationError 

37from pip._internal.locations import get_major_minor_version 

38from pip._internal.metadata import ( 

39 BaseDistribution, 

40 FilesystemWheel, 

41 get_wheel_distribution, 

42) 

43from pip._internal.models.direct_url import DIRECT_URL_METADATA_NAME, DirectUrl 

44from pip._internal.models.scheme import SCHEME_KEYS, Scheme 

45from pip._internal.utils.filesystem import adjacent_tmp_file, replace 

46from pip._internal.utils.misc import StreamWrapper, ensure_dir, hash_file, partition 

47from pip._internal.utils.unpacking import ( 

48 current_umask, 

49 is_within_directory, 

50 set_extracted_file_to_default_mode_plus_executable, 

51 zip_item_is_executable, 

52) 

53from pip._internal.utils.wheel import parse_wheel 

54 

55 

class File(Protocol):
    """Structural interface for one file being installed from a wheel.

    Anything exposing these three attributes and a ``save()`` method can be
    fed through the installation loop.
    """

    # Path of the entry as it is named inside the wheel archive.
    src_record_path: RecordPath
    # Filesystem path the entry is written to.
    dest_path: str
    # Whether saving altered the contents relative to the archive copy.
    changed: bool

    def save(self) -> None:
        """Write the file to ``dest_path``."""
        ...

63 

64 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# A path as written in a RECORD file ("/"-separated). Declared as a distinct
# static type so it is not mixed up with filesystem paths.
RecordPath = NewType("RecordPath", str)
# One RECORD row: (path, hash, size). The size may be an int or a string.
InstalledCSVRow = tuple[RecordPath, str, int | str]

69 

70 

def rehash(path: str, blocksize: int = 1 << 20) -> tuple[str, str]:
    """Hash the file at *path* and describe it the way a RECORD row does.

    :param path: File to hash.
    :param blocksize: Passed through to ``hash_file``.
    :return: ``("sha256=<digest>", "<length>")`` where the digest is
        urlsafe-base64 encoded with trailing ``=`` padding removed and the
        length reported by ``hash_file`` is rendered as a string.
    """
    hasher, size = hash_file(path, blocksize)
    encoded = urlsafe_b64encode(hasher.digest()).decode("latin1")
    return ("sha256=" + encoded.rstrip("="), str(size))

76 

77 

def csv_io_kwargs(mode: str) -> dict[str, Any]:
    """Build the keyword arguments used to open a CSV file.

    The returned mapping carries *mode* through unchanged, disables newline
    translation (``newline=""``) and selects UTF-8 as the encoding.
    """
    return dict(mode=mode, newline="", encoding="utf-8")

83 

84 

def fix_script(path: str) -> bool:
    """Point a ``#!python`` shebang at the running interpreter.

    If the first line of the file starts with ``#!python`` it is replaced by
    ``#!`` followed by ``sys.executable`` and the platform line separator; the
    remainder of the file is written back untouched.

    :param path: Script to inspect; must be an existing regular file.
    :return: True if the file was rewritten, False if it was left alone.
    """
    # XXX RECORD hashes will need to be updated
    assert os.path.isfile(path)

    with open(path, "rb") as src:
        shebang = src.readline()
        if not shebang.startswith(b"#!python"):
            return False
        interpreter = sys.executable.encode(sys.getfilesystemencoding())
        new_shebang = b"".join((b"#!", interpreter, os.linesep.encode("ascii")))
        body = src.read()

    with open(path, "wb") as dst:
        dst.write(new_shebang)
        dst.write(body)
    return True

103 

104 

def wheel_root_is_purelib(metadata: Message) -> bool:
    """Tell whether *metadata* declares ``Root-Is-Purelib: true``.

    A missing header counts as false; the value is compared
    case-insensitively.
    """
    flag = metadata.get("Root-Is-Purelib", "")
    return flag.lower() == "true"

107 

108 

def get_entrypoints(dist: BaseDistribution) -> tuple[dict[str, str], dict[str, str]]:
    """Collect the script entry points declared by *dist*.

    :return: ``(console_scripts, gui_scripts)``, each mapping entry point
        name to its value. Entry points in any other group are ignored; when
        a name repeats within a group the last value wins.
    """
    by_group: dict[str, dict[str, str]] = {
        "console_scripts": {},
        "gui_scripts": {},
    }
    for ep in dist.iter_entry_points():
        bucket = by_group.get(ep.group)
        if bucket is not None:
            bucket[ep.name] = ep.value
    return by_group["console_scripts"], by_group["gui_scripts"]

118 

119 

def message_about_scripts_not_on_PATH(scripts: Sequence[str]) -> str | None:
    """Build a warning about scripts installed outside of PATH.

    :param scripts: Paths of the installed scripts.
    :return: A multi-line warning naming every directory that holds scripts
        but is neither on PATH nor the directory of ``sys.executable``, or
        None when there is nothing to warn about.
    """
    if not scripts:
        return None

    # Bucket script names under the resolved directory each one lives in.
    names_by_dir: dict[Path, set[str]] = collections.defaultdict(set)
    for script in scripts:
        script_path = Path(script)
        names_by_dir[script_path.parent.resolve()].add(script_path.name)

    path_entries = os.environ.get("PATH", "").split(os.pathsep)

    # Directories on PATH are fine. So is the directory holding
    # sys.executable, which covers running a venv without activating it.
    quiet_dirs = [Path(entry).resolve() for entry in path_entries]
    quiet_dirs.append(Path(sys.executable).parent.resolve())

    offenders: dict[Path, set[str]] = {
        directory: names
        for directory, names in names_by_dir.items()
        if directory not in quiet_dirs
    }
    if not offenders:
        return None

    # One sentence per offending directory.
    lines = []
    for directory, names in offenders.items():
        ordered: list[str] = sorted(names)
        if len(ordered) == 1:
            subject = f"script {ordered[0]} is"
        else:
            *leading, final = ordered
            subject = "scripts {} are".format(", ".join(leading) + " and " + final)
        lines.append(
            f"The {subject} installed in '{directory}' which is not on PATH."
        )

    advice = (
        "Consider adding {} to PATH or, if you prefer "
        "to suppress this warning, use --no-warn-script-location."
    )
    target = "this directory" if len(lines) == 1 else "these directories"
    lines.append(advice.format(target))

    # Add a note if any PATH entry starts with ~
    if any(entry[0] == "~" for entry in path_entries if entry):
        lines.append(
            "NOTE: The current PATH contains path(s) starting with `~`, "
            "which may not be expanded by all applications."
        )

    # The formatted multiline message
    return "\n".join(lines)

188 

189 

190def _normalized_outrows( 

191 outrows: Iterable[InstalledCSVRow], 

192) -> list[tuple[str, str, str]]: 

193 """Normalize the given rows of a RECORD file. 

194 

195 Items in each row are converted into str. Rows are then sorted to make 

196 the value more predictable for tests. 

197 

198 Each row is a 3-tuple (path, hash, size) and corresponds to a record of 

199 a RECORD file (see PEP 376 and PEP 427 for details). For the rows 

200 passed to this function, the size can be an integer as an int or string, 

201 or the empty string. 

202 """ 

203 # Normally, there should only be one row per path, in which case the 

204 # second and third elements don't come into play when sorting. 

205 # However, in cases in the wild where a path might happen to occur twice, 

206 # we don't want the sort operation to trigger an error (but still want 

207 # determinism). Since the third element can be an int or string, we 

208 # coerce each element to a string to avoid a TypeError in this case. 

209 # For additional background, see-- 

210 # https://github.com/pypa/pip/issues/5868 

211 return sorted( 

212 (record_path, hash_, str(size)) for record_path, hash_, size in outrows 

213 ) 

214 

215 

216def _record_to_fs_path(record_path: RecordPath, lib_dir: str) -> str: 

217 return os.path.join(lib_dir, record_path) 

218 

219 

220def _fs_to_record_path(path: str, lib_dir: str) -> RecordPath: 

221 # On Windows, do not handle relative paths if they belong to different 

222 # logical disks 

223 if os.path.splitdrive(path)[0].lower() == os.path.splitdrive(lib_dir)[0].lower(): 

224 path = os.path.relpath(path, lib_dir) 

225 

226 path = path.replace(os.path.sep, "/") 

227 return cast("RecordPath", path) 

228 

229 

def get_csv_rows_for_installed(
    old_csv_rows: list[list[str]],
    installed: dict[RecordPath, RecordPath],
    changed: set[RecordPath],
    generated: list[str],
    lib_dir: str,
) -> list[InstalledCSVRow]:
    """Build the rows of the RECORD file to write after installation.

    :param old_csv_rows: Rows parsed from the wheel's own RECORD file. Empty
        rows (what ``csv.reader`` yields for a blank line) are skipped.
    :param installed: A map from archive RECORD path to installation RECORD
        path. The mapping is read, not modified.
    :param changed: Installation RECORD paths whose contents were altered
        during installation; their hash and size are recomputed from disk.
    :param generated: Filesystem paths of files created by the installer;
        each is hashed and appended as a new row.
    :param lib_dir: Directory that RECORD paths are relative to.
    :return: One row per non-empty input row (with its path remapped through
        *installed*), then one per generated file, then one row with empty
        hash and size for every *installed* entry that no input row matched.
    """
    # Entries are removed as they are matched so the leftovers can be emitted
    # at the end. Work on a copy so the caller's mapping is left intact.
    unmatched = dict(installed)

    installed_rows: list[InstalledCSVRow] = []
    for row in old_csv_rows:
        if not row:
            # A blank line in RECORD carries no path; indexing it would raise.
            continue
        if len(row) > 3:
            logger.warning("RECORD line has more than three elements: %s", row)
        old_record_path = cast("RecordPath", row[0])
        new_record_path = unmatched.pop(old_record_path, old_record_path)
        if new_record_path in changed:
            digest, length = rehash(_record_to_fs_path(new_record_path, lib_dir))
        else:
            # Short rows are tolerated: missing fields become empty strings.
            digest = row[1] if len(row) > 1 else ""
            length = row[2] if len(row) > 2 else ""
        installed_rows.append((new_record_path, digest, length))

    for f in generated:
        path = _fs_to_record_path(f, lib_dir)
        digest, length = rehash(f)
        installed_rows.append((path, digest, length))

    installed_rows.extend(
        (installed_record_path, "", "") for installed_record_path in unmatched.values()
    )
    return installed_rows

260 

261 

def get_console_script_specs(console: dict[str, str]) -> list[str]:
    """
    Turn the mapping from entry point name to callable into the list of
    ``"name = callable"`` console script specs to generate.

    The ``pip`` and ``easy_install`` entries are special-cased (see below);
    every other entry is passed through as-is. *console* is not modified.
    """
    # Work on a private copy; the caller's mapping stays intact.
    remaining = dict(console)
    specs: list[str] = []

    # Special case pip and setuptools to generate versioned wrappers
    #
    # Some projects (specifically, pip and setuptools) use code in setup.py
    # to create "versioned" entry points - pip2.7 on Python 2.7, pip3.3 on
    # Python 3.3, etc. Those entry points are baked into the wheel metadata
    # at build time, so installing the wheel with a *different* version of
    # Python yields the wrong entry points. The correct fix is to enhance
    # the metadata so it can describe such versioned entry points. Until
    # then, projects using versioned entry points either ship incorrect
    # ones or cannot distribute "universal" wheels (i.e., they need a wheel
    # per Python version).
    #
    # Because setuptools and pip are bundled with _ensurepip and virtualenv,
    # we need to use universal wheels. As a workaround, we override the
    # versioned entry points in the wheel and generate the correct ones.
    #
    # To add to the level of hack here, in order to support ensurepip this
    # code looks at the ``ENSUREPIP_OPTIONS`` environment variable, which
    # controls which version scripts get installed.
    #
    # ENSUREPIP_OPTIONS=altinstall
    #   - Only pipX.Y and easy_install-X.Y will be generated and installed
    # ENSUREPIP_OPTIONS=install
    #   - pipX.Y, pipX, easy_install-X.Y will be generated and installed.
    #     Note that this option is technically "ENSUREPIP_OPTIONS is set and
    #     is not altinstall"
    # DEFAULT
    #   - The default behavior is to install pip, pipX, pipX.Y, easy_install
    #     and easy_install-X.Y.
    ensurepip_mode = os.environ.get("ENSUREPIP_OPTIONS")  # None when unset

    pip_target = remaining.pop("pip", None)
    if pip_target:
        if ensurepip_mode is None:
            specs.append(f"pip = {pip_target}")
        if ensurepip_mode != "altinstall":
            specs.append(f"pip{sys.version_info[0]} = {pip_target}")
        specs.append(f"pip{get_major_minor_version()} = {pip_target}")
        # Delete any other versioned pip entry points
        for stale in [k for k in remaining if re.match(r"pip(\d+(\.\d+)?)?$", k)]:
            del remaining[stale]

    easy_install_target = remaining.pop("easy_install", None)
    if easy_install_target:
        if ensurepip_mode is None:
            specs.append(f"easy_install = {easy_install_target}")
        specs.append(
            f"easy_install-{get_major_minor_version()} = {easy_install_target}"
        )
        # Delete any other versioned easy_install entry points
        for stale in [
            k for k in remaining if re.match(r"easy_install(-\d+\.\d+)?$", k)
        ]:
            del remaining[stale]

    # Generate the console entry points specified in the wheel
    specs.extend(f"{ep_name} = {target}" for ep_name, target in remaining.items())

    return specs

335 

336 

class ZipBackedFile:
    """A wheel archive member that can be extracted to a destination path."""

    def __init__(
        self, src_record_path: RecordPath, dest_path: str, zip_file: ZipFile
    ) -> None:
        """
        :param src_record_path: Name of the member inside *zip_file*.
        :param dest_path: Filesystem path the member is written to.
        :param zip_file: Open archive the member is read from.
        """
        self.src_record_path = src_record_path
        self.dest_path = dest_path
        self._zip_file = zip_file
        # Extraction copies the member verbatim, so it is never "changed".
        self.changed = False

    def _getinfo(self) -> ZipInfo:
        """Return the archive's ``ZipInfo`` for this member."""
        return self._zip_file.getinfo(self.src_record_path)

    def _unlink_existing(self) -> None:
        """Remove whatever currently sits at ``dest_path``, if anything."""
        # Opening the output file would truncate an existing file in place.
        # This is fine in most cases, but can cause a segfault if pip has
        # loaded a shared object (e.g. from pyopenssl through its vendored
        # urllib3): the shared object is mmap'd, so calling a symbol in it
        # after truncation crashes. Unlinking the file first allows writing
        # new contents while the process keeps using the old copy.
        #
        # Unlink unconditionally instead of testing os.path.exists() first:
        # exists() is False for a dangling symlink, which open() would then
        # follow and create the file at the link's target, and the separate
        # check leaves a window between the test and the removal.
        with contextlib.suppress(FileNotFoundError):
            os.unlink(self.dest_path)

    def save(self) -> None:
        """Extract the member to ``dest_path``, replacing any existing entry.

        If the archive marks the member as executable, the written file is
        given the default mode plus the executable bits.
        """
        self._unlink_existing()

        zipinfo = self._getinfo()

        # optimization: the file is created by open(),
        # skip the decompression when there is 0 bytes to decompress.
        with open(self.dest_path, "wb") as dest:
            if zipinfo.file_size > 0:
                with self._zip_file.open(zipinfo) as f:
                    blocksize = min(zipinfo.file_size, 1024 * 1024)
                    shutil.copyfileobj(f, dest, blocksize)

        if zip_item_is_executable(zipinfo):
            set_extracted_file_to_default_mode_plus_executable(self.dest_path)

373 

374 

class ScriptFile:
    """Wrap another ``File`` and fix up its shebang line once it is saved."""

    def __init__(self, file: File) -> None:
        """Mirror the wrapped file's source and destination paths."""
        self._file = file
        self.src_record_path = file.src_record_path
        self.dest_path = file.dest_path
        # Only known after save(): whether the shebang had to be rewritten.
        self.changed = False

    def save(self) -> None:
        """Save the wrapped file, then record whether ``fix_script`` altered it."""
        wrapped = self._file
        wrapped.save()
        rewritten = fix_script(self.dest_path)
        self.changed = rewritten

385 

386 

class MissingCallableSuffix(InstallationError):
    """Raised for a script entry point that does not name a callable."""

    def __init__(self, entry_point: str) -> None:
        """
        :param entry_point: Textual form of the offending entry point, quoted
            in the error message.
        """
        message = (
            f"Invalid script entry point: {entry_point} - A callable "
            "suffix is required. See https://packaging.python.org/"
            "specifications/entry-points/#use-for-scripts for more "
            "information."
        )
        super().__init__(message)

395 

396 

def _raise_for_invalid_entrypoint(specification: str, scripts_dir: str) -> None:
    """Reject an entry point specification that must not become a script.

    A specification that ``get_export_entry`` does not parse into an entry
    is let through untouched.

    :param specification: Entry point specification string.
    :param scripts_dir: Directory the script would be generated into.
    :raises MissingCallableSuffix: if the entry has no callable suffix.
    :raises InstallationError: if the entry's name, joined onto
        *scripts_dir*, is the directory itself or lies outside it.
    """
    entry = get_export_entry(specification)
    if entry is None:
        return

    if entry.suffix is None:
        raise MissingCallableSuffix(str(entry))

    # distlib joins the entry point name onto the scripts directory, so a
    # name containing path separators or ``..`` components can end up
    # somewhere else. Accept only a path strictly inside the directory.
    target = os.path.join(scripts_dir, entry.name)
    is_other_path = os.path.abspath(target) != os.path.abspath(scripts_dir)
    if is_other_path and is_within_directory(scripts_dir, target):
        return

    raise InstallationError(
        f"Invalid script entry point name {entry.name!r}: the script "
        f"would be installed outside the scripts directory ({scripts_dir})."
    )

415 

416 

class PipScriptMaker(ScriptMaker):
    """``ScriptMaker`` variant with a lighter script template that validates
    each specification before generating its script.
    """

    # Override distlib's default script template with one that
    # doesn't import `re` module, allowing scripts to load faster.
    script_template = textwrap.dedent("""\
        import sys
        from %(module)s import %(import_name)s
        if __name__ == '__main__':
            sys.argv[0] = sys.argv[0].removesuffix('.exe')
            sys.exit(%(func)s())
""")

    def make(
        self, specification: str, options: dict[str, Any] | None = None
    ) -> list[str]:
        """Validate *specification*, then defer to ``ScriptMaker.make``.

        Validation raises before anything is generated when the entry point
        lacks a callable suffix or would be written outside ``target_dir``.
        """
        _raise_for_invalid_entrypoint(specification, self.target_dir)
        return super().make(specification, options)

433 

434 

def _install_wheel(  # noqa: C901, PLR0915 function is too long
    name: str,
    wheel_zip: ZipFile,
    wheel_path: str,
    scheme: Scheme,
    pycompile: bool = True,
    warn_script_location: bool = True,
    direct_url: DirectUrl | None = None,
    requested: bool = False,
) -> None:
    """Install a wheel.

    :param name: Name of the project to install
    :param wheel_zip: open ZipFile for wheel being installed
    :param wheel_path: Path of the wheel file; used to load the wheel's
        distribution metadata and quoted in error messages
    :param scheme: Distutils scheme dictating the install directories
    :param pycompile: Whether to byte-compile installed Python files
    :param warn_script_location: Whether to check that scripts are installed
        into a directory on PATH
    :param direct_url: If not None, serialized as JSON into the direct URL
        metadata file of the installed .dist-info directory
    :param requested: Whether to create an empty REQUESTED file in the
        installed .dist-info directory
    :raises UnsupportedWheel:
        * when the directory holds an unpacked wheel with incompatible
          Wheel-Version
        * when the .dist-info dir does not match the wheel
    :raises InstallationError:
        * when an archive member would be written outside its target
          directory
        * when a .data entry is not of the form '<scheme key>/<path>' or
          names an unknown scheme key
    """
    info_dir, metadata = parse_wheel(wheel_zip, name)

    # Files at the archive root go to purelib or platlib, as declared by the
    # wheel's Root-Is-Purelib header.
    if wheel_root_is_purelib(metadata):
        lib_dir = scheme.purelib
    else:
        lib_dir = scheme.platlib

    # Record details of the files moved
    #   installed = files copied from the wheel to the destination
    #   changed = files changed while installing (scripts #! line typically)
    #   generated = files newly generated during the install (script wrappers)
    installed: dict[RecordPath, RecordPath] = {}
    changed: set[RecordPath] = set()
    generated: list[str] = []

    def record_installed(
        srcfile: RecordPath, destfile: str, modified: bool = False
    ) -> None:
        """Map archive RECORD paths to installation RECORD paths."""
        newpath = _fs_to_record_path(destfile, lib_dir)
        installed[srcfile] = newpath
        if modified:
            changed.add(newpath)

    def is_dir_path(path: RecordPath) -> bool:
        # Archive member names ending in "/" are treated as directories.
        return path.endswith("/")

    def assert_no_path_traversal(dest_dir_path: str, target_path: str) -> None:
        # Refuse any member whose destination escapes its install directory.
        if not is_within_directory(dest_dir_path, target_path):
            message = (
                "The wheel {!r} has a file {!r} trying to install"
                " outside the target directory {!r}"
            )
            raise InstallationError(
                message.format(wheel_path, target_path, dest_dir_path)
            )

    def root_scheme_file_maker(
        zip_file: ZipFile, dest: str
    ) -> Callable[[RecordPath], File]:
        # Returns a factory for members that live at the archive root and are
        # installed relative to ``dest``.
        def make_root_scheme_file(record_path: RecordPath) -> File:
            normed_path = os.path.normpath(record_path)
            dest_path = os.path.join(dest, normed_path)
            assert_no_path_traversal(dest, dest_path)
            return ZipBackedFile(record_path, dest_path, zip_file)

        return make_root_scheme_file

    def data_scheme_file_maker(
        zip_file: ZipFile, scheme: Scheme
    ) -> Callable[[RecordPath], File]:
        # Returns a factory for members under "<name>.data/<scheme key>/...",
        # which are installed relative to that scheme key's directory.
        scheme_paths = {key: getattr(scheme, key) for key in SCHEME_KEYS}

        def make_data_scheme_file(record_path: RecordPath) -> File:
            normed_path = os.path.normpath(record_path)
            try:
                # Expect "<x>.data/<scheme key>/<path>"; fewer parts fail to
                # unpack.
                _, scheme_key, dest_subpath = normed_path.split(os.path.sep, 2)
            except ValueError:
                message = (
                    f"Unexpected file in {wheel_path}: {record_path!r}. .data directory"
                    " contents should be named like: '<scheme key>/<path>'."
                )
                raise InstallationError(message)

            try:
                scheme_path = scheme_paths[scheme_key]
            except KeyError:
                valid_scheme_keys = ", ".join(sorted(scheme_paths))
                message = (
                    f"Unknown scheme key used in {wheel_path}: {scheme_key} "
                    f"(for file {record_path!r}). .data directory contents "
                    f"should be in subdirectories named with a valid scheme "
                    f"key ({valid_scheme_keys})"
                )
                raise InstallationError(message)

            dest_path = os.path.join(scheme_path, dest_subpath)
            assert_no_path_traversal(scheme_path, dest_path)
            return ZipBackedFile(record_path, dest_path, zip_file)

        return make_data_scheme_file

    def is_data_scheme_path(path: RecordPath) -> bool:
        return path.split("/", 1)[0].endswith(".data")

    paths = cast(list[RecordPath], wheel_zip.namelist())
    file_paths = filterfalse(is_dir_path, paths)
    root_scheme_paths, data_scheme_paths = partition(is_data_scheme_path, file_paths)

    make_root_scheme_file = root_scheme_file_maker(wheel_zip, lib_dir)
    files: Iterator[File] = map(make_root_scheme_file, root_scheme_paths)

    def is_script_scheme_path(path: RecordPath) -> bool:
        parts = path.split("/", 2)
        return len(parts) > 2 and parts[0].endswith(".data") and parts[1] == "scripts"

    other_scheme_paths, script_scheme_paths = partition(
        is_script_scheme_path, data_scheme_paths
    )

    make_data_scheme_file = data_scheme_file_maker(wheel_zip, scheme)
    other_scheme_files = map(make_data_scheme_file, other_scheme_paths)
    files = chain(files, other_scheme_files)

    # Get the defined entry points
    distribution = get_wheel_distribution(
        FilesystemWheel(wheel_path),
        canonicalize_name(name),
    )
    console, gui = get_entrypoints(distribution)

    def is_entrypoint_wrapper(file: File) -> bool:
        # EP, EP.exe and EP-script.py are scripts generated for
        # entry point EP by setuptools
        path = file.dest_path
        name = os.path.basename(path)
        if name.lower().endswith(".exe"):
            matchname = name[:-4]
        elif name.lower().endswith("-script.py"):
            matchname = name[:-10]
        elif name.lower().endswith(".pya"):
            matchname = name[:-4]
        else:
            matchname = name
        # Ignore setuptools-generated scripts
        return matchname in console or matchname in gui

    script_scheme_files: Iterator[File] = map(
        make_data_scheme_file, script_scheme_paths
    )
    # Wrappers shipped for declared entry points are dropped here; scripts
    # for those entry points are generated further down instead.
    script_scheme_files = filterfalse(is_entrypoint_wrapper, script_scheme_files)
    script_scheme_files = map(ScriptFile, script_scheme_files)
    files = chain(files, script_scheme_files)

    existing_parents = set()
    for file in files:
        # directory creation is lazy and after file filtering
        # to ensure we don't install empty dirs; empty dirs can't be
        # uninstalled.
        parent_dir = os.path.dirname(file.dest_path)
        if parent_dir not in existing_parents:
            ensure_dir(parent_dir)
            existing_parents.add(parent_dir)
        file.save()
        record_installed(file.src_record_path, file.dest_path, file.changed)

    def pyc_source_file_paths() -> Generator[str, None, None]:
        # We de-duplicate installation paths, since there can be overlap (e.g.
        # file in .data maps to same location as file in wheel root).
        # Sorting installation paths makes it easier to reproduce and debug
        # issues related to permissions on existing files.
        for installed_path in sorted(set(installed.values())):
            full_installed_path = os.path.join(lib_dir, installed_path)
            if not os.path.isfile(full_installed_path):
                continue
            if not full_installed_path.endswith(".py"):
                continue
            yield full_installed_path

    def pyc_output_path(path: str) -> str:
        """Return the path the pyc file would have been written to."""
        return importlib.util.cache_from_source(path)

    # Compile all of the pyc files for the installed files
    if pycompile:
        # Anything printed to stdout while compiling is captured and sent to
        # the debug log afterwards; warnings are silenced meanwhile.
        with contextlib.redirect_stdout(
            StreamWrapper.from_stream(sys.stdout)
        ) as stdout:
            with warnings.catch_warnings():
                warnings.filterwarnings("ignore")
                for path in pyc_source_file_paths():
                    success = compileall.compile_file(path, force=True, quiet=True)
                    if success:
                        pyc_path = pyc_output_path(path)
                        assert os.path.exists(pyc_path)
                        pyc_record_path = cast(
                            "RecordPath", pyc_path.replace(os.path.sep, "/")
                        )
                        record_installed(pyc_record_path, pyc_path)
        logger.debug(stdout.getvalue())

    maker = PipScriptMaker(None, scheme.scripts)

    # Ensure old scripts are overwritten.
    # See https://github.com/pypa/pip/issues/1800
    maker.clobber = True

    # Ensure we don't generate any variants for scripts because this is almost
    # never what somebody wants.
    # See https://bitbucket.org/pypa/distlib/issue/35/
    maker.variants = {""}

    # This is required because otherwise distlib creates scripts that are not
    # executable.
    # See https://bitbucket.org/pypa/distlib/issue/32/
    maker.set_mode = True

    # Generate the console and GUI entry points specified in the wheel
    scripts_to_generate = get_console_script_specs(console)

    gui_scripts_to_generate = list(starmap("{} = {}".format, gui.items()))

    generated_console_scripts = maker.make_multiple(scripts_to_generate)
    generated.extend(generated_console_scripts)

    generated.extend(maker.make_multiple(gui_scripts_to_generate, {"gui": True}))

    # Only the console scripts are checked against PATH.
    if warn_script_location:
        msg = message_about_scripts_not_on_PATH(generated_console_scripts)
        if msg is not None:
            logger.warning(msg)

    generated_file_mode = 0o666 & ~current_umask()

    @contextlib.contextmanager
    def _generate_file(path: str, **kwargs: Any) -> Generator[BinaryIO, None, None]:
        # Write to a temporary file next to ``path``, then set its mode and
        # move it into place once the caller has finished writing.
        with adjacent_tmp_file(path, **kwargs) as f:
            yield f
        os.chmod(f.name, generated_file_mode)
        replace(f.name, path)

    dest_info_dir = os.path.join(lib_dir, info_dir)

    # Record pip as the installer
    installer_path = os.path.join(dest_info_dir, "INSTALLER")
    with _generate_file(installer_path) as installer_file:
        installer_file.write(b"pip\n")
    generated.append(installer_path)

    # Record the PEP 610 direct URL reference
    if direct_url is not None:
        direct_url_path = os.path.join(dest_info_dir, DIRECT_URL_METADATA_NAME)
        with _generate_file(direct_url_path) as direct_url_file:
            direct_url_file.write(direct_url.to_json().encode("utf-8"))
        generated.append(direct_url_path)

    # Record the REQUESTED file
    if requested:
        requested_path = os.path.join(dest_info_dir, "REQUESTED")
        # The file is a marker only; it is created empty.
        with open(requested_path, "wb"):
            pass
        generated.append(requested_path)

    record_text = distribution.read_text("RECORD")
    record_rows = list(csv.reader(record_text.splitlines()))

    rows = get_csv_rows_for_installed(
        record_rows,
        installed=installed,
        changed=changed,
        generated=generated,
        lib_dir=lib_dir,
    )

    # Record details of all files installed
    record_path = os.path.join(dest_info_dir, "RECORD")

    with _generate_file(record_path, **csv_io_kwargs("w")) as record_file:
        # Explicitly cast to typing.IO[str] as a workaround for the mypy error:
        # "writer" has incompatible type "BinaryIO"; expected "_Writer"
        writer = csv.writer(cast("IO[str]", record_file))
        writer.writerows(_normalized_outrows(rows))

722 

723 

@contextlib.contextmanager
def req_error_context(req_description: str) -> Generator[None, None, None]:
    """Label ``InstallationError`` raised in the managed block with a requirement.

    The error is re-raised as a new ``InstallationError`` whose message is
    ``"For req: <req_description>. <original first argument>"``, chained to
    the original. Other exceptions pass through untouched.
    """
    try:
        yield
    except InstallationError as exc:
        original = exc.args[0]
        raise InstallationError(f"For req: {req_description}. {original}") from exc

731 

732 

def install_wheel(
    name: str,
    wheel_path: str,
    scheme: Scheme,
    req_description: str,
    pycompile: bool = True,
    warn_script_location: bool = True,
    direct_url: DirectUrl | None = None,
    requested: bool = False,
) -> None:
    """Open the wheel at *wheel_path* and install it.

    The archive is opened with ZIP64 support and handed to ``_install_wheel``
    together with the remaining arguments. Any ``InstallationError`` raised
    while installing is re-raised with *req_description* prefixed to its
    message.
    """
    with (
        ZipFile(wheel_path, allowZip64=True) as archive,
        req_error_context(req_description),
    ):
        _install_wheel(
            name=name,
            wheel_zip=archive,
            wheel_path=wheel_path,
            scheme=scheme,
            pycompile=pycompile,
            warn_script_location=warn_script_location,
            direct_url=direct_url,
            requested=requested,
        )