Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_internal/operations/install/wheel.py: 17%

339 statements  

« prev     ^ index     » next       coverage.py v7.4.3, created at 2024-02-26 06:33 +0000

1"""Support for installing and building the "wheel" binary package format. 

2""" 

3 

4import collections 

5import compileall 

6import contextlib 

7import csv 

8import importlib 

9import logging 

10import os.path 

11import re 

12import shutil 

13import sys 

14import warnings 

15from base64 import urlsafe_b64encode 

16from email.message import Message 

17from itertools import chain, filterfalse, starmap 

18from typing import ( 

19 IO, 

20 TYPE_CHECKING, 

21 Any, 

22 BinaryIO, 

23 Callable, 

24 Dict, 

25 Generator, 

26 Iterable, 

27 Iterator, 

28 List, 

29 NewType, 

30 Optional, 

31 Protocol, 

32 Sequence, 

33 Set, 

34 Tuple, 

35 Union, 

36 cast, 

37) 

38from zipfile import ZipFile, ZipInfo 

39 

40from pip._vendor.distlib.scripts import ScriptMaker 

41from pip._vendor.distlib.util import get_export_entry 

42from pip._vendor.packaging.utils import canonicalize_name 

43 

44from pip._internal.exceptions import InstallationError 

45from pip._internal.locations import get_major_minor_version 

46from pip._internal.metadata import ( 

47 BaseDistribution, 

48 FilesystemWheel, 

49 get_wheel_distribution, 

50) 

51from pip._internal.models.direct_url import DIRECT_URL_METADATA_NAME, DirectUrl 

52from pip._internal.models.scheme import SCHEME_KEYS, Scheme 

53from pip._internal.utils.filesystem import adjacent_tmp_file, replace 

54from pip._internal.utils.misc import captured_stdout, ensure_dir, hash_file, partition 

55from pip._internal.utils.unpacking import ( 

56 current_umask, 

57 is_within_directory, 

58 set_extracted_file_to_default_mode_plus_executable, 

59 zip_item_is_executable, 

60) 

61from pip._internal.utils.wheel import parse_wheel 

62 

63if TYPE_CHECKING: 

64 

65 class File(Protocol): 

66 src_record_path: "RecordPath" 

67 dest_path: str 

68 changed: bool 

69 

70 def save(self) -> None: 

71 pass 

72 

73 

74logger = logging.getLogger(__name__) 

75 

76RecordPath = NewType("RecordPath", str) 

77InstalledCSVRow = Tuple[RecordPath, str, Union[int, str]] 

78 

79 

80def rehash(path: str, blocksize: int = 1 << 20) -> Tuple[str, str]: 

81 """Return (encoded_digest, length) for path using hashlib.sha256()""" 

82 h, length = hash_file(path, blocksize) 

83 digest = "sha256=" + urlsafe_b64encode(h.digest()).decode("latin1").rstrip("=") 

84 return (digest, str(length)) 

85 

86 

87def csv_io_kwargs(mode: str) -> Dict[str, Any]: 

88 """Return keyword arguments to properly open a CSV file 

89 in the given mode. 

90 """ 

91 return {"mode": mode, "newline": "", "encoding": "utf-8"} 

92 

93 

94def fix_script(path: str) -> bool: 

95 """Replace #!python with #!/path/to/python 

96 Return True if file was changed. 

97 """ 

98 # XXX RECORD hashes will need to be updated 

99 assert os.path.isfile(path) 

100 

101 with open(path, "rb") as script: 

102 firstline = script.readline() 

103 if not firstline.startswith(b"#!python"): 

104 return False 

105 exename = sys.executable.encode(sys.getfilesystemencoding()) 

106 firstline = b"#!" + exename + os.linesep.encode("ascii") 

107 rest = script.read() 

108 with open(path, "wb") as script: 

109 script.write(firstline) 

110 script.write(rest) 

111 return True 

112 

113 

114def wheel_root_is_purelib(metadata: Message) -> bool: 

115 return metadata.get("Root-Is-Purelib", "").lower() == "true" 

116 

117 

118def get_entrypoints(dist: BaseDistribution) -> Tuple[Dict[str, str], Dict[str, str]]: 

119 console_scripts = {} 

120 gui_scripts = {} 

121 for entry_point in dist.iter_entry_points(): 

