Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_internal/operations/install/wheel.py: 17%

342 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:48 +0000

1"""Support for installing and building the "wheel" binary package format. 

2""" 

3 

4import collections 

5import compileall 

6import contextlib 

7import csv 

8import importlib 

9import logging 

10import os.path 

11import re 

12import shutil 

13import sys 

14import warnings 

15from base64 import urlsafe_b64encode 

16from email.message import Message 

17from itertools import chain, filterfalse, starmap 

18from typing import ( 

19 IO, 

20 TYPE_CHECKING, 

21 Any, 

22 BinaryIO, 

23 Callable, 

24 Dict, 

25 Generator, 

26 Iterable, 

27 Iterator, 

28 List, 

29 NewType, 

30 Optional, 

31 Sequence, 

32 Set, 

33 Tuple, 

34 Union, 

35 cast, 

36) 

37from zipfile import ZipFile, ZipInfo 

38 

39from pip._vendor.distlib.scripts import ScriptMaker 

40from pip._vendor.distlib.util import get_export_entry 

41from pip._vendor.packaging.utils import canonicalize_name 

42 

43from pip._internal.exceptions import InstallationError 

44from pip._internal.locations import get_major_minor_version 

45from pip._internal.metadata import ( 

46 BaseDistribution, 

47 FilesystemWheel, 

48 get_wheel_distribution, 

49) 

50from pip._internal.models.direct_url import DIRECT_URL_METADATA_NAME, DirectUrl 

51from pip._internal.models.scheme import SCHEME_KEYS, Scheme 

52from pip._internal.utils.filesystem import adjacent_tmp_file, replace 

53from pip._internal.utils.misc import captured_stdout, ensure_dir, hash_file, partition 

54from pip._internal.utils.unpacking import ( 

55 current_umask, 

56 is_within_directory, 

57 set_extracted_file_to_default_mode_plus_executable, 

58 zip_item_is_executable, 

59) 

60from pip._internal.utils.wheel import parse_wheel 

61 

62if TYPE_CHECKING: 

63 from typing import Protocol 

64 

65 class File(Protocol): 

66 src_record_path: "RecordPath" 

67 dest_path: str 

68 changed: bool 

69 

70 def save(self) -> None: 

71 pass 

72 

73 

74logger = logging.getLogger(__name__) 

75 

76RecordPath = NewType("RecordPath", str) 

77InstalledCSVRow = Tuple[RecordPath, str, Union[int, str]] 

78 

79 

80def rehash(path: str, blocksize: int = 1 << 20) -> Tuple[str, str]: 

81 """Return (encoded_digest, length) for path using hashlib.sha256()""" 

82 h, length = hash_file(path, blocksize) 

83 digest = "sha256=" + urlsafe_b64encode(h.digest()).decode("latin1").rstrip("=") 

84 return (digest, str(length)) 

85 

86 

87def csv_io_kwargs(mode: str) -> Dict[str, Any]: 

88 """Return keyword arguments to properly open a CSV file 

89 in the given mode. 

90 """ 

91 return {"mode": mode, "newline": "", "encoding": "utf-8"} 

92 

93 

94def fix_script(path: str) -> bool: 

95 """Replace #!python with #!/path/to/python 

96 Return True if file was changed. 

97 """ 

98 # XXX RECORD hashes will need to be updated 

99 assert os.path.isfile(path) 

100 

101 with open(path, "rb") as script: 

102 firstline = script.readline() 

103 if not firstline.startswith(b"#!python"): 

104 return False 

105 exename = sys.executable.encode(sys.getfilesystemencoding()) 

106 firstline = b"#!" + exename + os.linesep.encode("ascii") 

107 rest = script.read() 

108 with open(path, "wb") as script: 

109 script.write(firstline) 

110 script.write(rest) 

111 return True 

112 

113 

114def wheel_root_is_purelib(metadata: Message) -> bool: 

115 return metadata.get("Root-Is-Purelib", "").lower() == "true" 

116 

117 

118def get_entrypoints(dist: BaseDistribution) -> Tuple[Dict[str, str], Dict[str, str]]: 

119 console_scripts = {} 

120 gui_scripts = {} 

121 for entry_point in dist.iter_entry_points(): 

122 if entry_point.group == "console_scripts": 

123 console_scripts[entry_point.name] = entry_point.value 

124 elif entry_point.group == "gui_scripts": 

125 gui_scripts[entry_point.name] = entry_point.value 

126 return console_scripts, gui_scripts 

127 

128 

129def message_about_scripts_not_on_PATH(scripts: Sequence[str]) -> Optional[str]: 

130 """Determine if any scripts are not on PATH and format a warning. 

131 Returns a warning message if one or more scripts are not on PATH, 

132 otherwise None. 

133 """ 

134 if not scripts: 

135 return None 

136 

137 # Group scripts by the path they were installed in 

138 grouped_by_dir: Dict[str, Set[str]] = collections.defaultdict(set) 

139 for destfile in scripts: 

140 parent_dir = os.path.dirname(destfile) 

141 script_name = os.path.basename(destfile) 

142 grouped_by_dir[parent_dir].add(script_name) 

143 

144 # We don't want to warn for directories that are on PATH. 

145 not_warn_dirs = [ 

146 os.path.normcase(os.path.normpath(i)).rstrip(os.sep) 

147 for i in os.environ.get("PATH", "").split(os.pathsep) 

148 ] 

149 # If an executable sits with sys.executable, we don't warn for it. 

150 # This covers the case of venv invocations without activating the venv. 

151 not_warn_dirs.append( 

152 os.path.normcase(os.path.normpath(os.path.dirname(sys.executable))) 

153 ) 

154 warn_for: Dict[str, Set[str]] = { 

155 parent_dir: scripts 

156 for parent_dir, scripts in grouped_by_dir.items() 

157 if os.path.normcase(os.path.normpath(parent_dir)) not in not_warn_dirs 

158 } 

159 if not warn_for: 

160 return None 

161 

162 # Format a message 

163 msg_lines = [] 

164 for parent_dir, dir_scripts in warn_for.items(): 

165 sorted_scripts: List[str] = sorted(dir_scripts) 

166 if len(sorted_scripts) == 1: 

167 start_text = "script {} is".format(sorted_scripts[0]) 

168 else: 

169 start_text = "scripts {} are".format( 

170 ", ".join(sorted_scripts[:-1]) + " and " + sorted_scripts[-1] 

171 ) 

172 

173 msg_lines.append( 

174 "The {} installed in '{}' which is not on PATH.".format( 

175 start_text, parent_dir 

176 ) 

177 ) 

178 

179 last_line_fmt = ( 

180 "Consider adding {} to PATH or, if you prefer " 

181 "to suppress this warning, use --no-warn-script-location." 

182 ) 

183 if len(msg_lines) == 1: 

184 msg_lines.append(last_line_fmt.format("this directory")) 

185 else: 

186 msg_lines.append(last_line_fmt.format("these directories")) 

187 

188 # Add a note if any directory starts with ~ 

189 warn_for_tilde = any( 

190 i[0] == "~" for i in os.environ.get("PATH", "").split(os.pathsep) if i 

191 ) 

192 if warn_for_tilde: 

193 tilde_warning_msg = ( 

194 "NOTE: The current PATH contains path(s) starting with `~`, " 

195 "which may not be expanded by all applications." 

196 ) 

197 msg_lines.append(tilde_warning_msg) 

198 

199 # Returns the formatted multiline message 

200 return "\n".join(msg_lines) 

201 

202 

203def _normalized_outrows( 

204 outrows: Iterable[InstalledCSVRow], 

205) -> List[Tuple[str, str, str]]: 

206 """Normalize the given rows of a RECORD file. 

207 

208 Items in each row are converted into str. Rows are then sorted to make 

209 the value more predictable for tests. 

210 

211 Each row is a 3-tuple (path, hash, size) and corresponds to a record of 

212 a RECORD file (see PEP 376 and PEP 427 for details). For the rows 

213 passed to this function, the size can be an integer as an int or string, 

214 or the empty string. 

215 """ 

216 # Normally, there should only be one row per path, in which case the 

217 # second and third elements don't come into play when sorting. 

218 # However, in cases in the wild where a path might happen to occur twice, 

219 # we don't want the sort operation to trigger an error (but still want 

220 # determinism). Since the third element can be an int or string, we 

221 # coerce each element to a string to avoid a TypeError in this case. 

222 # For additional background, see-- 

223 # https://github.com/pypa/pip/issues/5868 

224 return sorted( 

225 (record_path, hash_, str(size)) for record_path, hash_, size in outrows 

226 ) 