122 if entry_point.group == "console_scripts": 

123 console_scripts[entry_point.name] = entry_point.value 

124 elif entry_point.group == "gui_scripts": 

125 gui_scripts[entry_point.name] = entry_point.value 

126 return console_scripts, gui_scripts 

127 

128 

129def message_about_scripts_not_on_PATH(scripts: Sequence[str]) -> Optional[str]: 

130 """Determine if any scripts are not on PATH and format a warning. 

131 Returns a warning message if one or more scripts are not on PATH, 

132 otherwise None. 

133 """ 

134 if not scripts: 

135 return None 

136 

137 # Group scripts by the path they were installed in 

138 grouped_by_dir: Dict[str, Set[str]] = collections.defaultdict(set) 

139 for destfile in scripts: 

140 parent_dir = os.path.dirname(destfile) 

141 script_name = os.path.basename(destfile) 

142 grouped_by_dir[parent_dir].add(script_name) 

143 

144 # We don't want to warn for directories that are on PATH. 

145 not_warn_dirs = [ 

146 os.path.normcase(os.path.normpath(i)).rstrip(os.sep) 

147 for i in os.environ.get("PATH", "").split(os.pathsep) 

148 ] 

149 # If an executable sits with sys.executable, we don't warn for it. 

150 # This covers the case of venv invocations without activating the venv. 

151 not_warn_dirs.append( 

152 os.path.normcase(os.path.normpath(os.path.dirname(sys.executable))) 

153 ) 

154 warn_for: Dict[str, Set[str]] = { 

155 parent_dir: scripts 

156 for parent_dir, scripts in grouped_by_dir.items() 

157 if os.path.normcase(os.path.normpath(parent_dir)) not in not_warn_dirs 

158 } 

159 if not warn_for: 

160 return None 

161 

162 # Format a message 

163 msg_lines = [] 

164 for parent_dir, dir_scripts in warn_for.items(): 

165 sorted_scripts: List[str] = sorted(dir_scripts) 

166 if len(sorted_scripts) == 1: 

167 start_text = f"script {sorted_scripts[0]} is" 

168 else: 

169 start_text = "scripts {} are".format( 

170 ", ".join(sorted_scripts[:-1]) + " and " + sorted_scripts[-1] 

171 ) 

172 

173 msg_lines.append( 

174 f"The {start_text} installed in '{parent_dir}' which is not on PATH." 

175 ) 

176 

177 last_line_fmt = ( 

178 "Consider adding {} to PATH or, if you prefer " 

179 "to suppress this warning, use --no-warn-script-location." 

180 ) 

181 if len(msg_lines) == 1: 

182 msg_lines.append(last_line_fmt.format("this directory")) 

183 else: 

184 msg_lines.append(last_line_fmt.format("these directories")) 

185 

186 # Add a note if any directory starts with ~ 

187 warn_for_tilde = any( 

188 i[0] == "~" for i in os.environ.get("PATH", "").split(os.pathsep) if i 

189 ) 

190 if warn_for_tilde: 

191 tilde_warning_msg = ( 

192 "NOTE: The current PATH contains path(s) starting with `~`, " 

193 "which may not be expanded by all applications." 

194 ) 

195 msg_lines.append(tilde_warning_msg) 

196 

197 # Returns the formatted multiline message 

198 return "\n".join(msg_lines) 

199 

200 

201def _normalized_outrows( 

202 outrows: Iterable[InstalledCSVRow], 

203) -> List[Tuple[str, str, str]]: 

204 """Normalize the given rows of a RECORD file. 

205 

206 Items in each row are converted into str. Rows are then sorted to make 

207 the value more predictable for tests. 

208 

209 Each row is a 3-tuple (path, hash, size) and corresponds to a record of 

210 a RECORD file (see PEP 376 and PEP 427 for details). For the rows 

211 passed to this function, the size can be an integer as an int or string, 

212 or the empty string. 

213 """ 

214 # Normally, there should only be one row per path, in which case the 

215 # second and third elements don't come into play when sorting. 

216 # However, in cases in the wild where a path might happen to occur twice, 

217 # we don't want the sort operation to trigger an error (but still want 

218 # determinism). Since the third element can be an int or string, we 

219 # coerce each element to a string to avoid a TypeError in this case. 

220 # For additional background, see-- 

221 # https://github.com/pypa/pip/issues/5868 