227 

228 

229def _record_to_fs_path(record_path: RecordPath, lib_dir: str) -> str: 

230 return os.path.join(lib_dir, record_path) 

231 

232 

233def _fs_to_record_path(path: str, lib_dir: str) -> RecordPath: 

234 # On Windows, do not handle relative paths if they belong to different 

235 # logical disks 

236 if os.path.splitdrive(path)[0].lower() == os.path.splitdrive(lib_dir)[0].lower(): 

237 path = os.path.relpath(path, lib_dir) 

238 

239 path = path.replace(os.path.sep, "/") 

240 return cast("RecordPath", path) 

241 

242 

243def get_csv_rows_for_installed( 

244 old_csv_rows: List[List[str]], 

245 installed: Dict[RecordPath, RecordPath], 

246 changed: Set[RecordPath], 

247 generated: List[str], 

248 lib_dir: str, 

249) -> List[InstalledCSVRow]: 

250 """ 

251 :param installed: A map from archive RECORD path to installation RECORD 

252 path. 

253 """ 

254 installed_rows: List[InstalledCSVRow] = [] 

255 for row in old_csv_rows: 

256 if len(row) > 3: 

257 logger.warning("RECORD line has more than three elements: %s", row) 

258 old_record_path = cast("RecordPath", row[0]) 

259 new_record_path = installed.pop(old_record_path, old_record_path) 

260 if new_record_path in changed: 

261 digest, length = rehash(_record_to_fs_path(new_record_path, lib_dir)) 

262 else: 

263 digest = row[1] if len(row) > 1 else "" 

264 length = row[2] if len(row) > 2 else "" 

265 installed_rows.append((new_record_path, digest, length)) 

266 for f in generated: 

267 path = _fs_to_record_path(f, lib_dir) 

268 digest, length = rehash(f) 

269 installed_rows.append((path, digest, length)) 

270 for installed_record_path in installed.values(): 

271 installed_rows.append((installed_record_path, "", "")) 

272 return installed_rows 

273 

274 

275def get_console_script_specs(console: Dict[str, str]) -> List[str]: 

276 """ 

277 Given the mapping from entrypoint name to callable, return the relevant 

278 console script specs. 

279 """ 

280 # Don't mutate caller's version 

281 console = console.copy() 

282 

283 scripts_to_generate = [] 

284 

285 # Special case pip and setuptools to generate versioned wrappers 

286 # 

287 # The issue is that some projects (specifically, pip and setuptools) use 

288 # code in setup.py to create "versioned" entry points - pip2.7 on Python 

289 # 2.7, pip3.3 on Python 3.3, etc. But these entry points are baked into 

290 # the wheel metadata at build time, and so if the wheel is installed with 

291 # a *different* version of Python the entry points will be wrong. The 

292 # correct fix for this is to enhance the metadata to be able to describe 

293 # such versioned entry points, but that won't happen till Metadata 2.0 is 

294 # available. 

295 # In the meantime, projects using versioned entry points will either have 

296 # incorrect versioned entry points, or they will not be able to distribute 

297 # "universal" wheels (i.e., they will need a wheel per Python version). 

298 # 

299 # Because setuptools and pip are bundled with _ensurepip and virtualenv, 

300 # we need to use universal wheels. So, as a stopgap until Metadata 2.0, we 

301 # override the versioned entry points in the wheel and generate the 

302 # correct ones. This code is purely a short-term measure until Metadata 2.0 

303 # is available. 

304 # 

305 # To add the level of hack in this section of code, in order to support 

306 # ensurepip this code will look for an ``ENSUREPIP_OPTIONS`` environment 

307 # variable which will control which version scripts get installed. 

308 # 

309 # ENSUREPIP_OPTIONS=altinstall 

310 # - Only pipX.Y and easy_install-X.Y will be generated and installed 

311 # ENSUREPIP_OPTIONS=install 

312 # - pipX.Y, pipX, easy_install-X.Y will be generated and installed. Note 

313 # that this option is technically if ENSUREPIP_OPTIONS is set and is 

314 # not altinstall 

315 # DEFAULT 

316 # - The default behavior is to install pip, pipX, pipX.Y, easy_install 

317 # and easy_install-X.Y. 

318 pip_script = console.pop("pip", None) 

319 if pip_script: 

320 if "ENSUREPIP_OPTIONS" not in os.environ: 

321 scripts_to_generate.append("pip = " + pip_script) 

322 

323 if os.environ.get("ENSUREPIP_OPTIONS", "") != "altinstall": 

324 scripts_to_generate.append( 

325 "pip{} = {}".format(sys.version_info[0], pip_script) 

326 ) 

327 

328 scripts_to_generate.append(f"pip{get_major_minor_version()} = {pip_script}") 

329 # Delete any other versioned pip entry points 

330 pip_ep = [k for k in console if re.match(r"pip(\d+(\.\d+)?)?$", k)] 

331 for k in pip_ep: 

332 del console[k] 

333 easy_install_script = console.pop("easy_install", None) 

334 if easy_install_script: 

335 if "ENSUREPIP_OPTIONS" not in os.environ: 

336 scripts_to_generate.append("easy_install = " + easy_install_script) 

337 

338 scripts_to_generate.append( 

339 "easy_install-{} = {}".format( 

340 get_major_minor_version(), easy_install_script 

341 ) 

342 ) 

343 # Delete any other versioned easy_install entry points 

344 easy_install_ep = [ 

345 k for k in console if re.match(r"easy_install(-\d+\.\d+)?$", k) 

346 ] 

347 for k in easy_install_ep: 

348 del console[k] 

349 

350 # Generate the console entry points specified in the wheel 

351 scripts_to_generate.extend(starmap("{} = {}".format, console.items())) 

352 

353 return scripts_to_generate 

354 

355 

356class ZipBackedFile: 

357 def __init__( 

358 self, src_record_path: RecordPath, dest_path: str, zip_file: ZipFile 

359 ) -> None: 

360 self.src_record_path = src_record_path 

361 self.dest_path = dest_path 

362 self._zip_file = zip_file 

363 self.changed = False 

364 

365 def _getinfo(self) -> ZipInfo: 

366 return self._zip_file.getinfo(self.src_record_path) 

367 

368 def save(self) -> None: 

369 # directory creation is lazy and after file filtering 

370 # to ensure we don't install empty dirs; empty dirs can't be 

371 # uninstalled. 

372 parent_dir = os.path.dirname(self.dest_path) 

373 ensure_dir(parent_dir) 

374 

375 # When we open the output file below, any existing file is truncated 

376 # before we start writing the new contents. This is fine in most 

377 # cases, but can cause a segfault if pip has loaded a shared 

378 # object (e.g. from pyopenssl through its vendored urllib3) 

379 # Since the shared object is mmap'd an attempt to call a 

380 # symbol in it will then cause a segfault. Unlinking the file 

381 # allows writing of new contents while allowing the process to 

382 # continue to use the old copy. 

383 if os.path.exists(self.dest_path): 

384 os.unlink(self.dest_path) 

385 

386 zipinfo = self._getinfo() 

387 

388 with self._zip_file.open(zipinfo) as f: 

389 with open(self.dest_path, "wb") as dest: 

390 shutil.copyfileobj(f, dest) 

391 

392 if zip_item_is_executable(zipinfo): 

393 set_extracted_file_to_default_mode_plus_executable(self.dest_path) 

394 

395 

396class ScriptFile: 

397 def __init__(self, file: "File") -> None: 

398 self._file = file 

399 self.src_record_path = self._file.src_record_path 

400 self.dest_path = self._file.dest_path 

401 self.changed = False 

402 

403 def save(self) -> None: 

404 self._file.save() 

405 self.changed = fix_script(self.dest_path) 

406 

407 

408class MissingCallableSuffix(InstallationError): 

409 def __init__(self, entry_point: str) -> None: 

410 super().__init__( 

411 "Invalid script entry point: {} - A callable " 

412 "suffix is required. Cf https://packaging.python.org/" 

413 "specifications/entry-points/#use-for-scripts for more " 

414 "information.".format(entry_point) 

415 ) 

416 

417 

418def _raise_for_invalid_entrypoint(specification: str) -> None: 

419 entry = get_export_entry(specification) 

420 if entry is not None and entry.suffix is None: 

421 raise MissingCallableSuffix(str(entry)) 

422 

423 

424class PipScriptMaker(ScriptMaker): 

425 def make( 

426 self, specification: str, options: Optional[Dict[str, Any]] = None 

427 ) -> List[str]: 