222 return sorted( 

223 (record_path, hash_, str(size)) for record_path, hash_, size in outrows 

224 ) 

225 

226 

227def _record_to_fs_path(record_path: RecordPath, lib_dir: str) -> str: 

228 return os.path.join(lib_dir, record_path) 

229 

230 

231def _fs_to_record_path(path: str, lib_dir: str) -> RecordPath: 

232 # On Windows, do not handle relative paths if they belong to different 

233 # logical disks 

234 if os.path.splitdrive(path)[0].lower() == os.path.splitdrive(lib_dir)[0].lower(): 

235 path = os.path.relpath(path, lib_dir) 

236 

237 path = path.replace(os.path.sep, "/") 

238 return cast("RecordPath", path) 

239 

240 

241def get_csv_rows_for_installed( 

242 old_csv_rows: List[List[str]], 

243 installed: Dict[RecordPath, RecordPath], 

244 changed: Set[RecordPath], 

245 generated: List[str], 

246 lib_dir: str, 

247) -> List[InstalledCSVRow]: 

248 """ 

249 :param installed: A map from archive RECORD path to installation RECORD 

250 path. 

251 """ 

252 installed_rows: List[InstalledCSVRow] = [] 

253 for row in old_csv_rows: 

254 if len(row) > 3: 

255 logger.warning("RECORD line has more than three elements: %s", row) 

256 old_record_path = cast("RecordPath", row[0]) 

257 new_record_path = installed.pop(old_record_path, old_record_path) 

258 if new_record_path in changed: 

259 digest, length = rehash(_record_to_fs_path(new_record_path, lib_dir)) 

260 else: 

261 digest = row[1] if len(row) > 1 else "" 

262 length = row[2] if len(row) > 2 else "" 

263 installed_rows.append((new_record_path, digest, length)) 

264 for f in generated: 

265 path = _fs_to_record_path(f, lib_dir) 

266 digest, length = rehash(f) 

267 installed_rows.append((path, digest, length)) 

268 return installed_rows + [ 

269 (installed_record_path, "", "") for installed_record_path in installed.values() 

270 ] 

271 

272 

273def get_console_script_specs(console: Dict[str, str]) -> List[str]: 

274 """ 

275 Given the mapping from entrypoint name to callable, return the relevant 

276 console script specs. 

277 """ 

278 # Don't mutate caller's version 

279 console = console.copy() 

280 

281 scripts_to_generate = [] 

282 

283 # Special case pip and setuptools to generate versioned wrappers 

284 # 

285 # The issue is that some projects (specifically, pip and setuptools) use 

286 # code in setup.py to create "versioned" entry points - pip2.7 on Python 

287 # 2.7, pip3.3 on Python 3.3, etc. But these entry points are baked into 

288 # the wheel metadata at build time, and so if the wheel is installed with 

289 # a *different* version of Python the entry points will be wrong. The 

290 # correct fix for this is to enhance the metadata to be able to describe 

291 # such versioned entry points. 

292 # Currently, projects using versioned entry points will either have 

293 # incorrect versioned entry points, or they will not be able to distribute 

294 # "universal" wheels (i.e., they will need a wheel per Python version). 

295 # 

296 # Because setuptools and pip are bundled with _ensurepip and virtualenv, 

297 # we need to use universal wheels. As a workaround, we 

298 # override the versioned entry points in the wheel and generate the 

299 # correct ones. 

300 # 

301 # To add the level of hack in this section of code, in order to support 

302 # ensurepip this code will look for an ``ENSUREPIP_OPTIONS`` environment 

303 # variable which will control which version scripts get installed. 

304 # 

305 # ENSUREPIP_OPTIONS=altinstall 

306 # - Only pipX.Y and easy_install-X.Y will be generated and installed 

307 # ENSUREPIP_OPTIONS=install 

308 # - pipX.Y, pipX, easy_install-X.Y will be generated and installed. Note 

309 # that this option is technically if ENSUREPIP_OPTIONS is set and is 

310 # not altinstall 

311 # DEFAULT 

312 # - The default behavior is to install pip, pipX, pipX.Y, easy_install 

313 # and easy_install-X.Y. 

314 pip_script = console.pop("pip", None) 

315 if pip_script: 

316 if "ENSUREPIP_OPTIONS" not in os.environ: 