428 _raise_for_invalid_entrypoint(specification) 

429 return super().make(specification, options) 

430 

431 

432def _install_wheel( 

433 name: str, 

434 wheel_zip: ZipFile, 

435 wheel_path: str, 

436 scheme: Scheme, 

437 pycompile: bool = True, 

438 warn_script_location: bool = True, 

439 direct_url: Optional[DirectUrl] = None, 

440 requested: bool = False, 

441) -> None: 

442 """Install a wheel. 

443 

444 :param name: Name of the project to install 

445 :param wheel_zip: open ZipFile for wheel being installed 

446 :param scheme: Distutils scheme dictating the install directories 

447 :param req_description: String used in place of the requirement, for 

448 logging 

449 :param pycompile: Whether to byte-compile installed Python files 

450 :param warn_script_location: Whether to check that scripts are installed 

451 into a directory on PATH 

452 :raises UnsupportedWheel: 

453 * when the directory holds an unpacked wheel with incompatible 

454 Wheel-Version 

455 * when the .dist-info dir does not match the wheel 

456 """ 

457 info_dir, metadata = parse_wheel(wheel_zip, name) 

458 

459 if wheel_root_is_purelib(metadata): 

460 lib_dir = scheme.purelib 

461 else: 

462 lib_dir = scheme.platlib 

463 

464 # Record details of the files moved 

465 # installed = files copied from the wheel to the destination 

466 # changed = files changed while installing (scripts #! line typically) 

467 # generated = files newly generated during the install (script wrappers) 

468 installed: Dict[RecordPath, RecordPath] = {} 

469 changed: Set[RecordPath] = set() 

470 generated: List[str] = [] 

471 

472 def record_installed( 

473 srcfile: RecordPath, destfile: str, modified: bool = False 

474 ) -> None: 

475 """Map archive RECORD paths to installation RECORD paths.""" 

476 newpath = _fs_to_record_path(destfile, lib_dir) 

477 installed[srcfile] = newpath 

478 if modified: 

479 changed.add(newpath) 

480 

481 def is_dir_path(path: RecordPath) -> bool: 

482 return path.endswith("/") 

483 

484 def assert_no_path_traversal(dest_dir_path: str, target_path: str) -> None: 

485 if not is_within_directory(dest_dir_path, target_path): 

486 message = ( 

487 "The wheel {!r} has a file {!r} trying to install" 

488 " outside the target directory {!r}" 

489 ) 

490 raise InstallationError( 

491 message.format(wheel_path, target_path, dest_dir_path) 

492 ) 

493 

494 def root_scheme_file_maker( 

495 zip_file: ZipFile, dest: str 

496 ) -> Callable[[RecordPath], "File"]: 

497 def make_root_scheme_file(record_path: RecordPath) -> "File": 

498 normed_path = os.path.normpath(record_path) 

499 dest_path = os.path.join(dest, normed_path) 

500 assert_no_path_traversal(dest, dest_path) 

501 return ZipBackedFile(record_path, dest_path, zip_file) 

502 

503 return make_root_scheme_file 

504 

505 def data_scheme_file_maker( 

506 zip_file: ZipFile, scheme: Scheme 

507 ) -> Callable[[RecordPath], "File"]: 

508 scheme_paths = {key: getattr(scheme, key) for key in SCHEME_KEYS} 

509 

510 def make_data_scheme_file(record_path: RecordPath) -> "File": 

511 normed_path = os.path.normpath(record_path) 

512 try: 

513 _, scheme_key, dest_subpath = normed_path.split(os.path.sep, 2) 

514 except ValueError: 

515 message = ( 

516 "Unexpected file in {}: {!r}. .data directory contents" 

517 " should be named like: '<scheme key>/<path>'." 

518 ).format(wheel_path, record_path) 

519 raise InstallationError(message) 

520 

521 try: 

522 scheme_path = scheme_paths[scheme_key] 

523 except KeyError: 

524 valid_scheme_keys = ", ".join(sorted(scheme_paths)) 

525 message = ( 

526 "Unknown scheme key used in {}: {} (for file {!r}). .data" 

527 " directory contents should be in subdirectories named" 

528 " with a valid scheme key ({})" 

529 ).format(wheel_path, scheme_key, record_path, valid_scheme_keys) 

530 raise InstallationError(message) 

531 

532 dest_path = os.path.join(scheme_path, dest_subpath) 