317 scripts_to_generate.append("pip = " + pip_script) 

318 

319 if os.environ.get("ENSUREPIP_OPTIONS", "") != "altinstall": 

320 scripts_to_generate.append(f"pip{sys.version_info[0]} = {pip_script}") 

321 

322 scripts_to_generate.append(f"pip{get_major_minor_version()} = {pip_script}") 

323 # Delete any other versioned pip entry points 

324 pip_ep = [k for k in console if re.match(r"pip(\d+(\.\d+)?)?$", k)] 

325 for k in pip_ep: 

326 del console[k] 

327 easy_install_script = console.pop("easy_install", None) 

328 if easy_install_script: 

329 if "ENSUREPIP_OPTIONS" not in os.environ: 

330 scripts_to_generate.append("easy_install = " + easy_install_script) 

331 

332 scripts_to_generate.append( 

333 f"easy_install-{get_major_minor_version()} = {easy_install_script}" 

334 ) 

335 # Delete any other versioned easy_install entry points 

336 easy_install_ep = [ 

337 k for k in console if re.match(r"easy_install(-\d+\.\d+)?$", k) 

338 ] 

339 for k in easy_install_ep: 

340 del console[k] 

341 

342 # Generate the console entry points specified in the wheel 

343 scripts_to_generate.extend(starmap("{} = {}".format, console.items())) 

344 

345 return scripts_to_generate 

346 

347 

348class ZipBackedFile: 

349 def __init__( 

350 self, src_record_path: RecordPath, dest_path: str, zip_file: ZipFile 

351 ) -> None: 

352 self.src_record_path = src_record_path 

353 self.dest_path = dest_path 

354 self._zip_file = zip_file 

355 self.changed = False 

356 

357 def _getinfo(self) -> ZipInfo: 

358 return self._zip_file.getinfo(self.src_record_path) 

359 

360 def save(self) -> None: 

361 # directory creation is lazy and after file filtering 

362 # to ensure we don't install empty dirs; empty dirs can't be 

363 # uninstalled. 

364 parent_dir = os.path.dirname(self.dest_path) 

365 ensure_dir(parent_dir) 

366 

367 # When we open the output file below, any existing file is truncated 

368 # before we start writing the new contents. This is fine in most 

369 # cases, but can cause a segfault if pip has loaded a shared 

370 # object (e.g. from pyopenssl through its vendored urllib3) 

371 # Since the shared object is mmap'd an attempt to call a 

372 # symbol in it will then cause a segfault. Unlinking the file 

373 # allows writing of new contents while allowing the process to 

374 # continue to use the old copy. 

375 if os.path.exists(self.dest_path): 

376 os.unlink(self.dest_path) 

377 

378 zipinfo = self._getinfo() 

379 

380 with self._zip_file.open(zipinfo) as f: 

381 with open(self.dest_path, "wb") as dest: 

382 shutil.copyfileobj(f, dest) 

383 

384 if zip_item_is_executable(zipinfo): 

385 set_extracted_file_to_default_mode_plus_executable(self.dest_path) 

386 

387 

388class ScriptFile: 

389 def __init__(self, file: "File") -> None: 

390 self._file = file 

391 self.src_record_path = self._file.src_record_path 

392 self.dest_path = self._file.dest_path 

393 self.changed = False 

394 

395 def save(self) -> None: 

396 self._file.save() 

397 self.changed = fix_script(self.dest_path) 

398 

399 

400class MissingCallableSuffix(InstallationError): 

401 def __init__(self, entry_point: str) -> None: 

402 super().__init__( 

403 f"Invalid script entry point: {entry_point} - A callable " 

404 "suffix is required. Cf https://packaging.python.org/" 

405 "specifications/entry-points/#use-for-scripts for more " 

406 "information." 

407 ) 

408 

409 

410def _raise_for_invalid_entrypoint(specification: str) -> None: 

411 entry = get_export_entry(specification) 

412 if entry is not None and entry.suffix is None: 

413 raise MissingCallableSuffix(str(entry)) 

414 

415 

416class PipScriptMaker(ScriptMaker): 

417 def make( 

418 self, specification: str, options: Optional[Dict[str, Any]] = None 

419 ) -> List[str]: 

420 _raise_for_invalid_entrypoint(specification) 

421 return super().make(specification, options) 

422 

423 

424def _install_wheel( 

425 name: str, 

426 wheel_zip: ZipFile, 

427 wheel_path: str, 

428 scheme: Scheme, 

429 pycompile: bool = True, 

430 warn_script_location: bool = True, 

431 direct_url: Optional[DirectUrl] = None, 

432 requested: bool = False, 

433) -> None: 

434 """Install a wheel. 

435 

436 :param name: Name of the project to install 

437 :param wheel_zip: open ZipFile for wheel being installed 

438 :param scheme: Distutils scheme dictating the install directories 

439 :param req_description: String used in place of the requirement, for 

440 logging 

441 :param pycompile: Whether to byte-compile installed Python files 

442 :param warn_script_location: Whether to check that scripts are installed 

443 into a directory on PATH 

444 :raises UnsupportedWheel: 

445 * when the directory holds an unpacked wheel with incompatible 

446 Wheel-Version 

447 * when the .dist-info dir does not match the wheel 

448 """ 

449 info_dir, metadata = parse_wheel(wheel_zip, name) 

450 

451 if wheel_root_is_purelib(metadata): 

452 lib_dir = scheme.purelib 

453 else: 

454 lib_dir = scheme.platlib 

455 

456 # Record details of the files moved 

457 # installed = files copied from the wheel to the destination 

458 # changed = files changed while installing (scripts #! line typically) 

459 # generated = files newly generated during the install (script wrappers) 

460 installed: Dict[RecordPath, RecordPath] = {} 

461 changed: Set[RecordPath] = set() 

462 generated: List[str] = [] 

463 

464 def record_installed( 

465 srcfile: RecordPath, destfile: str, modified: bool = False 

466 ) -> None: 

467 """Map archive RECORD paths to installation RECORD paths.""" 

468 newpath = _fs_to_record_path(destfile, lib_dir) 

469 installed[srcfile] = newpath 

470 if modified: 

471 changed.add(newpath) 

472 

473 def is_dir_path(path: RecordPath) -> bool: 

474 return path.endswith("/") 

475 

476 def assert_no_path_traversal(dest_dir_path: str, target_path: str) -> None: 

477 if not is_within_directory(dest_dir_path, target_path): 

478 message = ( 

479 "The wheel {!r} has a file {!r} trying to install" 

480 " outside the target directory {!r}" 

481 ) 

482 raise InstallationError( 

483 message.format(wheel_path, target_path, dest_dir_path) 

484 ) 

485 

486 def root_scheme_file_maker( 

487 zip_file: ZipFile, dest: str 

488 ) -> Callable[[RecordPath], "File"]: 

489 def make_root_scheme_file(record_path: RecordPath) -> "File": 

490 normed_path = os.path.normpath(record_path) 

491 dest_path = os.path.join(dest, normed_path) 

492 assert_no_path_traversal(dest, dest_path) 

493 return ZipBackedFile(record_path, dest_path, zip_file) 

494 

495 return make_root_scheme_file 

496 

497 def data_scheme_file_maker( 

498 zip_file: ZipFile, scheme: Scheme 

499 ) -> Callable[[RecordPath], "File"]: 

500 scheme_paths = {key: getattr(scheme, key) for key in SCHEME_KEYS} 

501 

502 def make_data_scheme_file(record_path: RecordPath) -> "File": 

503 normed_path = os.path.normpath(record_path) 

504 try: 

505 _, scheme_key, dest_subpath = normed_path.split(os.path.sep, 2) 

506 except ValueError: 

507 message = ( 

508 "Unexpected file in {}: {!r}. .data directory contents" 

509 " should be named like: '<scheme key>/<path>'." 

510 ).format(wheel_path, record_path) 

511 raise InstallationError(message) 

512 

513 try: 

514 scheme_path = scheme_paths[scheme_key] 

515 except KeyError: 

516 valid_scheme_keys = ", ".join(sorted(scheme_paths)) 

517 message = ( 

518 "Unknown scheme key used in {}: {} (for file {!r}). .data" 

519 " directory contents should be in subdirectories named" 

520 " with a valid scheme key ({})" 

521 ).format(wheel_path, scheme_key, record_path, valid_scheme_keys) 

522 raise InstallationError(message) 

523 

524 dest_path = os.path.join(scheme_path, dest_subpath) 