533 assert_no_path_traversal(scheme_path, dest_path) 

534 return ZipBackedFile(record_path, dest_path, zip_file) 

535 

536 return make_data_scheme_file 

537 

538 def is_data_scheme_path(path: RecordPath) -> bool: 

539 return path.split("/", 1)[0].endswith(".data") 

540 

541 paths = cast(List[RecordPath], wheel_zip.namelist()) 

542 file_paths = filterfalse(is_dir_path, paths) 

543 root_scheme_paths, data_scheme_paths = partition(is_data_scheme_path, file_paths) 

544 

545 make_root_scheme_file = root_scheme_file_maker(wheel_zip, lib_dir) 

546 files: Iterator[File] = map(make_root_scheme_file, root_scheme_paths) 

547 

548 def is_script_scheme_path(path: RecordPath) -> bool: 

549 parts = path.split("/", 2) 

550 return len(parts) > 2 and parts[0].endswith(".data") and parts[1] == "scripts" 

551 

552 other_scheme_paths, script_scheme_paths = partition( 

553 is_script_scheme_path, data_scheme_paths 

554 ) 

555 

556 make_data_scheme_file = data_scheme_file_maker(wheel_zip, scheme) 

557 other_scheme_files = map(make_data_scheme_file, other_scheme_paths) 

558 files = chain(files, other_scheme_files) 

559 

560 # Get the defined entry points 

561 distribution = get_wheel_distribution( 

562 FilesystemWheel(wheel_path), 

563 canonicalize_name(name), 

564 ) 

565 console, gui = get_entrypoints(distribution) 

566 

567 def is_entrypoint_wrapper(file: "File") -> bool: 

568 # EP, EP.exe and EP-script.py are scripts generated for 

569 # entry point EP by setuptools 

570 path = file.dest_path 

571 name = os.path.basename(path) 

572 if name.lower().endswith(".exe"): 

573 matchname = name[:-4] 

574 elif name.lower().endswith("-script.py"): 

575 matchname = name[:-10] 

576 elif name.lower().endswith(".pya"): 

577 matchname = name[:-4] 

578 else: 

579 matchname = name 

580 # Ignore setuptools-generated scripts 

581 return matchname in console or matchname in gui 

582 

583 script_scheme_files: Iterator[File] = map( 

584 make_data_scheme_file, script_scheme_paths 

585 ) 

586 script_scheme_files = filterfalse(is_entrypoint_wrapper, script_scheme_files) 

587 script_scheme_files = map(ScriptFile, script_scheme_files) 

588 files = chain(files, script_scheme_files) 

589 

590 for file in files: 

591 file.save() 

592 record_installed(file.src_record_path, file.dest_path, file.changed) 

593 

594 def pyc_source_file_paths() -> Generator[str, None, None]: 

595 # We de-duplicate installation paths, since there can be overlap (e.g. 

596 # file in .data maps to same location as file in wheel root). 

597 # Sorting installation paths makes it easier to reproduce and debug 

598 # issues related to permissions on existing files. 

599 for installed_path in sorted(set(installed.values())): 

600 full_installed_path = os.path.join(lib_dir, installed_path) 

601 if not os.path.isfile(full_installed_path): 

602 continue 

603 if not full_installed_path.endswith(".py"): 

604 continue 

605 yield full_installed_path 

606 

607 def pyc_output_path(path: str) -> str: 

608 """Return the path the pyc file would have been written to.""" 

609 return importlib.util.cache_from_source(path) 

610 

611 # Compile all of the pyc files for the installed files 

612 if pycompile: 

613 with captured_stdout() as stdout: 

614 with warnings.catch_warnings(): 

615 warnings.filterwarnings("ignore") 

616 for path in pyc_source_file_paths(): 

617 success = compileall.compile_file(path, force=True, quiet=True) 

618 if success: 

619 pyc_path = pyc_output_path(path) 

620 assert os.path.exists(pyc_path) 

621 pyc_record_path = cast( 

622 "RecordPath", pyc_path.replace(os.path.sep, "/") 

623 ) 

624 record_installed(pyc_record_path, pyc_path) 

625 logger.debug(stdout.getvalue()) 

626 

627 maker = PipScriptMaker(None, scheme.scripts) 

628 

629 # Ensure old scripts are overwritten. 

630 # See https://github.com/pypa/pip/issues/1800 

631 maker.clobber = True 

632 

633 # Ensure we don't generate any variants for scripts because this is almost 

634 # never what somebody wants. 

635 # See https://bitbucket.org/pypa/distlib/issue/35/ 

636 maker.variants = {""} 

637 

638 # This is required because otherwise distlib creates scripts that are not 

639 # executable. 

640 # See https://bitbucket.org/pypa/distlib/issue/32/ 

641 maker.set_mode = True 

642 

643 # Generate the console and GUI entry points specified in the wheel 

644 scripts_to_generate = get_console_script_specs(console) 

645 

646 gui_scripts_to_generate = list(starmap("{} = {}".format, gui.items())) 

647 

648 generated_console_scripts = maker.make_multiple(scripts_to_generate) 

649 generated.extend(generated_console_scripts) 

650 

651 generated.extend(maker.make_multiple(gui_scripts_to_generate, {"gui": True})) 

652 

653 if warn_script_location: 

654 msg = message_about_scripts_not_on_PATH(generated_console_scripts) 

655 if msg is not None: 

656 logger.warning(msg) 

657 

658 generated_file_mode = 0o666 & ~current_umask() 

659 

660 @contextlib.contextmanager 

661 def _generate_file(path: str, **kwargs: Any) -> Generator[BinaryIO, None, None]: 

662 with adjacent_tmp_file(path, **kwargs) as f: 

663 yield f 

664 os.chmod(f.name, generated_file_mode) 

665 replace(f.name, path) 

666 

667 dest_info_dir = os.path.join(lib_dir, info_dir) 

668 

669 # Record pip as the installer 

670 installer_path = os.path.join(dest_info_dir, "INSTALLER") 

671 with _generate_file(installer_path) as installer_file: 

672 installer_file.write(b"pip\n") 

673 generated.append(installer_path) 

674 

675 # Record the PEP 610 direct URL reference 

676 if direct_url is not None: 

677 direct_url_path = os.path.join(dest_info_dir, DIRECT_URL_METADATA_NAME) 

678 with _generate_file(direct_url_path) as direct_url_file: 

679 direct_url_file.write(direct_url.to_json().encode("utf-8")) 

680 generated.append(direct_url_path) 

681 

682 # Record the REQUESTED file 

683 if requested: 

684 requested_path = os.path.join(dest_info_dir, "REQUESTED") 

685 with open(requested_path, "wb"): 

686 pass 

687 generated.append(requested_path) 

688 

689 record_text = distribution.read_text("RECORD") 

690 record_rows = list(csv.reader(record_text.splitlines())) 

691 

692 rows = get_csv_rows_for_installed( 

693 record_rows, 

694 installed=installed, 

695 changed=changed, 

696 generated=generated, 

697 lib_dir=lib_dir, 

698 ) 

699 

700 # Record details of all files installed 

701 record_path = os.path.join(dest_info_dir, "RECORD") 

702 

703 with _generate_file(record_path, **csv_io_kwargs("w")) as record_file: 

704 # Explicitly cast to typing.IO[str] as a workaround for the mypy error: 

705 # "writer" has incompatible type "BinaryIO"; expected "_Writer" 

706 writer = csv.writer(cast("IO[str]", record_file)) 

707 writer.writerows(_normalized_outrows(rows)) 

708 

709 

710@contextlib.contextmanager 

711def req_error_context(req_description: str) -> Generator[None, None, None]: 

712 try: 

713 yield 

714 except InstallationError as e: 

715 message = "For req: {}. {}".format(req_description, e.args[0]) 

716 raise InstallationError(message) from e 

717 

718 

719def install_wheel( 

720 name: str, 

721 wheel_path: str, 

722 scheme: Scheme, 

723 req_description: str, 

724 pycompile: bool = True, 

725 warn_script_location: bool = True, 

726 direct_url: Optional[DirectUrl] = None, 

727 requested: bool = False, 

728) -> None: 

729 with ZipFile(wheel_path, allowZip64=True) as z: 

730 with req_error_context(req_description): 

731 _install_wheel( 

732 name=name, 

733 wheel_zip=z, 

734 wheel_path=wheel_path, 

735 scheme=scheme, 

736 pycompile=pycompile, 

737 warn_script_location=warn_script_location, 

738 direct_url=direct_url, 

739 requested=requested, 

740 )