525 assert_no_path_traversal(scheme_path, dest_path) 

526 return ZipBackedFile(record_path, dest_path, zip_file) 

527 

528 return make_data_scheme_file 

529 

530 def is_data_scheme_path(path: RecordPath) -> bool: 

531 return path.split("/", 1)[0].endswith(".data") 

532 

533 paths = cast(List[RecordPath], wheel_zip.namelist()) 

534 file_paths = filterfalse(is_dir_path, paths) 

535 root_scheme_paths, data_scheme_paths = partition(is_data_scheme_path, file_paths) 

536 

537 make_root_scheme_file = root_scheme_file_maker(wheel_zip, lib_dir) 

538 files: Iterator[File] = map(make_root_scheme_file, root_scheme_paths) 

539 

540 def is_script_scheme_path(path: RecordPath) -> bool: 

541 parts = path.split("/", 2) 

542 return len(parts) > 2 and parts[0].endswith(".data") and parts[1] == "scripts" 

543 

544 other_scheme_paths, script_scheme_paths = partition( 

545 is_script_scheme_path, data_scheme_paths 

546 ) 

547 

548 make_data_scheme_file = data_scheme_file_maker(wheel_zip, scheme) 

549 other_scheme_files = map(make_data_scheme_file, other_scheme_paths) 

550 files = chain(files, other_scheme_files) 

551 

552 # Get the defined entry points 

553 distribution = get_wheel_distribution( 

554 FilesystemWheel(wheel_path), 

555 canonicalize_name(name), 

556 ) 

557 console, gui = get_entrypoints(distribution) 

558 

559 def is_entrypoint_wrapper(file: "File") -> bool: 

560 # EP, EP.exe and EP-script.py are scripts generated for 

561 # entry point EP by setuptools 

562 path = file.dest_path 

563 name = os.path.basename(path) 

564 if name.lower().endswith(".exe"): 

565 matchname = name[:-4] 

566 elif name.lower().endswith("-script.py"): 

567 matchname = name[:-10] 

568 elif name.lower().endswith(".pya"): 

569 matchname = name[:-4] 

570 else: 

571 matchname = name 

572 # Ignore setuptools-generated scripts 

573 return matchname in console or matchname in gui 

574 

575 script_scheme_files: Iterator[File] = map( 

576 make_data_scheme_file, script_scheme_paths 

577 ) 

578 script_scheme_files = filterfalse(is_entrypoint_wrapper, script_scheme_files) 

579 script_scheme_files = map(ScriptFile, script_scheme_files) 

580 files = chain(files, script_scheme_files) 

581 

582 for file in files: 

583 file.save() 

584 record_installed(file.src_record_path, file.dest_path, file.changed) 

585 

586 def pyc_source_file_paths() -> Generator[str, None, None]: 

587 # We de-duplicate installation paths, since there can be overlap (e.g. 

588 # file in .data maps to same location as file in wheel root). 

589 # Sorting installation paths makes it easier to reproduce and debug 

590 # issues related to permissions on existing files. 

591 for installed_path in sorted(set(installed.values())): 

592 full_installed_path = os.path.join(lib_dir, installed_path) 

593 if not os.path.isfile(full_installed_path): 

594 continue 

595 if not full_installed_path.endswith(".py"): 

596 continue 

597 yield full_installed_path 

598 

599 def pyc_output_path(path: str) -> str: 

600 """Return the path the pyc file would have been written to.""" 

601 return importlib.util.cache_from_source(path) 

602 

603 # Compile all of the pyc files for the installed files 

604 if pycompile: 

605 with captured_stdout() as stdout: 

606 with warnings.catch_warnings(): 

607 warnings.filterwarnings("ignore") 

608 for path in pyc_source_file_paths(): 

609 success = compileall.compile_file(path, force=True, quiet=True) 

610 if success: 

611 pyc_path = pyc_output_path(path) 

612 assert os.path.exists(pyc_path) 

613 pyc_record_path = cast( 

614 "RecordPath", pyc_path.replace(os.path.sep, "/") 

615 ) 

616 record_installed(pyc_record_path, pyc_path) 

617 logger.debug(stdout.getvalue()) 

618 

619 maker = PipScriptMaker(None, scheme.scripts) 

620 

621 # Ensure old scripts are overwritten. 

622 # See https://github.com/pypa/pip/issues/1800 

623 maker.clobber = True 

624 

625 # Ensure we don't generate any variants for scripts because this is almost 

626 # never what somebody wants. 

627 # See https://bitbucket.org/pypa/distlib/issue/35/ 

628 maker.variants = {""} 

629 

630 # This is required because otherwise distlib creates scripts that are not 

631 # executable. 

632 # See https://bitbucket.org/pypa/distlib/issue/32/ 

633 maker.set_mode = True 

634 

635 # Generate the console and GUI entry points specified in the wheel 

636 scripts_to_generate = get_console_script_specs(console) 

637 

638 gui_scripts_to_generate = list(starmap("{} = {}".format, gui.items())) 

639 

640 generated_console_scripts = maker.make_multiple(scripts_to_generate) 

641 generated.extend(generated_console_scripts) 

642 

643 generated.extend(maker.make_multiple(gui_scripts_to_generate, {"gui": True})) 

644 

645 if warn_script_location: 

646 msg = message_about_scripts_not_on_PATH(generated_console_scripts) 

647 if msg is not None: 

648 logger.warning(msg) 

649 

650 generated_file_mode = 0o666 & ~current_umask() 

651 

652 @contextlib.contextmanager 

653 def _generate_file(path: str, **kwargs: Any) -> Generator[BinaryIO, None, None]: 

654 with adjacent_tmp_file(path, **kwargs) as f: 

655 yield f 

656 os.chmod(f.name, generated_file_mode) 

657 replace(f.name, path) 

658 

659 dest_info_dir = os.path.join(lib_dir, info_dir) 

660 

661 # Record pip as the installer 

662 installer_path = os.path.join(dest_info_dir, "INSTALLER") 

663 with _generate_file(installer_path) as installer_file: 

664 installer_file.write(b"pip\n") 

665 generated.append(installer_path) 

666 

667 # Record the PEP 610 direct URL reference 

668 if direct_url is not None: 

669 direct_url_path = os.path.join(dest_info_dir, DIRECT_URL_METADATA_NAME) 

670 with _generate_file(direct_url_path) as direct_url_file: 

671 direct_url_file.write(direct_url.to_json().encode("utf-8")) 

672 generated.append(direct_url_path) 

673 

674 # Record the REQUESTED file 

675 if requested: 

676 requested_path = os.path.join(dest_info_dir, "REQUESTED") 

677 with open(requested_path, "wb"): 

678 pass 

679 generated.append(requested_path) 

680 

681 record_text = distribution.read_text("RECORD") 

682 record_rows = list(csv.reader(record_text.splitlines())) 

683 

684 rows = get_csv_rows_for_installed( 

685 record_rows, 

686 installed=installed, 

687 changed=changed, 

688 generated=generated, 

689 lib_dir=lib_dir, 

690 ) 

691 

692 # Record details of all files installed 

693 record_path = os.path.join(dest_info_dir, "RECORD") 

694 

695 with _generate_file(record_path, **csv_io_kwargs("w")) as record_file: 

696 # Explicitly cast to typing.IO[str] as a workaround for the mypy error: 

697 # "writer" has incompatible type "BinaryIO"; expected "_Writer" 

698 writer = csv.writer(cast("IO[str]", record_file)) 

699 writer.writerows(_normalized_outrows(rows)) 

700 

701 

702@contextlib.contextmanager 

703def req_error_context(req_description: str) -> Generator[None, None, None]: 

704 try: 

705 yield 

706 except InstallationError as e: 

707 message = f"For req: {req_description}. {e.args[0]}" 

708 raise InstallationError(message) from e 

709 

710 

711def install_wheel( 

712 name: str, 

713 wheel_path: str, 

714 scheme: Scheme, 

715 req_description: str, 

716 pycompile: bool = True, 

717 warn_script_location: bool = True, 

718 direct_url: Optional[DirectUrl] = None, 

719 requested: bool = False, 

720) -> None: 

721 with ZipFile(wheel_path, allowZip64=True) as z: 

722 with req_error_context(req_description): 

723 _install_wheel( 

724 name=name, 

725 wheel_zip=z, 

726 wheel_path=wheel_path, 

727 scheme=scheme, 

728 pycompile=pycompile, 

729 warn_script_location=warn_script_location, 

730 direct_url=direct_url, 

731 requested=requested, 

732 )