Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pip/_vendor/pkg_resources/__init__.py: 1%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1635 statements  

1# TODO: Add Generic type annotations to initialized collections. 

2# For now we'd simply use implicit Any/Unknown which would add redundant annotations 

3# mypy: disable-error-code="var-annotated" 

4""" 

5Package resource API 

6-------------------- 

7 

8A resource is a logical file contained within a package, or a logical 

9subdirectory thereof. The package resource API expects resource names 

10to have their path parts separated with ``/``, *not* whatever the local 

11path separator is. Do not use os.path operations to manipulate resource 

12names being passed into the API. 

13 

14The package resource API is designed to work with normal filesystem packages, 

15.egg files, and unpacked .egg files. It can also work in a limited way with 

16.zip files and with custom PEP 302 loaders that support the ``get_data()`` 

17method. 

18 

19This module is deprecated. Users are directed to :mod:`importlib.resources`, 

20:mod:`importlib.metadata` and :pypi:`packaging` instead. 

21""" 

22 

23from __future__ import annotations 

24 

25import sys 

26 

27if sys.version_info < (3, 8): # noqa: UP036 # Check for unsupported versions 

28 raise RuntimeError("Python 3.8 or later is required") 

29 

30import os 

31import io 

32import time 

33import re 

34import types 

35from typing import ( 

36 Any, 

37 Literal, 

38 Dict, 

39 Iterator, 

40 Mapping, 

41 MutableSequence, 

42 NamedTuple, 

43 NoReturn, 

44 Tuple, 

45 Union, 

46 TYPE_CHECKING, 

47 Protocol, 

48 Callable, 

49 Iterable, 

50 TypeVar, 

51 overload, 

52) 

53import zipfile 

54import zipimport 

55import warnings 

56import stat 

57import functools 

58import pkgutil 

59import operator 

60import platform 

61import collections 

62import plistlib 

63import email.parser 

64import errno 

65import tempfile 

66import textwrap 

67import inspect 

68import ntpath 

69import posixpath 

70import importlib 

71import importlib.abc 

72import importlib.machinery 

73from pkgutil import get_importer 

74 

75import _imp 

76 

77# capture these to bypass sandboxing 

78from os import utime 

79from os import open as os_open 

80from os.path import isdir, split 

81 

82try: 

83 from os import mkdir, rename, unlink 

84 

85 WRITE_SUPPORT = True 

86except ImportError: 

87 # no write support, probably under GAE 

88 WRITE_SUPPORT = False 

89 

90from pip._internal.utils._jaraco_text import ( 

91 yield_lines, 

92 drop_comment, 

93 join_continuation, 

94) 

95from pip._vendor.packaging import markers as _packaging_markers 

96from pip._vendor.packaging import requirements as _packaging_requirements 

97from pip._vendor.packaging import utils as _packaging_utils 

98from pip._vendor.packaging import version as _packaging_version 

99from pip._vendor.platformdirs import user_cache_dir as _user_cache_dir 

100 

101if TYPE_CHECKING: 

102 from _typeshed import BytesPath, StrPath, StrOrBytesPath 

103 from typing_extensions import Self 

104 

105 

106# Patch: Remove deprecation warning from vendored pkg_resources. 

107# Setting PYTHONWARNINGS=error to verify builds produce no warnings 

108# causes immediate exceptions. 

109# See https://github.com/pypa/pip/issues/12243 

110 

111 

_T = TypeVar("_T")
# Generic distribution type, used by installer callbacks that return a
# specific Distribution subclass.
_DistributionT = TypeVar("_DistributionT", bound="Distribution")
# Type aliases
_NestedStr = Union[str, Iterable[Union[str, Iterable["_NestedStr"]]]]
# Installer callbacks: given a Requirement, return a Distribution (typed or
# optional, respectively) or None when the requirement cannot be satisfied.
_InstallerTypeT = Callable[["Requirement"], "_DistributionT"]
_InstallerType = Callable[["Requirement"], Union["Distribution", None]]
# Anything accepted where a requirement is expected: a spec string or object.
_PkgReqType = Union[str, "Requirement"]
_EPDistType = Union["Distribution", _PkgReqType]
_MetadataType = Union["IResourceProvider", None]
_ResolvedEntryPoint = Any  # Can be any attribute in the module
_ResourceStream = Any  # TODO / Incomplete: A readable file-like object
# Any object works, but let's indicate we expect something like a module (optionally has __loader__ or __file__)
_ModuleLike = Union[object, types.ModuleType]
# Any: Should be _ModuleLike but we end up with issues where _ModuleLike doesn't have _ZipLoaderModule's __loader__
_ProviderFactoryType = Callable[[Any], "IResourceProvider"]
# Finder callback: (importer, path_entry, only) -> distributions on that path.
_DistFinderType = Callable[[_T, str, bool], Iterable["Distribution"]]
# Namespace handler: (importer, path_entry, package_name, module) -> subpath.
_NSHandlerType = Callable[[_T, str, str, types.ModuleType], Union[str, None]]
_AdapterT = TypeVar(
    "_AdapterT", _DistFinderType[Any], _ProviderFactoryType, _NSHandlerType[Any]
)

132 

133 

# Use _typeshed.importlib.LoaderProtocol once available https://github.com/python/typeshed/pull/11890
class _LoaderProtocol(Protocol):
    # Structural type for PEP 302 loaders: anything with load_module().
    def load_module(self, fullname: str, /) -> types.ModuleType: ...

137 

138 

class _ZipLoaderModule(Protocol):
    # Structural type for a module object whose loader is a zipimporter.
    __loader__: zipimport.zipimporter

141 

142 

# Best-effort PEP 440-compliant prefix of a version string: optional "v",
# optional epoch ("N!"), then a dotted release segment.
_PEP440_FALLBACK = re.compile(r"^v?(?P<safe>(?:[0-9]+!)?[0-9]+(?:\.[0-9]+)*)", re.I)

144 

145 

class PEP440Warning(RuntimeWarning):
    """
    Used when there is an issue with a version or specifier not complying with
    PEP 440.
    """

151 

152 

# Backward-compatible alias: parse_version() returns packaging's Version.
parse_version = _packaging_version.Version


# Registry of module-level state variables: variable name -> kind tag used to
# select the _sget_*/_sset_* helpers in __getstate__/__setstate__ below.
_state_vars: dict[str, str] = {}

157 

158 

def _declare_state(vartype: str, varname: str, initial_value: _T) -> _T:
    """Register *varname* as module state of kind *vartype* and hand the
    initial value straight back to the caller."""
    _state_vars.update({varname: vartype})
    return initial_value

162 

163 

def __getstate__() -> dict[str, Any]:
    """Snapshot every registered state variable via its _sget_* helper."""
    g = globals()
    return {name: g['_sget_' + kind](g[name]) for name, kind in _state_vars.items()}

170 

171 

def __setstate__(state: dict[str, Any]) -> dict[str, Any]:
    """Restore module state previously captured by ``__getstate__``."""
    g = globals()
    for name, value in state.items():
        setter = g['_sset_' + _state_vars[name]]
        setter(name, g[name], value)
    return state

177 

178 

179def _sget_dict(val): 

180 return val.copy() 

181 

182 

def _sset_dict(key, ob, state):
    """Restore a dict-valued state variable in place from *state*."""
    ob.clear()
    ob.update(state)

186 

187 

def _sget_object(val):
    """Snapshot an object state variable via its own ``__getstate__``."""
    return val.__getstate__()

190 

191 

def _sset_object(key, ob, state):
    """Restore an object state variable via its own ``__setstate__``."""
    ob.__setstate__(state)

194 

195 

196_sget_none = _sset_none = lambda *args: None 

197 

198 

def get_supported_platform():
    """Return this platform's maximum compatible version.

    distutils.util.get_platform() normally reports the minimum version
    of macOS that would be required to *use* extensions produced by
    distutils. But what we want when checking compatibility is to know the
    version of macOS that we are *running*. To allow usage of packages that
    explicitly require a newer version of macOS, we must also know the
    current version of the OS.

    If this condition occurs for any other platform with a version in its
    platform strings, this function should be extended accordingly.
    """
    plat = get_build_platform()
    if sys.platform == "darwin":
        m = macosVersionString.match(plat)
        if m is not None:
            try:
                plat = 'macosx-%s-%s' % ('.'.join(_macos_vers()[:2]), m.group(3))
            except ValueError:
                # not macOS after all; keep the build platform string
                pass
    return plat

221 

222 

# Public API surface of the module, grouped by purpose.
__all__ = [
    # Basic resource access and distribution/entry point discovery
    'require',
    'run_script',
    'get_provider',
    'get_distribution',
    'load_entry_point',
    'get_entry_map',
    'get_entry_info',
    'iter_entry_points',
    'resource_string',
    'resource_stream',
    'resource_filename',
    'resource_listdir',
    'resource_exists',
    'resource_isdir',
    # Environmental control
    'declare_namespace',
    'working_set',
    'add_activation_listener',
    'find_distributions',
    'set_extraction_path',
    'cleanup_resources',
    'get_default_cache',
    # Primary implementation classes
    'Environment',
    'WorkingSet',
    'ResourceManager',
    'Distribution',
    'Requirement',
    'EntryPoint',
    # Exceptions
    'ResolutionError',
    'VersionConflict',
    'DistributionNotFound',
    'UnknownExtra',
    'ExtractionError',
    # Warnings
    'PEP440Warning',
    # Parsing functions and string utilities
    'parse_requirements',
    'parse_version',
    'safe_name',
    'safe_version',
    'get_platform',
    'compatible_platforms',
    'yield_lines',
    'split_sections',
    'safe_extra',
    'to_filename',
    'invalid_marker',
    'evaluate_marker',
    # filesystem utilities
    'ensure_directory',
    'normalize_path',
    # Distribution "precedence" constants
    'EGG_DIST',
    'BINARY_DIST',
    'SOURCE_DIST',
    'CHECKOUT_DIST',
    'DEVELOP_DIST',
    # "Provider" interfaces, implementations, and registration/lookup APIs
    'IMetadataProvider',
    'IResourceProvider',
    'FileMetadata',
    'PathMetadata',
    'EggMetadata',
    'EmptyProvider',
    'empty_provider',
    'NullProvider',
    'EggProvider',
    'DefaultProvider',
    'ZipProvider',
    'register_finder',
    'register_namespace_handler',
    'register_loader_type',
    'fixup_namespace_packages',
    'get_importer',
    # Warnings
    'PkgResourcesDeprecationWarning',
    # Deprecated/backward compatibility only
    'run_main',
    'AvailableDistributions',
]

307 

308 

309class ResolutionError(Exception): 

310 """Abstract base for dependency resolution errors""" 

311 

312 def __repr__(self): 

313 return self.__class__.__name__ + repr(self.args) 

314 

315 

316class VersionConflict(ResolutionError): 

317 """ 

318 An already-installed version conflicts with the requested version. 

319 

320 Should be initialized with the installed Distribution and the requested 

321 Requirement. 

322 """ 

323 

324 _template = "{self.dist} is installed but {self.req} is required" 

325 

326 @property 

327 def dist(self) -> Distribution: 

328 return self.args[0] 

329 

330 @property 

331 def req(self) -> Requirement: 

332 return self.args[1] 

333 

334 def report(self): 

335 return self._template.format(**locals()) 

336 

337 def with_context(self, required_by: set[Distribution | str]): 

338 """ 

339 If required_by is non-empty, return a version of self that is a 

340 ContextualVersionConflict. 

341 """ 

342 if not required_by: 

343 return self 

344 args = self.args + (required_by,) 

345 return ContextualVersionConflict(*args) 

346 

347 

class ContextualVersionConflict(VersionConflict):
    """
    A VersionConflict that accepts a third parameter, the set of the
    requirements that required the installed Distribution.
    """

    _template = VersionConflict._template + ' by {self.required_by}'

    @property
    def required_by(self) -> set[str]:
        # Third positional argument: project names that pulled in the dist.
        return self.args[2]

359 

360 

361class DistributionNotFound(ResolutionError): 

362 """A requested distribution was not found""" 

363 

364 _template = ( 

365 "The '{self.req}' distribution was not found " 

366 "and is required by {self.requirers_str}" 

367 ) 

368 

369 @property 

370 def req(self) -> Requirement: 

371 return self.args[0] 

372 

373 @property 

374 def requirers(self) -> set[str] | None: 

375 return self.args[1] 

376 

377 @property 

378 def requirers_str(self): 

379 if not self.requirers: 

380 return 'the application' 

381 return ', '.join(self.requirers) 

382 

383 def report(self): 

384 return self._template.format(**locals()) 

385 

386 def __str__(self): 

387 return self.report() 

388 

389 

class UnknownExtra(ResolutionError):
    """Distribution doesn't have an "extra feature" of the given name"""

    # Raised when a Requirement names an extra the Distribution doesn't define.

392 

393 

# Registered IResourceProvider factories, keyed by module __loader__ type;
# populated via register_loader_type() and consulted by get_provider().
_provider_factories: dict[type[_ModuleLike], _ProviderFactoryType] = {}

# "major.minor" of the running interpreter, e.g. "3.11".
PY_MAJOR = '{}.{}'.format(*sys.version_info)
# Distribution "precedence" constants: higher values win when several
# distributions of the same project are available.
EGG_DIST = 3
BINARY_DIST = 2
SOURCE_DIST = 1
CHECKOUT_DIST = 0
DEVELOP_DIST = -1

402 

403 

def register_loader_type(
    loader_type: type[_ModuleLike], provider_factory: _ProviderFactoryType
):
    """Register `provider_factory` as the provider maker for `loader_type`.

    `loader_type` is the type or class of a PEP 302 ``module.__loader__``;
    `provider_factory` is a callable that, given a *module* object, returns
    an ``IResourceProvider`` for that module.
    """
    _provider_factories.update({loader_type: provider_factory})

414 

415 

@overload
def get_provider(moduleOrReq: str) -> IResourceProvider: ...
@overload
def get_provider(moduleOrReq: Requirement) -> Distribution: ...
def get_provider(moduleOrReq: str | Requirement) -> IResourceProvider | Distribution:
    """Return an IResourceProvider for the named module or requirement"""
    if isinstance(moduleOrReq, Requirement):
        # For a Requirement, resolve to an (active or freshly required) dist.
        return working_set.find(moduleOrReq) or require(str(moduleOrReq))[0]
    if moduleOrReq not in sys.modules:
        __import__(moduleOrReq)
    module = sys.modules[moduleOrReq]
    loader = getattr(module, '__loader__', None)
    factory = _find_adapter(_provider_factories, loader)
    return factory(module)

431 

432 

@functools.lru_cache(maxsize=None)
def _macos_vers():
    """Return the macOS version split into string components, cached."""
    version = platform.mac_ver()[0]
    if not version:
        # fallback for MacPorts, where mac_ver() can come back empty
        plist = '/System/Library/CoreServices/SystemVersion.plist'
        if os.path.exists(plist):
            with open(plist, 'rb') as fh:
                plist_content = plistlib.load(fh)
            version = plist_content.get('ProductVersion', version)
    return version.split('.')

445 

446 

447def _macos_arch(machine): 

448 return {'PowerPC': 'ppc', 'Power_Macintosh': 'ppc'}.get(machine, machine) 

449 

450 

def get_build_platform():
    """Return this platform's string for platform-specific distributions

    XXX Currently this is the same as ``distutils.util.get_platform()``, but it
    needs some hacks for Linux and macOS.
    """
    from sysconfig import get_platform

    plat = get_platform()
    if sys.platform == "darwin" and not plat.startswith('macosx-'):
        try:
            version = _macos_vers()
            machine = os.uname()[4].replace(" ", "_")
            plat = "macosx-%d.%d-%s" % (
                int(version[0]),
                int(version[1]),
                _macos_arch(machine),
            )
        except ValueError:
            # non-Mac darwin system: keep the generic sysconfig value
            pass
    return plat

474 

475 

# Matches e.g. "macosx-10.9-x86_64" -> groups (major, minor, arch).
macosVersionString = re.compile(r"macosx-(\d+)\.(\d+)-(.*)")
# Matches legacy "darwin-X.Y.Z-arch" platform strings from old builds.
darwinVersionString = re.compile(r"darwin-(\d+)\.(\d+)\.(\d+)-(.*)")
# XXX backward compat
get_platform = get_build_platform

480 

481 

def compatible_platforms(provided: str | None, required: str | None):
    """Can code for the `provided` platform run on the `required` platform?

    Returns true if either platform is ``None``, or the platforms are equal.

    XXX Needs compatibility checks for Linux and other unixy OSes.
    """
    if provided is None or required is None or provided == required:
        # easy case
        return True

    # macOS special cases
    reqMac = macosVersionString.match(required)
    if reqMac:
        provMac = macosVersionString.match(provided)

        # is this a Mac package?
        if not provMac:
            # this is backwards compatibility for packages built before
            # setuptools 0.6. All packages built after this point will
            # use the new macOS designation.
            provDarwin = darwinVersionString.match(provided)
            if provDarwin:
                dversion = int(provDarwin.group(1))
                macosversion = "%s.%s" % (reqMac.group(1), reqMac.group(2))
                # NOTE(review): this is a lexical string comparison, not a
                # numeric version compare (e.g. "10.10" < "10.3" here); it is
                # kept as-is because only legacy darwin-7/8 eggs reach it.
                if (
                    dversion == 7
                    and macosversion >= "10.3"
                    or dversion == 8
                    and macosversion >= "10.4"
                ):
                    return True
            # egg isn't macOS or legacy darwin
            return False

        # are they the same major version and machine type?
        if provMac.group(1) != reqMac.group(1) or provMac.group(3) != reqMac.group(3):
            return False

        # is the required OS major update >= the provided one?
        if int(provMac.group(2)) > int(reqMac.group(2)):
            return False

        return True

    # XXX Linux and other platforms' special cases should go here
    return False

529 

530 

@overload
def get_distribution(dist: _DistributionT) -> _DistributionT: ...
@overload
def get_distribution(dist: _PkgReqType) -> Distribution: ...
def get_distribution(dist: Distribution | _PkgReqType) -> Distribution:
    """Return a current distribution object for a Requirement or string"""
    if isinstance(dist, str):
        dist = Requirement.parse(dist)
    if isinstance(dist, Requirement):
        # Bad type narrowing, dist has to be a Requirement here, so get_provider has to return Distribution
        dist = get_provider(dist)  # type: ignore[assignment]
    if isinstance(dist, Distribution):
        return dist
    raise TypeError("Expected str, Requirement, or Distribution", dist)

545 

546 

def load_entry_point(dist: _EPDistType, group: str, name: str) -> _ResolvedEntryPoint:
    """Return `name` entry point of `group` for `dist` or raise ImportError"""
    distribution = get_distribution(dist)
    return distribution.load_entry_point(group, name)

550 

551 

@overload
def get_entry_map(
    dist: _EPDistType, group: None = None
) -> dict[str, dict[str, EntryPoint]]: ...
@overload
def get_entry_map(dist: _EPDistType, group: str) -> dict[str, EntryPoint]: ...
def get_entry_map(dist: _EPDistType, group: str | None = None):
    """Return the entry point map for `group`, or the full entry map"""
    distribution = get_distribution(dist)
    return distribution.get_entry_map(group)

561 

562 

def get_entry_info(dist: _EPDistType, group: str, name: str):
    """Return the EntryPoint object for `group`+`name`, or ``None``"""
    distribution = get_distribution(dist)
    return distribution.get_entry_info(group, name)

566 

567 

class IMetadataProvider(Protocol):
    """Protocol describing access to a distribution's metadata files."""

    def has_metadata(self, name: str) -> bool:
        """Does the package's distribution contain the named metadata?"""

    def get_metadata(self, name: str) -> str:
        """The named metadata resource as a string"""

    def get_metadata_lines(self, name: str) -> Iterator[str]:
        """Yield named metadata resource as list of non-blank non-comment lines

        Leading and trailing whitespace is stripped from each line, and lines
        with ``#`` as the first non-blank character are omitted."""

    def metadata_isdir(self, name: str) -> bool:
        """Is the named metadata a directory? (like ``os.path.isdir()``)"""

    def metadata_listdir(self, name: str) -> list[str]:
        """List of metadata names in the directory (like ``os.listdir()``)"""

    def run_script(self, script_name: str, namespace: dict[str, Any]) -> None:
        """Execute the named script in the supplied namespace dictionary"""

590 

class IResourceProvider(IMetadataProvider, Protocol):
    """An object that provides access to package resources"""

    def get_resource_filename(
        self, manager: ResourceManager, resource_name: str
    ) -> str:
        """Return a true filesystem path for `resource_name`

        `manager` must be a ``ResourceManager``"""

    def get_resource_stream(
        self, manager: ResourceManager, resource_name: str
    ) -> _ResourceStream:
        """Return a readable file-like object for `resource_name`

        `manager` must be a ``ResourceManager``"""

    def get_resource_string(
        self, manager: ResourceManager, resource_name: str
    ) -> bytes:
        """Return the contents of `resource_name` as :obj:`bytes`

        `manager` must be a ``ResourceManager``"""

    def has_resource(self, resource_name: str) -> bool:
        """Does the package contain the named resource?"""

    def resource_isdir(self, resource_name: str) -> bool:
        """Is the named resource a directory? (like ``os.path.isdir()``)"""

    def resource_listdir(self, resource_name: str) -> list[str]:
        """List of resource names in the directory (like ``os.listdir()``)"""

624 

625class WorkingSet: 

626 """A collection of active distributions on sys.path (or a similar list)""" 

627 

628 def __init__(self, entries: Iterable[str] | None = None): 

629 """Create working set from list of path entries (default=sys.path)""" 

630 self.entries: list[str] = [] 

631 self.entry_keys = {} 

632 self.by_key = {} 

633 self.normalized_to_canonical_keys = {} 

634 self.callbacks = [] 

635 

636 if entries is None: 

637 entries = sys.path 

638 

639 for entry in entries: 

640 self.add_entry(entry) 

641 

    @classmethod
    def _build_master(cls):
        """
        Prepare the master working set.
        """
        ws = cls()
        try:
            from __main__ import __requires__
        except ImportError:
            # The main program does not list any requirements
            return ws

        # ensure the requirements are met
        try:
            ws.require(__requires__)
        except VersionConflict:
            # sys.path as-is can't satisfy __requires__; rebuild a working
            # set from the requirements alone (this rewrites sys.path).
            return cls._build_from_requirements(__requires__)

        return ws

661 

    @classmethod
    def _build_from_requirements(cls, req_spec):
        """
        Build a working set from a requirement spec. Rewrites sys.path.
        """
        # try it without defaults already on sys.path
        # by starting with an empty path
        ws = cls([])
        reqs = parse_requirements(req_spec)
        dists = ws.resolve(reqs, Environment())
        for dist in dists:
            ws.add(dist)

        # add any missing entries from sys.path
        for entry in sys.path:
            if entry not in ws.entries:
                ws.add_entry(entry)

        # then copy back to sys.path
        sys.path[:] = ws.entries
        return ws

683 

684 def add_entry(self, entry: str): 

685 """Add a path item to ``.entries``, finding any distributions on it 

686 

687 ``find_distributions(entry, True)`` is used to find distributions 

688 corresponding to the path entry, and they are added. `entry` is 

689 always appended to ``.entries``, even if it is already present. 

690 (This is because ``sys.path`` can contain the same value more than 

691 once, and the ``.entries`` of the ``sys.path`` WorkingSet should always 

692 equal ``sys.path``.) 

693 """ 

694 self.entry_keys.setdefault(entry, []) 

695 self.entries.append(entry) 

696 for dist in find_distributions(entry, True): 

697 self.add(dist, entry, False) 

698 

699 def __contains__(self, dist: Distribution) -> bool: 

700 """True if `dist` is the active distribution for its project""" 

701 return self.by_key.get(dist.key) == dist 

702 

    def find(self, req: Requirement) -> Distribution | None:
        """Find a distribution matching requirement `req`

        If there is an active distribution for the requested project, this
        returns it as long as it meets the version requirement specified by
        `req`. But, if there is an active distribution for the project and it
        does *not* meet the `req` requirement, ``VersionConflict`` is raised.
        If there is no active distribution for the requested project, ``None``
        is returned.
        """
        dist = self.by_key.get(req.key)

        if dist is None:
            canonical_key = self.normalized_to_canonical_keys.get(req.key)

            if canonical_key is not None:
                # NOTE: mutates req.key so subsequent lookups against this
                # requirement use the canonical spelling of the project name.
                req.key = canonical_key
                dist = self.by_key.get(canonical_key)

        if dist is not None and dist not in req:
            # XXX add more info
            raise VersionConflict(dist, req)
        return dist

726 

727 def iter_entry_points(self, group: str, name: str | None = None): 

728 """Yield entry point objects from `group` matching `name` 

729 

730 If `name` is None, yields all entry points in `group` from all 

731 distributions in the working set, otherwise only ones matching 

732 both `group` and `name` are yielded (in distribution order). 

733 """ 

734 return ( 

735 entry 

736 for dist in self 

737 for entry in dist.get_entry_map(group).values() 

738 if name is None or name == entry.name 

739 ) 

740 

    def run_script(self, requires: str, script_name: str):
        """Locate distribution for `requires` and run `script_name` script"""
        # Reuse (and clear!) the caller's global namespace so the script
        # executes as if it were the caller's module (typically __main__).
        ns = sys._getframe(1).f_globals
        name = ns['__name__']
        ns.clear()
        ns['__name__'] = name
        self.require(requires)[0].run_script(script_name, ns)

748 

749 def __iter__(self) -> Iterator[Distribution]: 

750 """Yield distributions for non-duplicate projects in the working set 

751 

752 The yield order is the order in which the items' path entries were 

753 added to the working set. 

754 """ 

755 seen = set() 

756 for item in self.entries: 

757 if item not in self.entry_keys: 

758 # workaround a cache issue 

759 continue 

760 

761 for key in self.entry_keys[item]: 

762 if key not in seen: 

763 seen.add(key) 

764 yield self.by_key[key] 

765 

    def add(
        self,
        dist: Distribution,
        entry: str | None = None,
        insert: bool = True,
        replace: bool = False,
    ):
        """Add `dist` to working set, associated with `entry`

        If `entry` is unspecified, it defaults to the ``.location`` of `dist`.
        On exit from this routine, `entry` is added to the end of the working
        set's ``.entries`` (if it wasn't already present).

        `dist` is only added to the working set if it's for a project that
        doesn't already have a distribution in the set, unless `replace=True`.
        If it's added, any callbacks registered with the ``subscribe()`` method
        will be called.
        """
        if insert:
            dist.insert_on(self.entries, entry, replace=replace)

        if entry is None:
            entry = dist.location
        # Key lists for both the requested entry and the dist's own location.
        keys = self.entry_keys.setdefault(entry, [])
        keys2 = self.entry_keys.setdefault(dist.location, [])
        if not replace and dist.key in self.by_key:
            # ignore hidden distros
            return

        self.by_key[dist.key] = dist
        # Track canonicalized name so find() can match differently-spelled
        # requirements against this dist.
        normalized_name = _packaging_utils.canonicalize_name(dist.key)
        self.normalized_to_canonical_keys[normalized_name] = dist.key
        if dist.key not in keys:
            keys.append(dist.key)
        if dist.key not in keys2:
            keys2.append(dist.key)
        # Notify activation listeners registered via subscribe().
        self._added_new(dist)

803 

    @overload
    def resolve(
        self,
        requirements: Iterable[Requirement],
        env: Environment | None,
        installer: _InstallerTypeT[_DistributionT],
        replace_conflicting: bool = False,
        extras: tuple[str, ...] | None = None,
    ) -> list[_DistributionT]: ...
    @overload
    def resolve(
        self,
        requirements: Iterable[Requirement],
        env: Environment | None = None,
        *,
        installer: _InstallerTypeT[_DistributionT],
        replace_conflicting: bool = False,
        extras: tuple[str, ...] | None = None,
    ) -> list[_DistributionT]: ...
    @overload
    def resolve(
        self,
        requirements: Iterable[Requirement],
        env: Environment | None = None,
        installer: _InstallerType | None = None,
        replace_conflicting: bool = False,
        extras: tuple[str, ...] | None = None,
    ) -> list[Distribution]: ...
    def resolve(
        self,
        requirements: Iterable[Requirement],
        env: Environment | None = None,
        installer: _InstallerType | None | _InstallerTypeT[_DistributionT] = None,
        replace_conflicting: bool = False,
        extras: tuple[str, ...] | None = None,
    ) -> list[Distribution] | list[_DistributionT]:
        """List all distributions needed to (recursively) meet `requirements`

        `requirements` must be a sequence of ``Requirement`` objects. `env`,
        if supplied, should be an ``Environment`` instance. If
        not supplied, it defaults to all distributions available within any
        entry or distribution in the working set. `installer`, if supplied,
        will be invoked with each requirement that cannot be met by an
        already-installed distribution; it should return a ``Distribution`` or
        ``None``.

        Unless `replace_conflicting=True`, raises a VersionConflict exception
        if any requirements are found on the path that have the correct name
        but the wrong version. Otherwise, if an `installer` is supplied it
        will be invoked to obtain the correct version of the requirement and
        activate it.

        `extras` is a list of the extras to be used with these requirements.
        This is important because extra requirements may look like `my_req;
        extra = "my_extra"`, which would otherwise be interpreted as a purely
        optional requirement. Instead, we want to be able to assert that these
        requirements are truly required.
        """

        # set up the stack
        requirements = list(requirements)[::-1]
        # set of processed requirements
        processed = set()
        # key -> dist
        best = {}
        to_activate = []

        req_extras = _ReqExtras()

        # Mapping of requirement to set of distributions that required it;
        # useful for reporting info about conflicts.
        required_by = collections.defaultdict(set)

        while requirements:
            # process dependencies breadth-first
            req = requirements.pop(0)
            if req in processed:
                # Ignore cyclic or redundant dependencies
                continue

            # Skip requirements whose environment markers don't apply to
            # the active set of extras.
            if not req_extras.markers_pass(req, extras):
                continue

            dist = self._resolve_dist(
                req, best, replace_conflicting, env, installer, required_by, to_activate
            )

            # push the new requirements onto the stack
            new_requirements = dist.requires(req.extras)[::-1]
            requirements.extend(new_requirements)

            # Register the new requirements needed by req
            for new_requirement in new_requirements:
                required_by[new_requirement].add(req.project_name)
                req_extras[new_requirement] = req.extras

            processed.add(req)

        # return list of distros to activate
        return to_activate

905 

    def _resolve_dist(
        self, req, best, replace_conflicting, env, installer, required_by, to_activate
    ) -> Distribution:
        """Find (or install) the distribution satisfying *req*.

        Helper for ``resolve``; mutates *best* and *to_activate* in place.
        Raises DistributionNotFound when nothing satisfies *req*, or
        VersionConflict when the chosen dist doesn't match it.
        """
        dist = best.get(req.key)
        if dist is None:
            # Find the best distribution and add it to the map
            dist = self.by_key.get(req.key)
            if dist is None or (dist not in req and replace_conflicting):
                ws = self
                if env is None:
                    if dist is None:
                        env = Environment(self.entries)
                    else:
                        # Use an empty environment and workingset to avoid
                        # any further conflicts with the conflicting
                        # distribution
                        env = Environment([])
                        ws = WorkingSet([])
                dist = best[req.key] = env.best_match(
                    req, ws, installer, replace_conflicting=replace_conflicting
                )
                if dist is None:
                    requirers = required_by.get(req, None)
                    raise DistributionNotFound(req, requirers)
            to_activate.append(dist)
        if dist not in req:
            # Oops, the "best" so far conflicts with a dependency
            dependent_req = required_by[req]
            raise VersionConflict(dist, req).with_context(dependent_req)
        return dist

936 

    @overload
    def find_plugins(
        self,
        plugin_env: Environment,
        full_env: Environment | None,
        installer: _InstallerTypeT[_DistributionT],
        fallback: bool = True,
    ) -> tuple[list[_DistributionT], dict[Distribution, Exception]]: ...
    @overload
    def find_plugins(
        self,
        plugin_env: Environment,
        full_env: Environment | None = None,
        *,
        installer: _InstallerTypeT[_DistributionT],
        fallback: bool = True,
    ) -> tuple[list[_DistributionT], dict[Distribution, Exception]]: ...
    @overload
    def find_plugins(
        self,
        plugin_env: Environment,
        full_env: Environment | None = None,
        installer: _InstallerType | None = None,
        fallback: bool = True,
    ) -> tuple[list[Distribution], dict[Distribution, Exception]]: ...
    def find_plugins(
        self,
        plugin_env: Environment,
        full_env: Environment | None = None,
        installer: _InstallerType | None | _InstallerTypeT[_DistributionT] = None,
        fallback: bool = True,
    ) -> tuple[
        list[Distribution] | list[_DistributionT],
        dict[Distribution, Exception],
    ]:
        """Find all activatable distributions in `plugin_env`

        Example usage::

            distributions, errors = working_set.find_plugins(
                Environment(plugin_dirlist)
            )
            # add plugins+libs to sys.path
            map(working_set.add, distributions)
            # display errors
            print('Could not load', errors)

        The `plugin_env` should be an ``Environment`` instance that contains
        only distributions that are in the project's "plugin directory" or
        directories. The `full_env`, if supplied, should be an ``Environment``
        contains all currently-available distributions. If `full_env` is not
        supplied, one is created automatically from the ``WorkingSet`` this
        method is called on, which will typically mean that every directory on
        ``sys.path`` will be scanned for distributions.

        `installer` is a standard installer callback as used by the
        ``resolve()`` method. The `fallback` flag indicates whether we should
        attempt to resolve older versions of a plugin if the newest version
        cannot be resolved.

        This method returns a 2-tuple: (`distributions`, `error_info`), where
        `distributions` is a list of the distributions found in `plugin_env`
        that were loadable, along with any other distributions that are needed
        to resolve their dependencies. `error_info` is a dictionary mapping
        unloadable plugin distributions to an exception instance describing the
        error that occurred. Usually this will be a ``DistributionNotFound`` or
        ``VersionConflict`` instance.
        """

        plugin_projects = list(plugin_env)
        # scan project names in alphabetic order
        plugin_projects.sort()

        error_info: dict[Distribution, Exception] = {}
        # dict used as an insertion-ordered set of resolved distributions
        distributions: dict[Distribution, Exception | None] = {}

        if full_env is None:
            env = Environment(self.entries)
            env += plugin_env
        else:
            env = full_env + plugin_env

        shadow_set = self.__class__([])
        # put all our entries in shadow_set
        list(map(shadow_set.add, self))

        for project_name in plugin_projects:
            # plugin_env[project_name] is newest-first, so the first version
            # that resolves wins.
            for dist in plugin_env[project_name]:
                req = [dist.as_requirement()]

                try:
                    resolvees = shadow_set.resolve(req, env, installer)

                except ResolutionError as v:
                    # save error info
                    error_info[dist] = v
                    if fallback:
                        # try the next older version of project
                        continue
                    else:
                        # give up on this project, keep going
                        break

                else:
                    list(map(shadow_set.add, resolvees))
                    distributions.update(dict.fromkeys(resolvees))

                    # success, no need to try any more versions of this project
                    break

        sorted_distributions = list(distributions)
        sorted_distributions.sort()

        return sorted_distributions, error_info

1051 

1052 def require(self, *requirements: _NestedStr): 

1053 """Ensure that distributions matching `requirements` are activated 

1054 

1055 `requirements` must be a string or a (possibly-nested) sequence 

1056 thereof, specifying the distributions and versions required. The 

1057 return value is a sequence of the distributions that needed to be 

1058 activated to fulfill the requirements; all relevant distributions are 

1059 included, even if they were already activated in this working set. 

1060 """ 

1061 needed = self.resolve(parse_requirements(requirements)) 

1062 

1063 for dist in needed: 

1064 self.add(dist) 

1065 

1066 return needed 

1067 

1068 def subscribe( 

1069 self, callback: Callable[[Distribution], object], existing: bool = True 

1070 ): 

1071 """Invoke `callback` for all distributions 

1072 

1073 If `existing=True` (default), 

1074 call on all existing ones, as well. 

1075 """ 

1076 if callback in self.callbacks: 

1077 return 

1078 self.callbacks.append(callback) 

1079 if not existing: 

1080 return 

1081 for dist in self: 

1082 callback(dist) 

1083 

1084 def _added_new(self, dist): 

1085 for callback in self.callbacks: 

1086 callback(dist) 

1087 

1088 def __getstate__(self): 

1089 return ( 

1090 self.entries[:], 

1091 self.entry_keys.copy(), 

1092 self.by_key.copy(), 

1093 self.normalized_to_canonical_keys.copy(), 

1094 self.callbacks[:], 

1095 ) 

1096 

1097 def __setstate__(self, e_k_b_n_c): 

1098 entries, keys, by_key, normalized_to_canonical_keys, callbacks = e_k_b_n_c 

1099 self.entries = entries[:] 

1100 self.entry_keys = keys.copy() 

1101 self.by_key = by_key.copy() 

1102 self.normalized_to_canonical_keys = normalized_to_canonical_keys.copy() 

1103 self.callbacks = callbacks[:] 

1104 

1105 

class _ReqExtras(Dict["Requirement", Tuple[str, ...]]):
    """
    Map each requirement to the extras that demanded it.
    """

    def markers_pass(self, req: Requirement, extras: tuple[str, ...] | None = None):
        """
        Evaluate markers for req against each extra that
        demanded it.

        Return False if the req has a marker and fails
        evaluation. Otherwise, return True.
        """
        if not req.marker:
            # No marker means the requirement always applies.
            return True
        candidate_extras = self.get(req, ()) + (extras or (None,))
        return any(
            req.marker.evaluate({'extra': extra}) for extra in candidate_extras
        )

1124 

1125 

class Environment:
    """Searchable snapshot of distributions on a search path"""

    def __init__(
        self,
        search_path: Iterable[str] | None = None,
        platform: str | None = get_supported_platform(),
        python: str | None = PY_MAJOR,
    ):
        """Snapshot distributions available on a search path

        Any distributions found on `search_path` are added to the environment.
        `search_path` should be a sequence of ``sys.path`` items. If not
        supplied, ``sys.path`` is used.

        `platform` is an optional string specifying the name of the platform
        that platform-specific distributions must be compatible with. If
        unspecified, it defaults to the current platform. `python` is an
        optional string naming the desired version of Python (e.g. ``'3.6'``);
        it defaults to the current version.

        You may explicitly set `platform` (and/or `python`) to ``None`` if you
        wish to map *all* distributions, not just those compatible with the
        running platform or Python version.
        """
        # NOTE: the platform/python defaults are early-bound (evaluated once
        # at class-definition time).
        # Maps lowercase project key -> list of Distribution, newest first.
        self._distmap = {}
        self.platform = platform
        self.python = python
        self.scan(search_path)

    def can_add(self, dist: Distribution):
        """Is distribution `dist` acceptable for this environment?

        The distribution must match the platform and python version
        requirements specified when this environment was created, or False
        is returned.
        """
        # A dist with no declared py_version is treated as compatible.
        py_compat = (
            self.python is None
            or dist.py_version is None
            or dist.py_version == self.python
        )
        return py_compat and compatible_platforms(dist.platform, self.platform)

    def remove(self, dist: Distribution):
        """Remove `dist` from the environment"""
        # Raises KeyError/ValueError if `dist` was never added.
        self._distmap[dist.key].remove(dist)

    def scan(self, search_path: Iterable[str] | None = None):
        """Scan `search_path` for distributions usable in this environment

        Any distributions found are added to the environment.
        `search_path` should be a sequence of ``sys.path`` items. If not
        supplied, ``sys.path`` is used. Only distributions conforming to
        the platform/python version defined at initialization are added.
        """
        if search_path is None:
            search_path = sys.path

        for item in search_path:
            for dist in find_distributions(item):
                self.add(dist)

    def __getitem__(self, project_name: str) -> list[Distribution]:
        """Return a newest-to-oldest list of distributions for `project_name`

        Uses case-insensitive `project_name` comparison, assuming all the
        project's distributions use their project's name converted to all
        lowercase as their key.

        """
        distribution_key = project_name.lower()
        return self._distmap.get(distribution_key, [])

    def add(self, dist: Distribution):
        """Add `dist` if we ``can_add()`` it and it has not already been added"""
        if self.can_add(dist) and dist.has_version():
            dists = self._distmap.setdefault(dist.key, [])
            if dist not in dists:
                dists.append(dist)
                # Keep newest-first ordering for __getitem__/best_match.
                dists.sort(key=operator.attrgetter('hashcmp'), reverse=True)

    @overload
    def best_match(
        self,
        req: Requirement,
        working_set: WorkingSet,
        installer: _InstallerTypeT[_DistributionT],
        replace_conflicting: bool = False,
    ) -> _DistributionT: ...
    @overload
    def best_match(
        self,
        req: Requirement,
        working_set: WorkingSet,
        installer: _InstallerType | None = None,
        replace_conflicting: bool = False,
    ) -> Distribution | None: ...
    def best_match(
        self,
        req: Requirement,
        working_set: WorkingSet,
        installer: _InstallerType | None | _InstallerTypeT[_DistributionT] = None,
        replace_conflicting: bool = False,
    ) -> Distribution | None:
        """Find distribution best matching `req` and usable on `working_set`

        This calls the ``find(req)`` method of the `working_set` to see if a
        suitable distribution is already active. (This may raise
        ``VersionConflict`` if an unsuitable version of the project is already
        active in the specified `working_set`.) If a suitable distribution
        isn't active, this method returns the newest distribution in the
        environment that meets the ``Requirement`` in `req`. If no suitable
        distribution is found, and `installer` is supplied, then the result of
        calling the environment's ``obtain(req, installer)`` method will be
        returned.
        """
        try:
            dist = working_set.find(req)
        except VersionConflict:
            if not replace_conflicting:
                raise
            dist = None
        if dist is not None:
            return dist
        # Candidate lists are newest-first (see add()), so the first match
        # is the newest satisfying distribution.
        for dist in self[req.key]:
            if dist in req:
                return dist
        # try to download/install
        return self.obtain(req, installer)

    @overload
    def obtain(
        self,
        requirement: Requirement,
        installer: _InstallerTypeT[_DistributionT],
    ) -> _DistributionT: ...
    @overload
    def obtain(
        self,
        requirement: Requirement,
        installer: Callable[[Requirement], None] | None = None,
    ) -> None: ...
    @overload
    def obtain(
        self,
        requirement: Requirement,
        installer: _InstallerType | None = None,
    ) -> Distribution | None: ...
    def obtain(
        self,
        requirement: Requirement,
        installer: Callable[[Requirement], None]
        | _InstallerType
        | None
        | _InstallerTypeT[_DistributionT] = None,
    ) -> Distribution | None:
        """Obtain a distribution matching `requirement` (e.g. via download)

        Obtain a distro that matches requirement (e.g. via download). In the
        base ``Environment`` class, this routine just returns
        ``installer(requirement)``, unless `installer` is None, in which case
        None is returned instead. This method is a hook that allows subclasses
        to attempt other ways of obtaining a distribution before falling back
        to the `installer` argument."""
        return installer(requirement) if installer else None

    def __iter__(self) -> Iterator[str]:
        """Yield the unique project names of the available distributions"""
        for key in self._distmap.keys():
            # Skip keys whose distribution lists were emptied by remove().
            if self[key]:
                yield key

    def __iadd__(self, other: Distribution | Environment):
        """In-place addition of a distribution or environment"""
        if isinstance(other, Distribution):
            self.add(other)
        elif isinstance(other, Environment):
            for project in other:
                for dist in other[project]:
                    self.add(dist)
        else:
            raise TypeError("Can't add %r to environment" % (other,))
        return self

    def __add__(self, other: Distribution | Environment):
        """Add an environment or distribution to an environment"""
        # The result is unconstrained (platform=python=None) so it can hold
        # every distribution from both operands.
        new = self.__class__([], platform=None, python=None)
        for env in self, other:
            new += env
        return new

1317 

1318 

1319# XXX backward compatibility 

1320AvailableDistributions = Environment 

1321 

1322 

class ExtractionError(RuntimeError):
    """An error occurred extracting a resource

    The following attributes are available from instances of this exception:

    manager
        The resource manager that raised this exception

    cache_path
        The base directory for resource extraction

    original_error
        The exception instance that caused extraction to fail
    """

    # These attributes are assigned by ResourceManager.extraction_error()
    # just before the exception is raised.
    manager: ResourceManager
    cache_path: str
    original_error: BaseException | None

1341 

1342 

class ResourceManager:
    """Manage resource extraction and packages"""

    # Base directory for extraction; None means use get_default_cache().
    extraction_path: str | None = None

    def __init__(self):
        # Paths handed out by get_cache_path(), tracked for possible cleanup.
        self.cached_files = {}

    def resource_exists(self, package_or_requirement: _PkgReqType, resource_name: str):
        """Does the named resource exist?"""
        return get_provider(package_or_requirement).has_resource(resource_name)

    def resource_isdir(self, package_or_requirement: _PkgReqType, resource_name: str):
        """Is the named resource an existing directory?"""
        return get_provider(package_or_requirement).resource_isdir(resource_name)

    def resource_filename(
        self, package_or_requirement: _PkgReqType, resource_name: str
    ):
        """Return a true filesystem path for specified resource"""
        return get_provider(package_or_requirement).get_resource_filename(
            self, resource_name
        )

    def resource_stream(self, package_or_requirement: _PkgReqType, resource_name: str):
        """Return a readable file-like object for specified resource"""
        return get_provider(package_or_requirement).get_resource_stream(
            self, resource_name
        )

    def resource_string(
        self, package_or_requirement: _PkgReqType, resource_name: str
    ) -> bytes:
        """Return specified resource as :obj:`bytes`"""
        return get_provider(package_or_requirement).get_resource_string(
            self, resource_name
        )

    def resource_listdir(self, package_or_requirement: _PkgReqType, resource_name: str):
        """List the contents of the named resource directory"""
        return get_provider(package_or_requirement).resource_listdir(resource_name)

    def extraction_error(self) -> NoReturn:
        """Give an error message for problems extracting file(s)"""

        # Wrap whatever exception is currently being handled in an
        # ExtractionError carrying this manager and the cache path.
        old_exc = sys.exc_info()[1]
        cache_path = self.extraction_path or get_default_cache()

        tmpl = textwrap.dedent(
            """
            Can't extract file(s) to egg cache

            The following error occurred while trying to extract file(s)
            to the Python egg cache:

              {old_exc}

            The Python egg cache directory is currently set to:

              {cache_path}

            Perhaps your account does not have write access to this directory?
            You can change the cache directory by setting the PYTHON_EGG_CACHE
            environment variable to point to an accessible directory.
            """
        ).lstrip()
        err = ExtractionError(tmpl.format(**locals()))
        err.manager = self
        err.cache_path = cache_path
        err.original_error = old_exc
        raise err

    def get_cache_path(self, archive_name: str, names: Iterable[StrPath] = ()):
        """Return absolute location in cache for `archive_name` and `names`

        The parent directory of the resulting path will be created if it does
        not already exist. `archive_name` should be the base filename of the
        enclosing egg (which may not be the name of the enclosing zipfile!),
        including its ".egg" extension. `names`, if provided, should be a
        sequence of path name parts "under" the egg's extraction location.

        This method should only be called by resource providers that need to
        obtain an extraction location, and only for names they intend to
        extract, as it tracks the generated names for possible cleanup later.
        """
        extract_path = self.extraction_path or get_default_cache()
        target_path = os.path.join(extract_path, archive_name + '-tmp', *names)
        try:
            _bypass_ensure_directory(target_path)
        except Exception:
            # Re-raised as a descriptive ExtractionError (never returns).
            self.extraction_error()

        self._warn_unsafe_extraction_path(extract_path)

        self.cached_files[target_path] = True
        return target_path

    @staticmethod
    def _warn_unsafe_extraction_path(path):
        """
        If the default extraction path is overridden and set to an insecure
        location, such as /tmp, it opens up an opportunity for an attacker to
        replace an extracted file with an unauthorized payload. Warn the user
        if a known insecure location is used.

        See Distribute #375 for more details.
        """
        if os.name == 'nt' and not path.startswith(os.environ['windir']):
            # On Windows, permissions are generally restrictive by default
            # and temp directories are not writable by other users, so
            # bypass the warning.
            return
        mode = os.stat(path).st_mode
        if mode & stat.S_IWOTH or mode & stat.S_IWGRP:
            msg = (
                "Extraction path is writable by group/others "
                "and vulnerable to attack when "
                "used with get_resource_filename ({path}). "
                "Consider a more secure "
                "location (set with .set_extraction_path or the "
                "PYTHON_EGG_CACHE environment variable)."
            ).format(**locals())
            warnings.warn(msg, UserWarning)

    def postprocess(self, tempname: StrOrBytesPath, filename: StrOrBytesPath):
        """Perform any platform-specific postprocessing of `tempname`

        This is where Mac header rewrites should be done; other platforms don't
        have anything special they should do.

        Resource providers should call this method ONLY after successfully
        extracting a compressed resource. They must NOT call it on resources
        that are already in the filesystem.

        `tempname` is the current (temporary) name of the file, and `filename`
        is the name it will be renamed to by the caller after this routine
        returns.
        """

        if os.name == 'posix':
            # Make the resource executable
            mode = ((os.stat(tempname).st_mode) | 0o555) & 0o7777
            os.chmod(tempname, mode)

    def set_extraction_path(self, path: str):
        """Set the base path where resources will be extracted to, if needed.

        If you do not call this routine before any extractions take place, the
        path defaults to the return value of ``get_default_cache()``. (Which
        is based on the ``PYTHON_EGG_CACHE`` environment variable, with various
        platform-specific fallbacks. See that routine's documentation for more
        details.)

        Resources are extracted to subdirectories of this path based upon
        information given by the ``IResourceProvider``. You may set this to a
        temporary directory, but then you must call ``cleanup_resources()`` to
        delete the extracted files when done. There is no guarantee that
        ``cleanup_resources()`` will be able to remove all extracted files.

        (Note: you may not change the extraction path for a given resource
        manager once resources have been extracted, unless you first call
        ``cleanup_resources()``.)
        """
        if self.cached_files:
            raise ValueError("Can't change extraction path, files already extracted")

        self.extraction_path = path

    def cleanup_resources(self, force: bool = False) -> list[str]:
        """
        Delete all extracted resource files and directories, returning a list
        of the file and directory names that could not be successfully removed.
        This function does not have any concurrency protection, so it should
        generally only be called when the extraction path is a temporary
        directory exclusive to a single process. This method is not
        automatically called; you must call it explicitly or register it as an
        ``atexit`` function if you wish to ensure cleanup of a temporary
        directory used for extractions.
        """
        # XXX
        # Intentionally a no-op in this implementation.
        return []

1524 

1525 

def get_default_cache() -> str:
    """
    Return the ``PYTHON_EGG_CACHE`` environment variable
    or a platform-relevant user cache dir for an app
    named "Python-Eggs".
    """
    configured = os.environ.get('PYTHON_EGG_CACHE')
    if configured:
        return configured
    return _user_cache_dir(appname='Python-Eggs')

1533 

1534 

def safe_name(name: str):
    """Convert an arbitrary string to a standard distribution name

    Any runs of non-alphanumeric/. characters are replaced with a single '-'.
    """
    illegal_runs = re.compile('[^A-Za-z0-9.]+')
    return illegal_runs.sub('-', name)

1541 

1542 

def safe_version(version: str):
    """
    Convert an arbitrary string to a standard version string
    """
    try:
        # normalize the version
        parsed = _packaging_version.Version(version)
    except _packaging_version.InvalidVersion:
        # Fall back to a lossy sanitization for un-parseable versions.
        fallback = version.replace(' ', '.')
        return re.sub('[^A-Za-z0-9.]+', '-', fallback)
    return str(parsed)

1553 

1554 

def _forgiving_version(version):
    """Fallback when ``safe_version`` is not safe enough
    >>> parse_version(_forgiving_version('0.23ubuntu1'))
    <Version('0.23.dev0+sanitized.ubuntu1')>
    >>> parse_version(_forgiving_version('0.23-'))
    <Version('0.23.dev0+sanitized')>
    >>> parse_version(_forgiving_version('0.-_'))
    <Version('0.dev0+sanitized')>
    >>> parse_version(_forgiving_version('42.+?1'))
    <Version('42.dev0+sanitized.1')>
    >>> parse_version(_forgiving_version('hello world'))
    <Version('0.dev0+sanitized.hello.world')>
    """
    normalized = version.replace(' ', '.')
    match = _PEP440_FALLBACK.search(normalized)
    if match is None:
        # Nothing PEP 440-ish at all; keep the whole string as local data.
        safe, rest = "0", normalized
    else:
        safe = match["safe"]
        rest = normalized[len(safe):]
    local = f"sanitized.{_safe_segment(rest)}".strip(".")
    return f"{safe}.dev0+{local}"

1578 

1579 

def _safe_segment(segment):
    """Convert an arbitrary string into a safe segment"""
    cleaned = re.sub('[^A-Za-z0-9.]+', '-', segment)
    cleaned = re.sub('-[^A-Za-z0-9]+', '-', cleaned)
    cleaned = re.sub(r'\.[^A-Za-z0-9]+', '.', cleaned)
    return cleaned.strip(".-")

1585 

1586 

def safe_extra(extra: str):
    """Convert an arbitrary string to a standard 'extra' name

    Any runs of non-alphanumeric characters are replaced with a single '_',
    and the result is always lowercased.
    """
    normalized = re.sub('[^A-Za-z0-9.-]+', '_', extra)
    return normalized.lower()

1594 

1595 

def to_filename(name: str):
    """Convert a project or version name to its filename-escaped form

    Any '-' characters are currently replaced with '_'.
    """
    return '_'.join(name.split('-'))

1602 

1603 

def invalid_marker(text: str):
    """
    Validate text as a PEP 508 environment marker; return an exception
    if invalid or False otherwise.
    """
    try:
        evaluate_marker(text)
    except SyntaxError as err:
        # Strip location info that is meaningless outside a source file.
        err.filename = None
        err.lineno = None
        return err
    else:
        return False

1616 

1617 

def evaluate_marker(text: str, extra: str | None = None) -> bool:
    """
    Evaluate a PEP 508 environment marker.
    Return a boolean indicating the marker result in this environment.
    Raise SyntaxError if marker is invalid.

    This implementation uses the 'pyparsing' module.
    """
    try:
        return _packaging_markers.Marker(text).evaluate()
    except _packaging_markers.InvalidMarker as e:
        # Normalize the packaging-specific error to the documented SyntaxError.
        raise SyntaxError(e) from e

1631 

1632 

class NullProvider:
    """Try to implement resources and metadata for arbitrary PEP 302 loaders"""

    # Set by EggProvider subclasses when the module lives inside an egg.
    egg_name: str | None = None
    egg_info: str | None = None
    loader: _LoaderProtocol | None = None

    def __init__(self, module: _ModuleLike):
        self.loader = getattr(module, '__loader__', None)
        self.module_path = os.path.dirname(getattr(module, '__file__', ''))

    def get_resource_filename(self, manager: ResourceManager, resource_name: str):
        return self._fn(self.module_path, resource_name)

    def get_resource_stream(self, manager: ResourceManager, resource_name: str):
        # Loads the whole resource into memory; subclasses may stream instead.
        return io.BytesIO(self.get_resource_string(manager, resource_name))

    def get_resource_string(
        self, manager: ResourceManager, resource_name: str
    ) -> bytes:
        return self._get(self._fn(self.module_path, resource_name))

    def has_resource(self, resource_name: str):
        return self._has(self._fn(self.module_path, resource_name))

    def _get_metadata_path(self, name):
        return self._fn(self.egg_info, name)

    def has_metadata(self, name: str) -> bool:
        if not self.egg_info:
            return False

        path = self._get_metadata_path(name)
        return self._has(path)

    def get_metadata(self, name: str):
        # Returns '' (not an error) when no metadata directory is known.
        if not self.egg_info:
            return ""
        path = self._get_metadata_path(name)
        value = self._get(path)
        try:
            return value.decode('utf-8')
        except UnicodeDecodeError as exc:
            # Include the path in the error message to simplify
            # troubleshooting, and without changing the exception type.
            exc.reason += ' in {} file at path: {}'.format(name, path)
            raise

    def get_metadata_lines(self, name: str) -> Iterator[str]:
        return yield_lines(self.get_metadata(name))

    def resource_isdir(self, resource_name: str):
        return self._isdir(self._fn(self.module_path, resource_name))

    def metadata_isdir(self, name: str) -> bool:
        return bool(self.egg_info and self._isdir(self._fn(self.egg_info, name)))

    def resource_listdir(self, resource_name: str):
        return self._listdir(self._fn(self.module_path, resource_name))

    def metadata_listdir(self, name: str) -> list[str]:
        if self.egg_info:
            return self._listdir(self._fn(self.egg_info, name))
        return []

    def run_script(self, script_name: str, namespace: dict[str, Any]):
        # Scripts are stored under the 'scripts/' metadata directory.
        script = 'scripts/' + script_name
        if not self.has_metadata(script):
            raise ResolutionError(
                "Script {script!r} not found in metadata at {self.egg_info!r}".format(
                    **locals()
                ),
            )

        # Normalize line endings so compile() accepts the source.
        script_text = self.get_metadata(script).replace('\r\n', '\n')
        script_text = script_text.replace('\r', '\n')
        script_filename = self._fn(self.egg_info, script)
        namespace['__file__'] = script_filename
        if os.path.exists(script_filename):
            source = _read_utf8_with_fallback(script_filename)
            code = compile(source, script_filename, 'exec')
            exec(code, namespace, namespace)
        else:
            # No real file (e.g. zipped egg): seed linecache so tracebacks
            # can still display the script's source lines.
            from linecache import cache

            cache[script_filename] = (
                len(script_text),
                0,
                script_text.split('\n'),
                script_filename,
            )
            script_code = compile(script_text, script_filename, 'exec')
            exec(script_code, namespace, namespace)

    def _has(self, path) -> bool:
        raise NotImplementedError(
            "Can't perform this operation for unregistered loader type"
        )

    def _isdir(self, path) -> bool:
        raise NotImplementedError(
            "Can't perform this operation for unregistered loader type"
        )

    def _listdir(self, path) -> list[str]:
        raise NotImplementedError(
            "Can't perform this operation for unregistered loader type"
        )

    def _fn(self, base: str | None, resource_name: str):
        # Resource names always use '/' separators; convert to OS paths here.
        if base is None:
            raise TypeError(
                "`base` parameter in `_fn` is `None`. Either override this method or check the parameter first."
            )
        self._validate_resource_path(resource_name)
        if resource_name:
            return os.path.join(base, *resource_name.split('/'))
        return base

    @staticmethod
    def _validate_resource_path(path):
        """
        Validate the resource paths according to the docs.
        https://setuptools.pypa.io/en/latest/pkg_resources.html#basic-resource-access

        >>> warned = getfixture('recwarn')
        >>> warnings.simplefilter('always')
        >>> vrp = NullProvider._validate_resource_path
        >>> vrp('foo/bar.txt')
        >>> bool(warned)
        False
        >>> vrp('../foo/bar.txt')
        >>> bool(warned)
        True
        >>> warned.clear()
        >>> vrp('/foo/bar.txt')
        >>> bool(warned)
        True
        >>> vrp('foo/../../bar.txt')
        >>> bool(warned)
        True
        >>> warned.clear()
        >>> vrp('foo/f../bar.txt')
        >>> bool(warned)
        False

        Windows path separators are straight-up disallowed.
        >>> vrp(r'\\foo/bar.txt')
        Traceback (most recent call last):
        ...
        ValueError: Use of .. or absolute path in a resource path \
is not allowed.

        >>> vrp(r'C:\\foo/bar.txt')
        Traceback (most recent call last):
        ...
        ValueError: Use of .. or absolute path in a resource path \
is not allowed.

        Blank values are allowed

        >>> vrp('')
        >>> bool(warned)
        False

        Non-string values are not.

        >>> vrp(None)
        Traceback (most recent call last):
        ...
        AttributeError: ...
        """
        invalid = (
            os.path.pardir in path.split(posixpath.sep)
            or posixpath.isabs(path)
            or ntpath.isabs(path)
            or path.startswith("\\")
        )
        if not invalid:
            return

        msg = "Use of .. or absolute path in a resource path is not allowed."

        # Aggressively disallow Windows absolute paths
        if (path.startswith("\\") or ntpath.isabs(path)) and not posixpath.isabs(path):
            raise ValueError(msg)

        # for compatibility, warn; in future
        # raise ValueError(msg)
        issue_warning(
            msg[:-1] + " and will raise exceptions in a future release.",
            DeprecationWarning,
        )

    def _get(self, path) -> bytes:
        if hasattr(self.loader, 'get_data') and self.loader:
            # Already checked get_data exists
            return self.loader.get_data(path)  # type: ignore[attr-defined]
        raise NotImplementedError(
            "Can't perform this operation for loaders without 'get_data()'"
        )

1834 

1835 

# Fallback registration: any loader type without a more specific
# provider gets NullProvider.
register_loader_type(object, NullProvider)

1837 

1838 

1839def _parents(path): 

1840 """ 

1841 yield all parents of path including path 

1842 """ 

1843 last = None 

1844 while path != last: 

1845 yield path 

1846 last = path 

1847 path, _ = os.path.split(path) 

1848 

1849 

class EggProvider(NullProvider):
    """Provider based on a virtual filesystem"""

    def __init__(self, module: _ModuleLike):
        super().__init__(module)
        self._setup_prefix()

    def _setup_prefix(self):
        # Metadata may live in an egg nested inside a "basket" of
        # multiple eggs, so walk upward from module_path (rather than
        # using .archive) and take the first enclosing egg, if any.
        egg = next(filter(_is_egg_path, _parents(self.module_path)), None)
        if egg:
            self._set_egg(egg)

    def _set_egg(self, path: str):
        # Record the egg location plus its conventional metadata dir.
        self.egg_root = path
        self.egg_name = os.path.basename(path)
        self.egg_info = os.path.join(path, 'EGG-INFO')

1868 

1869 

class DefaultProvider(EggProvider):
    """Provides access to package resources in the filesystem"""

    # Thin wrappers mapping the provider API onto plain os calls.
    def _has(self, path) -> bool:
        return os.path.exists(path)

    def _isdir(self, path) -> bool:
        return os.path.isdir(path)

    def _listdir(self, path):
        return os.listdir(path)

    def _get(self, path) -> bytes:
        with open(path, 'rb') as stream:
            return stream.read()

    def get_resource_stream(self, manager: object, resource_name: str):
        """Open the named resource as a binary file object."""
        return open(self._fn(self.module_path, resource_name), 'rb')

    @classmethod
    def _register(cls):
        # Cover both source (.py) and sourceless (.pyc-only) file loaders;
        # getattr falls back to type(None) if a loader name is absent.
        for loader_name in ('SourceFileLoader', 'SourcelessFileLoader'):
            loader_cls = getattr(importlib.machinery, loader_name, type(None))
            register_loader_type(loader_cls, cls)

1898 

1899 

# Register DefaultProvider for the standard filesystem import loaders.
DefaultProvider._register()

1901 

1902 

class EmptyProvider(NullProvider):
    """Provider that returns nothing for all requests"""

    # A special case, we don't want all Providers inheriting from NullProvider to have a potentially None module_path
    module_path: str | None = None  # type: ignore[assignment]

    def __init__(self):
        # Deliberately skip NullProvider.__init__: there is no module.
        pass

    def _has(self, path) -> bool:
        return False

    # Directories don't exist either; reuse the same always-False check.
    _isdir = _has

    def _get(self, path) -> bytes:
        return b''

    def _listdir(self, path):
        return []

1919 

1920 

# Shared module-level instance used wherever no real provider applies.
empty_provider = EmptyProvider()

1922 

1923 

class ZipManifests(Dict[str, "MemoizedZipManifests.manifest_mod"]):
    """
    zip manifest builder
    """

    # `path` could be `StrPath | IO[bytes]` but that violates the LSP for `MemoizedZipManifests.load`
    @classmethod
    def build(cls, path: str):
        """
        Construct a mapping of archive member name to ``ZipInfo``,
        mirroring the zipimport directory caches (which store tuples).

        Keys use the platform-specific path separator (os.sep) for
        compatibility with pypy on Windows.
        """
        with zipfile.ZipFile(path) as archive:
            return {
                name.replace('/', os.sep): archive.getinfo(name)
                for name in archive.namelist()
            }

    load = build

1950 

1951 

class MemoizedZipManifests(ZipManifests):
    """
    Memoized zipfile manifests.
    """

    class manifest_mod(NamedTuple):
        manifest: dict[str, zipfile.ZipInfo]
        mtime: float

    def load(self, path: str) -> dict[str, zipfile.ZipInfo]:  # type: ignore[override] # ZipManifests.load is a classmethod
        """
        Return the manifest for *path*, rebuilding it only when the
        archive's mtime differs from the cached entry.
        """
        path = os.path.normpath(path)
        mtime = os.stat(path).st_mtime

        cached = self.get(path)
        if cached is None or cached.mtime != mtime:
            cached = self.manifest_mod(self.build(path), mtime)
            self[path] = cached

        return cached.manifest

1973 

1974 

class ZipProvider(EggProvider):
    """Resource support for zips and eggs"""

    # Cached contents of native_libs.txt / eager_resources.txt; None
    # until first computed by _get_eager_resources().
    eagers: list[str] | None = None
    # Class-level manifest cache shared by all instances, keyed by archive.
    _zip_manifests = MemoizedZipManifests()
    # ZipProvider's loader should always be a zipimporter or equivalent
    loader: zipimport.zipimporter

    def __init__(self, module: _ZipLoaderModule):
        super().__init__(module)
        # Archive path plus separator: the prefix of every virtual fs path.
        self.zip_pre = self.loader.archive + os.sep

    def _zipinfo_name(self, fspath):
        # Convert a virtual filename (full path to file) into a zipfile subpath
        # usable with the zipimport directory cache for our target archive
        fspath = fspath.rstrip(os.sep)
        if fspath == self.loader.archive:
            return ''
        if fspath.startswith(self.zip_pre):
            return fspath[len(self.zip_pre) :]
        raise AssertionError("%s is not a subpath of %s" % (fspath, self.zip_pre))

    def _parts(self, zip_path):
        # Convert a zipfile subpath into an egg-relative path part list.
        # pseudo-fs path
        fspath = self.zip_pre + zip_path
        if fspath.startswith(self.egg_root + os.sep):
            return fspath[len(self.egg_root) + 1 :].split(os.sep)
        raise AssertionError("%s is not a subpath of %s" % (fspath, self.egg_root))

    @property
    def zipinfo(self):
        # Mapping of archive member name -> ZipInfo, memoized per archive
        # mtime by MemoizedZipManifests.
        return self._zip_manifests.load(self.loader.archive)

    def get_resource_filename(self, manager: ResourceManager, resource_name: str):
        """Extract *resource_name* to the cache and return its real path."""
        if not self.egg_name:
            raise NotImplementedError(
                "resource_filename() only supported for .egg, not .zip"
            )
        # no need to lock for extraction, since we use temp names
        zip_path = self._resource_to_zip(resource_name)
        eagers = self._get_eager_resources()
        if '/'.join(self._parts(zip_path)) in eagers:
            # Eager resources are extracted as a group before any is used.
            for name in eagers:
                self._extract_resource(manager, self._eager_to_zip(name))
        return self._extract_resource(manager, zip_path)

    @staticmethod
    def _get_date_and_size(zip_stat):
        # Derive a local mktime() timestamp and size from a ZipInfo.
        size = zip_stat.file_size
        # ymdhms+wday, yday, dst
        date_time = zip_stat.date_time + (0, 0, -1)
        # 1980 offset already done
        timestamp = time.mktime(date_time)
        return timestamp, size

    # FIXME: 'ZipProvider._extract_resource' is too complex (12)
    def _extract_resource(self, manager: ResourceManager, zip_path) -> str:  # noqa: C901
        """Extract *zip_path* (file or directory) into the cache managed
        by *manager* and return the extracted filesystem path."""
        if zip_path in self._index():
            # Directory entry: extract each child, return the directory.
            for name in self._index()[zip_path]:
                last = self._extract_resource(manager, os.path.join(zip_path, name))
            # return the extracted directory name
            return os.path.dirname(last)

        timestamp, size = self._get_date_and_size(self.zipinfo[zip_path])

        if not WRITE_SUPPORT:
            raise OSError(
                '"os.rename" and "os.unlink" are not supported on this platform'
            )
        try:
            if not self.egg_name:
                raise OSError(
                    '"egg_name" is empty. This likely means no egg could be found from the "module_path".'
                )
            real_path = manager.get_cache_path(self.egg_name, self._parts(zip_path))

            if self._is_current(real_path, zip_path):
                return real_path

            # Write to a temp name, then rename into place, so concurrent
            # extractors never observe a partially written file.
            outf, tmpnam = _mkstemp(
                ".$extract",
                dir=os.path.dirname(real_path),
            )
            os.write(outf, self.loader.get_data(zip_path))
            os.close(outf)
            utime(tmpnam, (timestamp, timestamp))
            manager.postprocess(tmpnam, real_path)

            try:
                rename(tmpnam, real_path)

            except OSError:
                if os.path.isfile(real_path):
                    if self._is_current(real_path, zip_path):
                        # the file became current since it was checked above,
                        # so proceed.
                        return real_path
                    # Windows, del old file and retry
                    elif os.name == 'nt':
                        unlink(real_path)
                        rename(tmpnam, real_path)
                        return real_path
                raise

        except OSError:
            # report a user-friendly error
            manager.extraction_error()

        return real_path

    def _is_current(self, file_path, zip_path):
        """
        Return True if the file_path is current for this zip_path
        """
        timestamp, size = self._get_date_and_size(self.zipinfo[zip_path])
        if not os.path.isfile(file_path):
            return False
        stat = os.stat(file_path)
        if stat.st_size != size or stat.st_mtime != timestamp:
            return False
        # check that the contents match
        zip_contents = self.loader.get_data(zip_path)
        with open(file_path, 'rb') as f:
            file_contents = f.read()
        return zip_contents == file_contents

    def _get_eager_resources(self):
        # Lazily read and cache the egg's eager-resource metadata lists.
        if self.eagers is None:
            eagers = []
            for name in ('native_libs.txt', 'eager_resources.txt'):
                if self.has_metadata(name):
                    eagers.extend(self.get_metadata_lines(name))
            self.eagers = eagers
        return self.eagers

    def _index(self):
        # Lazily build and cache a directory index: each directory subpath
        # maps to the list of names directly inside it.
        try:
            return self._dirindex
        except AttributeError:
            ind = {}
            for path in self.zipinfo:
                parts = path.split(os.sep)
                while parts:
                    parent = os.sep.join(parts[:-1])
                    if parent in ind:
                        # Parent already indexed; ancestors are too.
                        ind[parent].append(parts[-1])
                        break
                    else:
                        ind[parent] = [parts.pop()]
            self._dirindex = ind
            return ind

    def _has(self, fspath) -> bool:
        # Present either as a file entry or as a directory in the index.
        zip_path = self._zipinfo_name(fspath)
        return zip_path in self.zipinfo or zip_path in self._index()

    def _isdir(self, fspath) -> bool:
        return self._zipinfo_name(fspath) in self._index()

    def _listdir(self, fspath):
        return list(self._index().get(self._zipinfo_name(fspath), ()))

    def _eager_to_zip(self, resource_name: str):
        # Resolve an eager resource (egg-root relative) to a zip subpath.
        return self._zipinfo_name(self._fn(self.egg_root, resource_name))

    def _resource_to_zip(self, resource_name: str):
        # Resolve a resource (module-path relative) to a zip subpath.
        return self._zipinfo_name(self._fn(self.module_path, resource_name))

2143 

2144 

# Modules loaded by zipimporter get resource support via ZipProvider.
register_loader_type(zipimport.zipimporter, ZipProvider)

2146 

2147 

class FileMetadata(EmptyProvider):
    """Metadata handler for standalone PKG-INFO files

    Usage::

        metadata = FileMetadata("/path/to/PKG-INFO")

    This provider rejects all data and metadata requests except for PKG-INFO,
    which is treated as existing, and will be the contents of the file at
    the provided location.
    """

    def __init__(self, path: StrPath):
        self.path = path

    def _get_metadata_path(self, name):
        return self.path

    def has_metadata(self, name: str) -> bool:
        # Only PKG-INFO exists, and only when the backing file does.
        return name == 'PKG-INFO' and os.path.isfile(self.path)

    def get_metadata(self, name: str):
        if name != 'PKG-INFO':
            raise KeyError("No metadata except PKG-INFO is available")

        with open(self.path, encoding='utf-8', errors="replace") as f:
            contents = f.read()
        self._warn_on_replacement(contents)
        return contents

    def get_metadata_lines(self, name: str) -> Iterator[str]:
        return yield_lines(self.get_metadata(name))

    def _warn_on_replacement(self, metadata):
        # U+FFFD marks bytes that could not be decoded as UTF-8.
        if '\N{REPLACEMENT CHARACTER}' in metadata:
            warnings.warn(f"{self.path} could not be properly decoded in UTF-8")

2187 

2188 

class PathMetadata(DefaultProvider):
    """Metadata provider for egg directories

    Usage::

        # Development eggs:

        egg_info = "/path/to/PackageName.egg-info"
        base_dir = os.path.dirname(egg_info)
        metadata = PathMetadata(base_dir, egg_info)
        dist_name = os.path.splitext(os.path.basename(egg_info))[0]
        dist = Distribution(basedir, project_name=dist_name, metadata=metadata)

        # Unpacked egg directories:

        egg_path = "/path/to/PackageName-ver-pyver-etc.egg"
        metadata = PathMetadata(egg_path, os.path.join(egg_path,'EGG-INFO'))
        dist = Distribution.from_filename(egg_path, metadata=metadata)
    """

    def __init__(self, path: str, egg_info: str):
        # Bypass EggProvider.__init__: both locations are given directly.
        self.egg_info = egg_info
        self.module_path = path

2212 

2213 

class EggMetadata(ZipProvider):
    """Metadata provider for .egg files"""

    def __init__(self, importer: zipimport.zipimporter):
        """Create a metadata provider from a zipimporter"""
        self.zip_pre = importer.archive + os.sep
        self.loader = importer
        prefix = importer.prefix
        # An empty prefix means the archive root itself is the module path.
        self.module_path = (
            os.path.join(importer.archive, prefix) if prefix else importer.archive
        )
        self._setup_prefix()

2227 

2228 

# Registry mapping importer types to distribution-finder callables;
# populated via register_finder() and consulted by find_distributions().
_distribution_finders: dict[type, _DistFinderType[Any]] = _declare_state(
    'dict', '_distribution_finders', {}
)

2232 

2233 

def register_finder(importer_type: type[_T], distribution_finder: _DistFinderType[_T]):
    """Register `distribution_finder` to find distributions in sys.path items

    `importer_type` is the type or class of a PEP 302 "Importer" (sys.path item
    handler), and `distribution_finder` is a callable that, passed a path
    item and the importer instance, yields ``Distribution`` instances found on
    that path item. See ``pkg_resources.find_on_path`` for an example."""
    # Later registrations for the same importer type replace earlier ones.
    _distribution_finders[importer_type] = distribution_finder

2242 

2243 

def find_distributions(path_item: str, only: bool = False):
    """Yield distributions accessible via `path_item`"""
    importer = get_importer(path_item)
    # Dispatch to the finder registered for this importer's type.
    finder = _find_adapter(_distribution_finders, importer)
    return finder(importer, path_item, only)

2249 

2250 

def find_eggs_in_zip(
    importer: zipimport.zipimporter, path_item: str, only: bool = False
) -> Iterator[Distribution]:
    """
    Find eggs in zip files; possibly multiple nested eggs.
    """
    if importer.archive.endswith('.whl'):
        # wheels are not supported with this finder
        # they don't have PKG-INFO metadata, and won't ever contain eggs
        return
    metadata = EggMetadata(importer)
    if metadata.has_metadata('PKG-INFO'):
        yield Distribution.from_filename(path_item, metadata=metadata)
    if only:
        # don't yield nested distros
        return
    for subitem in metadata.resource_listdir(''):
        subpath = os.path.join(path_item, subitem)
        if _is_egg_path(subitem):
            # Recurse into eggs nested inside this archive.
            yield from find_eggs_in_zip(zipimport.zipimporter(subpath), subpath)
        elif subitem.lower().endswith(('.dist-info', '.egg-info')):
            submeta = EggMetadata(zipimport.zipimporter(subpath))
            submeta.egg_info = subpath
            yield Distribution.from_location(path_item, subitem, submeta)

2277 

2278 

# zip archives on sys.path are scanned for (possibly nested) eggs.
register_finder(zipimport.zipimporter, find_eggs_in_zip)

2280 

2281 

2282def find_nothing( 

2283 importer: object | None, path_item: str | None, only: bool | None = False 

2284): 

2285 return () 

2286 

2287 

# Fallback: importer types without a specific finder yield nothing.
register_finder(object, find_nothing)

2289 

2290 

def find_on_path(importer: object | None, path_item, only=False):
    """Yield distributions accessible on a sys.path directory"""
    path_item = _normalize_cached(path_item)

    if _is_unpacked_egg(path_item):
        # The path item itself is an unpacked egg; yield it and stop.
        metadata = PathMetadata(path_item, os.path.join(path_item, 'EGG-INFO'))
        yield Distribution.from_filename(path_item, metadata=metadata)
        return

    # scan for .egg and .egg-info in directory
    children = (os.path.join(path_item, child) for child in safe_listdir(path_item))
    for entry in sorted(children):
        factory = dist_factory(path_item, entry, only)
        yield from factory(os.path.join(path_item, entry))

2309 

2310 

def dist_factory(path_item, entry, only):
    """Return a dist_factory for the given entry."""
    lower = entry.lower()
    # Metadata directories/files take precedence regardless of `only`.
    if lower.endswith('.egg-info') or (
        lower.endswith('.dist-info') and os.path.isdir(os.path.join(path_item, entry))
    ):
        return distributions_from_metadata
    if not only and _is_egg_path(entry):
        return find_distributions
    if not only and lower.endswith('.egg-link'):
        return resolve_egg_link
    # Falsy, no-op factory for anything else.
    return NoDists()

2328 

2329 

class NoDists:
    """
    Falsy, callable placeholder that yields no distributions.

    >>> bool(NoDists())
    False

    >>> list(NoDists()('anything'))
    []
    """

    def __call__(self, fullpath):
        return iter(())

    def __bool__(self):
        return False

2344 

2345 

2346def safe_listdir(path: StrOrBytesPath): 

2347 """ 

2348 Attempt to list contents of path, but suppress some exceptions. 

2349 """ 

2350 try: 

2351 return os.listdir(path) 

2352 except (PermissionError, NotADirectoryError): 

2353 pass 

2354 except OSError as e: 

2355 # Ignore the directory if does not exist, not a directory or 

2356 # permission denied 

2357 if e.errno not in (errno.ENOTDIR, errno.EACCES, errno.ENOENT): 

2358 raise 

2359 return () 

2360 

2361 

def distributions_from_metadata(path: str):
    """Yield a develop-precedence Distribution for the metadata at *path*.

    *path* may be a metadata directory (``.egg-info``/``.dist-info``) or a
    standalone metadata file; empty metadata directories yield nothing.
    """
    root = os.path.dirname(path)
    metadata: _MetadataType
    if not os.path.isdir(path):
        metadata = FileMetadata(path)
    else:
        if not os.listdir(path):
            # empty metadata dir; skip
            return
        metadata = PathMetadata(root, path)
    yield Distribution.from_location(
        root,
        os.path.basename(path),
        metadata,
        precedence=DEVELOP_DIST,
    )

2378 

2379 

def non_empty_lines(path):
    """
    Yield non-empty lines from file at path
    """
    text = _read_utf8_with_fallback(path)
    yield from (stripped for stripped in map(str.strip, text.splitlines()) if stripped)

2388 

2389 

def resolve_egg_link(path):
    """
    Given a path to an .egg-link, resolve distributions
    present in the referenced path.
    """
    base = os.path.dirname(path)
    # Each non-empty line names a path relative to the .egg-link file.
    resolved = (os.path.join(base, ref) for ref in non_empty_lines(path))
    # Return the distributions from the first referenced path, if any.
    return next(map(find_distributions, resolved), ())

2401 

2402 

# pkgutil.ImpImporter is absent on newer Pythons; register only when present.
if hasattr(pkgutil, 'ImpImporter'):
    register_finder(pkgutil.ImpImporter, find_on_path)

register_finder(importlib.machinery.FileFinder, find_on_path)

2407 

# Registry of namespace-package handlers keyed by importer type
# (see register_namespace_handler below).
_namespace_handlers: dict[type, _NSHandlerType[Any]] = _declare_state(
    'dict', '_namespace_handlers', {}
)
# Maps a parent package name (None for top level) to the declared
# namespace packages beneath it (see declare_namespace).
_namespace_packages: dict[str | None, list[str]] = _declare_state(
    'dict', '_namespace_packages', {}
)

2414 

2415 

def register_namespace_handler(
    importer_type: type[_T], namespace_handler: _NSHandlerType[_T]
):
    """Register `namespace_handler` to declare namespace packages

    `importer_type` is the type or class of a PEP 302 "Importer" (sys.path item
    handler), and `namespace_handler` is a callable like this::

        def namespace_handler(importer, path_entry, moduleName, module):
            # return a path_entry to use for child packages

    Namespace handlers are only called if the importer object has already
    agreed that it can handle the relevant path item, and they should only
    return a subpath if the module __path__ does not already contain an
    equivalent subpath. For an example namespace handler, see
    ``pkg_resources.file_ns_handler``.
    """
    # Later registrations for the same importer type replace earlier ones.
    _namespace_handlers[importer_type] = namespace_handler

2434 

2435 

def _handle_ns(packageName, path_item):
    """Ensure that named package includes a subpath of path_item (if needed)"""

    importer = get_importer(path_item)
    if importer is None:
        # No importer can handle this path item at all.
        return None

    # use find_spec (PEP 451) and fall-back to find_module (PEP 302)
    try:
        spec = importer.find_spec(packageName)
    except AttributeError:
        # capture warnings due to #1111
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            loader = importer.find_module(packageName)
    else:
        loader = spec.loader if spec else None

    if loader is None:
        # This path item does not contain the package.
        return None
    module = sys.modules.get(packageName)
    if module is None:
        # Create an empty namespace module and attach it to its parent.
        module = sys.modules[packageName] = types.ModuleType(packageName)
        module.__path__ = []
        _set_parent_ns(packageName)
    elif not hasattr(module, '__path__'):
        raise TypeError("Not a package:", packageName)
    handler = _find_adapter(_namespace_handlers, importer)
    subpath = handler(importer, path_item, packageName, module)
    if subpath is not None:
        # New subpath for this namespace: record it, import the package,
        # then re-sort __path__ to match sys.path ordering.
        path = module.__path__
        path.append(subpath)
        importlib.import_module(packageName)
        _rebuild_mod_path(path, packageName, module)
    return subpath

2471 

2472 

def _rebuild_mod_path(orig_path, package_name, module: types.ModuleType):
    """
    Rebuild module.__path__ ensuring that all entries are ordered
    corresponding to their sys.path order
    """
    normalized_sys_path = [_normalize_cached(p) for p in sys.path]
    # Number of trailing path components contributed by the package name.
    depth = package_name.count('.') + 1

    def sort_key(path):
        # Ordinal of the sys.path entry this __path__ entry came from;
        # unknown entries sort last (workaround for #520 and #513).
        base = os.sep.join(path.split(os.sep)[:-depth])
        try:
            return normalized_sys_path.index(_normalize_cached(base))
        except ValueError:
            return float('inf')

    reordered = [_normalize_cached(p) for p in sorted(orig_path, key=sort_key)]

    if isinstance(module.__path__, list):
        # Mutate in place so aliased references observe the update.
        module.__path__[:] = reordered
    else:
        module.__path__ = reordered

2505 

2506 

def declare_namespace(packageName: str):
    """Declare that package 'packageName' is a namespace package"""

    msg = (
        f"Deprecated call to `pkg_resources.declare_namespace({packageName!r})`.\n"
        "Implementing implicit namespace packages (as specified in PEP 420) "
        "is preferred to `pkg_resources.declare_namespace`. "
        "See https://setuptools.pypa.io/en/latest/references/"
        "keywords.html#keyword-namespace-packages"
    )
    warnings.warn(msg, DeprecationWarning, stacklevel=2)

    _imp.acquire_lock()
    try:
        if packageName in _namespace_packages:
            # Already declared; nothing to do.
            return

        # Search path for the namespace: sys.path for top-level packages,
        # or the parent package's __path__ otherwise.
        path: MutableSequence[str] = sys.path
        parent, _, _ = packageName.rpartition('.')

        if parent:
            # Recursively declare ancestors first, then import the parent
            # so its __path__ is available.
            declare_namespace(parent)
            if parent not in _namespace_packages:
                __import__(parent)
            try:
                path = sys.modules[parent].__path__
            except AttributeError as e:
                raise TypeError("Not a package:", parent) from e

        # Track what packages are namespaces, so when new path items are added,
        # they can be updated
        _namespace_packages.setdefault(parent or None, []).append(packageName)
        _namespace_packages.setdefault(packageName, [])

        for path_item in path:
            # Ensure all the parent's path items are reflected in the child,
            # if they apply
            _handle_ns(packageName, path_item)

    finally:
        _imp.release_lock()

2548 

2549 

def fixup_namespace_packages(path_item: str, parent: str | None = None):
    """Ensure that previously-declared namespace packages include path_item"""
    _imp.acquire_lock()
    try:
        for pkg in _namespace_packages.get(parent, ()):
            child_path = _handle_ns(pkg, path_item)
            if not child_path:
                continue
            # Recurse so nested namespace packages see the new entry too.
            fixup_namespace_packages(child_path, pkg)
    finally:
        _imp.release_lock()

2560 

2561 

def file_ns_handler(
    importer: object,
    path_item: StrPath,
    packageName: str,
    module: types.ModuleType,
):
    """Compute an ns-package subpath for a filesystem or zipfile importer"""

    subpath = os.path.join(path_item, packageName.split('.')[-1])
    normalized = _normalize_cached(subpath)
    already_present = any(
        _normalize_cached(item) == normalized for item in module.__path__
    )
    # Only return the path if it's not already there
    if not already_present:
        return subpath

2578 

2579 

# pkgutil.ImpImporter is absent on newer Pythons; register only when present.
if hasattr(pkgutil, 'ImpImporter'):
    register_namespace_handler(pkgutil.ImpImporter, file_ns_handler)

register_namespace_handler(zipimport.zipimporter, file_ns_handler)
register_namespace_handler(importlib.machinery.FileFinder, file_ns_handler)

2585 

2586 

2587def null_ns_handler( 

2588 importer: object, 

2589 path_item: str | None, 

2590 packageName: str | None, 

2591 module: _ModuleLike | None, 

2592): 

2593 return None 

2594 

2595 

# Fallback: importer types without a specific handler contribute nothing.
register_namespace_handler(object, null_ns_handler)

2597 

2598 

@overload
def normalize_path(filename: StrPath) -> str: ...
@overload
def normalize_path(filename: BytesPath) -> bytes: ...
def normalize_path(filename: StrOrBytesPath):
    """Normalize a file/dir name for comparison purposes"""
    # realpath resolves symlinks; normcase folds case on case-insensitive
    # filesystems so equal paths compare equal.
    return os.path.normcase(os.path.realpath(os.path.normpath(_cygwin_patch(filename))))

2606 

2607 

2608def _cygwin_patch(filename: StrOrBytesPath): # pragma: nocover 

2609 """ 

2610 Contrary to POSIX 2008, on Cygwin, getcwd (3) contains 

2611 symlink components. Using 

2612 os.path.abspath() works around this limitation. A fix in os.getcwd() 

2613 would probably better, in Cygwin even more so, except 

2614 that this seems to be by design... 

2615 """ 

2616 return os.path.abspath(filename) if sys.platform == 'cygwin' else filename 

2617 

2618 

if TYPE_CHECKING:
    # https://github.com/python/mypy/issues/16261
    # https://github.com/python/typeshed/issues/6347
    @overload
    def _normalize_cached(filename: StrPath) -> str: ...
    @overload
    def _normalize_cached(filename: BytesPath) -> bytes: ...
    def _normalize_cached(filename: StrOrBytesPath) -> str | bytes: ...
else:

    @functools.lru_cache(maxsize=None)
    def _normalize_cached(filename):
        # Unbounded memoization: normalize_path hits the filesystem
        # (realpath), and the same sys.path entries recur constantly.
        return normalize_path(filename)

2632 

2633 

def _is_egg_path(path):
    """
    Determine if given path appears to be an egg.
    """
    # Accept both zipped (.egg file) and unpacked (.egg directory) forms.
    return _is_zip_egg(path) or _is_unpacked_egg(path)

2639 

2640 

2641def _is_zip_egg(path): 

2642 return ( 

2643 path.lower().endswith('.egg') 

2644 and os.path.isfile(path) 

2645 and zipfile.is_zipfile(path) 

2646 ) 

2647 

2648 

2649def _is_unpacked_egg(path): 

2650 """ 

2651 Determine if given path appears to be an unpacked egg. 

2652 """ 

2653 return path.lower().endswith('.egg') and os.path.isfile( 

2654 os.path.join(path, 'EGG-INFO', 'PKG-INFO') 

2655 ) 

2656 

2657 

2658def _set_parent_ns(packageName): 

2659 parts = packageName.split('.') 

2660 name = parts.pop() 

2661 if parts: 

2662 parent = '.'.join(parts) 

2663 setattr(sys.modules[parent], name, sys.modules[packageName]) 

2664 

2665 

# Matches a dotted module path, e.g. "pkg.sub.mod".
MODULE = re.compile(r"\w+(\.\w+)*$").match
# Parses egg basenames of the form name[-version[-pyX.Y[-platform]]];
# the ver/pyver/plat groups are each optional and nested.
EGG_NAME = re.compile(
    r"""
    (?P<name>[^-]+) (
        -(?P<ver>[^-]+) (
            -py(?P<pyver>[^-]+) (
                -(?P<plat>.+)
            )?
        )?
    )?
    """,
    re.VERBOSE | re.IGNORECASE,
).match

2679 

2680 

class EntryPoint:
    """Object representing an advertised importable object"""

    def __init__(
        self,
        name: str,
        module_name: str,
        attrs: Iterable[str] = (),
        extras: Iterable[str] = (),
        dist: Distribution | None = None,
    ):
        if not MODULE(module_name):
            raise ValueError("Invalid module name", module_name)
        self.name = name
        self.module_name = module_name
        # Dotted attribute path within the module, e.g. ('cls', 'method').
        self.attrs = tuple(attrs)
        # Extras of `dist` that must be required before loading.
        self.extras = tuple(extras)
        self.dist = dist

    def __str__(self):
        # Render in the same "name = module:attrs [extras]" syntax that
        # parse() accepts.
        s = "%s = %s" % (self.name, self.module_name)
        if self.attrs:
            s += ':' + '.'.join(self.attrs)
        if self.extras:
            s += ' [%s]' % ','.join(self.extras)
        return s

    def __repr__(self):
        return "EntryPoint.parse(%r)" % str(self)

    @overload
    def load(
        self,
        require: Literal[True] = True,
        env: Environment | None = None,
        installer: _InstallerType | None = None,
    ) -> _ResolvedEntryPoint: ...
    @overload
    def load(
        self,
        require: Literal[False],
        *args: Any,
        **kwargs: Any,
    ) -> _ResolvedEntryPoint: ...
    def load(
        self,
        require: bool = True,
        *args: Environment | _InstallerType | None,
        **kwargs: Environment | _InstallerType | None,
    ) -> _ResolvedEntryPoint:
        """
        Require packages for this EntryPoint, then resolve it.
        """
        if not require or args or kwargs:
            warnings.warn(
                "Parameters to load are deprecated. Call .resolve and "
                ".require separately.",
                PkgResourcesDeprecationWarning,
                stacklevel=2,
            )
        if require:
            # We could pass `env` and `installer` directly,
            # but keeping `*args` and `**kwargs` for backwards compatibility
            self.require(*args, **kwargs)  # type: ignore
        return self.resolve()

    def resolve(self) -> _ResolvedEntryPoint:
        """
        Resolve the entry point from its module and attrs.
        """
        module = __import__(self.module_name, fromlist=['__name__'], level=0)
        try:
            return functools.reduce(getattr, self.attrs, module)
        except AttributeError as exc:
            # Report missing attributes as ImportError, the failure mode
            # callers expect from entry-point loading.
            raise ImportError(str(exc)) from exc

    def require(
        self,
        env: Environment | None = None,
        installer: _InstallerType | None = None,
    ):
        """Resolve this entry point's distribution requirements (including
        its extras) and add them to the global working set."""
        if not self.dist:
            error_cls = UnknownExtra if self.extras else AttributeError
            raise error_cls("Can't require() without a distribution", self)

        # Get the requirements for this entry point with all its extras and
        # then resolve them. We have to pass `extras` along when resolving so
        # that the working set knows what extras we want. Otherwise, for
        # dist-info distributions, the working set will assume that the
        # requirements for that extra are purely optional and skip over them.
        reqs = self.dist.requires(self.extras)
        items = working_set.resolve(reqs, env, installer, extras=self.extras)
        list(map(working_set.add, items))

    # Grammar for "name = module:attrs [extras]" entry-point strings.
    pattern = re.compile(
        r'\s*'
        r'(?P<name>.+?)\s*'
        r'=\s*'
        r'(?P<module>[\w.]+)\s*'
        r'(:\s*(?P<attr>[\w.]+))?\s*'
        r'(?P<extras>\[.*\])?\s*$'
    )

    @classmethod
    def parse(cls, src: str, dist: Distribution | None = None):
        """Parse a single entry point from string `src`

        Entry point syntax follows the form::

            name = some.module:some.attr [extra1, extra2]

        The entry name and module name are required, but the ``:attrs`` and
        ``[extras]`` parts are optional
        """
        m = cls.pattern.match(src)
        if not m:
            msg = "EntryPoint must be in 'name=module:attrs [extras]' format"
            raise ValueError(msg, src)
        res = m.groupdict()
        extras = cls._parse_extras(res['extras'])
        attrs = res['attr'].split('.') if res['attr'] else ()
        return cls(res['name'], res['module'], attrs, extras, dist)

    @classmethod
    def _parse_extras(cls, extras_spec):
        # Parse "[extra1,extra2]" by attaching it to a dummy requirement
        # name and reading back the parsed extras.
        if not extras_spec:
            return ()
        req = Requirement.parse('x' + extras_spec)
        if req.specs:
            raise ValueError
        return req.extras

    @classmethod
    def parse_group(
        cls,
        group: str,
        lines: _NestedStr,
        dist: Distribution | None = None,
    ):
        """Parse an entry point group"""
        if not MODULE(group):
            raise ValueError("Invalid group name", group)
        this: dict[str, Self] = {}
        for line in yield_lines(lines):
            ep = cls.parse(line, dist)
            if ep.name in this:
                raise ValueError("Duplicate entry point", group, ep.name)
            this[ep.name] = ep
        return this

    @classmethod
    def parse_map(
        cls,
        data: str | Iterable[str] | dict[str, str | Iterable[str]],
        dist: Distribution | None = None,
    ):
        """Parse a map of entry point groups"""
        _data: Iterable[tuple[str | None, str | Iterable[str]]]
        if isinstance(data, dict):
            _data = data.items()
        else:
            _data = split_sections(data)
        maps: dict[str, dict[str, Self]] = {}
        for group, lines in _data:
            if group is None:
                # Lines before any [group] header are only legal when empty.
                if not lines:
                    continue
                raise ValueError("Entry points must be listed in groups")
            group = group.strip()
            if group in maps:
                raise ValueError("Duplicate group name", group)
            maps[group] = cls.parse_group(group, lines, dist)
        return maps

2855 

def _version_from_file(lines):
    """
    Given an iterable of lines from a Metadata file, return
    the value of the Version field, if present, or None otherwise.
    """
    version_lines = (
        line for line in lines if line.lower().startswith('version:')
    )
    # First matching header, or '' when none exists.
    first = next(version_lines, '')
    _, _, raw = first.partition(':')
    return safe_version(raw.strip()) or None

2869 

2870 

class Distribution:
    """Wrap an actual or potential sys.path entry w/metadata"""

    # Metadata file the version header is read from; DistInfoDistribution
    # overrides this with 'METADATA'.
    PKG_INFO = 'PKG-INFO'

    def __init__(
        self,
        location: str | None = None,
        metadata: _MetadataType = None,
        project_name: str | None = None,
        version: str | None = None,
        py_version: str | None = PY_MAJOR,
        platform: str | None = None,
        precedence: int = EGG_DIST,
    ):
        # An explicit version is normalized and cached immediately; otherwise
        # the `version` property reads it lazily from metadata.
        self.project_name = safe_name(project_name or 'Unknown')
        if version is not None:
            self._version = safe_version(version)
        self.py_version = py_version
        self.platform = platform
        self.location = location
        self.precedence = precedence
        # Fall back to an empty provider so metadata queries are safe no-ops.
        self._provider = metadata or empty_provider

    @classmethod
    def from_location(
        cls,
        location: str,
        basename: StrPath,
        metadata: _MetadataType = None,
        **kw: int,  # We could set `precedence` explicitly, but keeping this as `**kw` for full backwards and subclassing compatibility
    ) -> Distribution:
        """Create a distribution for `basename` discovered at `location`.

        The concrete class is chosen from the basename's extension
        (.egg/.egg-info/.dist-info); name/version/pyver/platform are parsed
        from egg-style basenames when they match EGG_NAME.
        """
        project_name, version, py_version, platform = [None] * 4
        basename, ext = os.path.splitext(basename)
        if ext.lower() in _distributionImpl:
            cls = _distributionImpl[ext.lower()]

            match = EGG_NAME(basename)
            if match:
                project_name, version, py_version, platform = match.group(
                    'name', 'ver', 'pyver', 'plat'
                )
        return cls(
            location,
            metadata,
            project_name=project_name,
            version=version,
            py_version=py_version,
            platform=platform,
            **kw,
        )._reload_version()

    def _reload_version(self):
        # Hook for subclasses (see EggInfoDistribution) to re-read the
        # version from metadata after construction.
        return self

    @property
    def hashcmp(self):
        # Tuple used for hashing and ordering comparisons between dists.
        return (
            self._forgiving_parsed_version,
            self.precedence,
            self.key,
            self.location,
            self.py_version or '',
            self.platform or '',
        )

    def __hash__(self):
        return hash(self.hashcmp)

    def __lt__(self, other: Distribution):
        return self.hashcmp < other.hashcmp

    def __le__(self, other: Distribution):
        return self.hashcmp <= other.hashcmp

    def __gt__(self, other: Distribution):
        return self.hashcmp > other.hashcmp

    def __ge__(self, other: Distribution):
        return self.hashcmp >= other.hashcmp

    def __eq__(self, other: object):
        if not isinstance(other, self.__class__):
            # It's not a Distribution, so they are not equal
            return False
        return self.hashcmp == other.hashcmp

    def __ne__(self, other: object):
        return not self == other

    # These properties have to be lazy so that we don't have to load any
    # metadata until/unless it's actually needed. (i.e., some distributions
    # may not know their name or version without loading PKG-INFO)

    @property
    def key(self):
        # Lower-cased project name, computed once and cached on first access.
        try:
            return self._key
        except AttributeError:
            self._key = key = self.project_name.lower()
            return key

    @property
    def parsed_version(self):
        """The version as a strict PEP 440 version object.

        Raises ``InvalidVersion`` annotated with the project name when the
        version string cannot be parsed.
        """
        if not hasattr(self, "_parsed_version"):
            try:
                self._parsed_version = parse_version(self.version)
            except _packaging_version.InvalidVersion as ex:
                info = f"(package: {self.project_name})"
                if hasattr(ex, "add_note"):
                    ex.add_note(info)  # PEP 678
                    raise
                # Pre-3.11 fallback: no add_note, so embed the info directly.
                raise _packaging_version.InvalidVersion(f"{str(ex)} {info}") from None

        return self._parsed_version

    @property
    def _forgiving_parsed_version(self):
        # Like parsed_version, but falls back to a best-effort parse (with a
        # DeprecationWarning) instead of failing on non-PEP 440 versions.
        try:
            return self.parsed_version
        except _packaging_version.InvalidVersion as ex:
            self._parsed_version = parse_version(_forgiving_version(self.version))

            notes = "\n".join(getattr(ex, "__notes__", []))  # PEP 678
            msg = f"""!!\n\n
            *************************************************************************
            {str(ex)}\n{notes}

            This is a long overdue deprecation.
            For the time being, `pkg_resources` will use `{self._parsed_version}`
            as a replacement to avoid breaking existing environments,
            but no future compatibility is guaranteed.

            If you maintain package {self.project_name} you should implement
            the relevant changes to adequate the project to PEP 440 immediately.
            *************************************************************************
            \n\n!!
            """
            warnings.warn(msg, DeprecationWarning)

            return self._parsed_version

    @property
    def version(self):
        """The version string, read lazily from metadata when not set.

        Raises ``ValueError`` when no version can be found at all.
        """
        try:
            return self._version
        except AttributeError as e:
            version = self._get_version()
            if version is None:
                path = self._get_metadata_path_for_display(self.PKG_INFO)
                msg = ("Missing 'Version:' header and/or {} file at path: {}").format(
                    self.PKG_INFO, path
                )
                raise ValueError(msg, self) from e

            return version

    @property
    def _dep_map(self):
        """
        A map of extra to its list of (direct) requirements
        for this distribution, including the null extra.
        """
        try:
            return self.__dep_map
        except AttributeError:
            # Built once from metadata, then cached (name-mangled attribute).
            self.__dep_map = self._filter_extras(self._build_dep_map())
        return self.__dep_map

    @staticmethod
    def _filter_extras(dm: dict[str | None, list[Requirement]]):
        """
        Given a mapping of extras to dependencies, strip off
        environment markers and filter out any dependencies
        not matching the markers.
        """
        for extra in list(filter(None, dm)):
            new_extra: str | None = extra
            reqs = dm.pop(extra)
            # An extra key may embed a marker: "extra_name:marker_expr".
            new_extra, _, marker = extra.partition(':')
            fails_marker = marker and (
                invalid_marker(marker) or not evaluate_marker(marker)
            )
            if fails_marker:
                reqs = []
            new_extra = safe_extra(new_extra) or None

            dm.setdefault(new_extra, []).extend(reqs)
        return dm

    def _build_dep_map(self):
        # Collect requirements from the legacy egg-info metadata files.
        dm = {}
        for name in 'requires.txt', 'depends.txt':
            for extra, reqs in split_sections(self._get_metadata(name)):
                dm.setdefault(extra, []).extend(parse_requirements(reqs))
        return dm

    def requires(self, extras: Iterable[str] = ()):
        """List of Requirements needed for this distro if `extras` are used"""
        dm = self._dep_map
        deps: list[Requirement] = []
        deps.extend(dm.get(None, ()))
        for ext in extras:
            try:
                deps.extend(dm[safe_extra(ext)])
            except KeyError as e:
                raise UnknownExtra(
                    "%s has no such extra feature %r" % (self, ext)
                ) from e
        return deps

    def _get_metadata_path_for_display(self, name):
        """
        Return the path to the given metadata file, if available.
        """
        try:
            # We need to access _get_metadata_path() on the provider object
            # directly rather than through this class's __getattr__()
            # since _get_metadata_path() is marked private.
            path = self._provider._get_metadata_path(name)

        # Handle exceptions e.g. in case the distribution's metadata
        # provider doesn't support _get_metadata_path().
        except Exception:
            return '[could not detect]'

        return path

    def _get_metadata(self, name):
        # Yield the lines of metadata file `name`, or nothing when absent.
        if self.has_metadata(name):
            yield from self.get_metadata_lines(name)

    def _get_version(self):
        # Extract the Version header from the PKG_INFO metadata, or None.
        lines = self._get_metadata(self.PKG_INFO)
        return _version_from_file(lines)

    def activate(self, path: list[str] | None = None, replace: bool = False):
        """Ensure distribution is importable on `path` (default=sys.path)"""
        if path is None:
            path = sys.path
        self.insert_on(path, replace=replace)
        if path is sys.path and self.location is not None:
            fixup_namespace_packages(self.location)
            for pkg in self._get_metadata('namespace_packages.txt'):
                if pkg in sys.modules:
                    declare_namespace(pkg)

    def egg_name(self):
        """Return what this distribution's standard .egg filename should be"""
        filename = "%s-%s-py%s" % (
            to_filename(self.project_name),
            to_filename(self.version),
            self.py_version or PY_MAJOR,
        )

        if self.platform:
            filename += '-' + self.platform
        return filename

    def __repr__(self):
        if self.location:
            return "%s (%s)" % (self, self.location)
        else:
            return str(self)

    def __str__(self):
        try:
            version = getattr(self, 'version', None)
        except ValueError:
            # version property raises ValueError when no version is found.
            version = None
        version = version or "[unknown version]"
        return "%s %s" % (self.project_name, version)

    def __getattr__(self, attr):
        """Delegate all unrecognized public attributes to .metadata provider"""
        if attr.startswith('_'):
            raise AttributeError(attr)
        return getattr(self._provider, attr)

    def __dir__(self):
        # Merge our own attributes with the provider's public ones so the
        # delegation performed by __getattr__ is discoverable.
        return list(
            set(super().__dir__())
            | set(attr for attr in self._provider.__dir__() if not attr.startswith('_'))
        )

    @classmethod
    def from_filename(
        cls,
        filename: StrPath,
        metadata: _MetadataType = None,
        **kw: int,  # We could set `precedence` explicitly, but keeping this as `**kw` for full backwards and subclassing compatibility
    ):
        """Create a distribution from a filesystem path to the dist."""
        return cls.from_location(
            _normalize_cached(filename), os.path.basename(filename), metadata, **kw
        )

    def as_requirement(self):
        """Return a ``Requirement`` that matches this distribution exactly"""
        if isinstance(self.parsed_version, _packaging_version.Version):
            spec = "%s==%s" % (self.project_name, self.parsed_version)
        else:
            # Non-PEP 440 versions need the arbitrary-equality operator.
            spec = "%s===%s" % (self.project_name, self.parsed_version)

        return Requirement.parse(spec)

    def load_entry_point(self, group: str, name: str) -> _ResolvedEntryPoint:
        """Return the `name` entry point of `group` or raise ImportError"""
        ep = self.get_entry_info(group, name)
        if ep is None:
            raise ImportError("Entry point %r not found" % ((group, name),))
        return ep.load()

    @overload
    def get_entry_map(self, group: None = None) -> dict[str, dict[str, EntryPoint]]: ...
    @overload
    def get_entry_map(self, group: str) -> dict[str, EntryPoint]: ...
    def get_entry_map(self, group: str | None = None):
        """Return the entry point map for `group`, or the full entry map"""
        if not hasattr(self, "_ep_map"):
            # Parsed once from entry_points.txt and cached.
            self._ep_map = EntryPoint.parse_map(
                self._get_metadata('entry_points.txt'), self
            )
        if group is not None:
            return self._ep_map.get(group, {})
        return self._ep_map

    def get_entry_info(self, group: str, name: str):
        """Return the EntryPoint object for `group`+`name`, or ``None``"""
        return self.get_entry_map(group).get(name)

    # FIXME: 'Distribution.insert_on' is too complex (13)
    def insert_on(  # noqa: C901
        self,
        path: list[str],
        loc=None,
        replace: bool = False,
    ):
        """Ensure self.location is on path

        If replace=False (default):
            - If location is already in path anywhere, do nothing.
            - Else:
              - If it's an egg and its parent directory is on path,
                insert just ahead of the parent.
              - Else: add to the end of path.
        If replace=True:
            - If location is already on path anywhere (not eggs)
              or higher priority than its parent (eggs)
              do nothing.
            - Else:
              - If it's an egg and its parent directory is on path,
                insert just ahead of the parent,
                removing any lower-priority entries.
              - Else: add it to the front of path.
        """

        loc = loc or self.location
        if not loc:
            return

        # Work on normalized copies for comparison; mutate `path` with the
        # original (un-normalized) `loc`.
        nloc = _normalize_cached(loc)
        bdir = os.path.dirname(nloc)
        npath = [(p and _normalize_cached(p) or p) for p in path]

        for p, item in enumerate(npath):
            if item == nloc:
                if replace:
                    break
                else:
                    # don't modify path (even removing duplicates) if
                    # found and not replace
                    return
            elif item == bdir and self.precedence == EGG_DIST:
                # if it's an .egg, give it precedence over its directory
                # UNLESS it's already been added to sys.path and replace=False
                if (not replace) and nloc in npath[p:]:
                    return
                if path is sys.path:
                    self.check_version_conflict()
                path.insert(p, loc)
                npath.insert(p, nloc)
                break
        else:
            # Not found and no egg-parent insertion point.
            if path is sys.path:
                self.check_version_conflict()
            if replace:
                path.insert(0, loc)
            else:
                path.append(loc)
            return

        # p is the spot where we found or inserted loc; now remove duplicates
        while True:
            try:
                np = npath.index(nloc, p + 1)
            except ValueError:
                break
            else:
                del npath[np], path[np]
                # ha!
                p = np

        return

    def check_version_conflict(self):
        # Warn when a top-level module of this dist was already imported
        # from a different location than the one being added.
        if self.key == 'setuptools':
            # ignore the inevitable setuptools self-conflicts :(
            return

        nsp = dict.fromkeys(self._get_metadata('namespace_packages.txt'))
        loc = normalize_path(self.location)
        for modname in self._get_metadata('top_level.txt'):
            if (
                modname not in sys.modules
                or modname in nsp
                or modname in _namespace_packages
            ):
                continue
            if modname in ('pkg_resources', 'setuptools', 'site'):
                continue
            fn = getattr(sys.modules[modname], '__file__', None)
            if fn and (
                normalize_path(fn).startswith(loc) or fn.startswith(self.location)
            ):
                continue
            issue_warning(
                "Module %s was already imported from %s, but %s is being added"
                " to sys.path" % (modname, fn, self.location),
            )

    def has_version(self):
        """Return True when a version is available, warning on unbuilt eggs."""
        try:
            self.version
        except ValueError:
            issue_warning("Unbuilt egg for " + repr(self))
            return False
        except SystemError:
            # TODO: remove this except clause when python/cpython#103632 is fixed.
            return False
        return True

    def clone(self, **kw: str | int | IResourceProvider | None):
        """Copy this distribution, substituting in any changed keyword args"""
        names = 'project_name version py_version platform location precedence'
        for attr in names.split():
            kw.setdefault(attr, getattr(self, attr, None))
        kw.setdefault('metadata', self._provider)
        # Unsafely unpacking. But keeping **kw for backwards and subclassing compatibility
        return self.__class__(**kw)  # type:ignore[arg-type]

    @property
    def extras(self):
        # Names of all extras declared by this distribution's metadata.
        return [dep for dep in self._dep_map if dep]

3324 

3325 

class EggInfoDistribution(Distribution):
    def _reload_version(self):
        """
        Prefer the version recorded in the metadata file over the one
        parsed from the filename.

        Packages installed by distutils (e.g. numpy or scipy) were named
        with an older ``safe_version``, so their version numbers can get
        mangled when converted to filenames (e.g., 1.11.0.dev0+2329eae to
        1.11.0.dev0_2329eae) and would not be parsed properly downstream
        by Distribution and safe_version.  Reading the version from the
        metadata file itself sidesteps that mangling.
        """
        metadata_version = self._get_version()
        if metadata_version:
            self._version = metadata_version
        return self

3343 

3344 

class DistInfoDistribution(Distribution):
    """
    Wrap an actual or potential sys.path entry
    w/metadata, .dist-info style.
    """

    # dist-info stores the version in METADATA rather than PKG-INFO.
    PKG_INFO = 'METADATA'
    # Matches version constraints between parens/commas, e.g. "(1.0,2.0)".
    EQEQ = re.compile(r"([\(,])\s*(\d.*?)\s*([,\)])")

    @property
    def _parsed_pkg_info(self):
        """Parse and cache metadata"""
        try:
            return self._pkg_info
        except AttributeError:
            # METADATA is RFC 822-style; parse it with the email machinery.
            metadata = self.get_metadata(self.PKG_INFO)
            self._pkg_info = email.parser.Parser().parsestr(metadata)
            return self._pkg_info

    @property
    def _dep_map(self):
        # Cached extra -> requirements mapping.  Note this name-mangled
        # attribute (_DistInfoDistribution__dep_map) is distinct from the
        # base class's cache of the same apparent name.
        try:
            return self.__dep_map
        except AttributeError:
            self.__dep_map = self._compute_dependencies()
            return self.__dep_map

    def _compute_dependencies(self) -> dict[str | None, list[Requirement]]:
        """Recompute this distribution's dependencies."""
        self.__dep_map: dict[str | None, list[Requirement]] = {None: []}

        reqs: list[Requirement] = []
        # Including any condition expressions
        for req in self._parsed_pkg_info.get_all('Requires-Dist') or []:
            reqs.extend(parse_requirements(req))

        def reqs_for_extra(extra):
            # Requirements whose marker is absent or satisfied for `extra`.
            for req in reqs:
                if not req.marker or req.marker.evaluate({'extra': extra}):
                    yield req

        # Unconditional requirements (extra=None), deduplicated in order.
        common = types.MappingProxyType(dict.fromkeys(reqs_for_extra(None)))
        self.__dep_map[None].extend(common)

        for extra in self._parsed_pkg_info.get_all('Provides-Extra') or []:
            s_extra = safe_extra(extra.strip())
            # Each extra's list excludes requirements already in the common set.
            self.__dep_map[s_extra] = [
                r for r in reqs_for_extra(extra) if r not in common
            ]

        return self.__dep_map

3396 

3397 

# Maps a metadata directory/file extension to the Distribution subclass that
# Distribution.from_location() should instantiate for it.
_distributionImpl = {
    '.egg': Distribution,
    '.egg-info': EggInfoDistribution,
    '.dist-info': DistInfoDistribution,
}

3403 

3404 

def issue_warning(*args, **kw):
    """Emit a warning attributed to the first caller outside this module."""
    g = globals()
    level = 1
    try:
        # Walk outward past every stack frame whose globals belong to this
        # module; the first foreign frame is the user code the warning
        # should point at.
        while sys._getframe(level).f_globals is g:
            level += 1
    except ValueError:
        # Ran off the top of the stack; use whatever level we reached.
        pass
    warnings.warn(stacklevel=level + 1, *args, **kw)

3416 

3417 

def parse_requirements(strs: _NestedStr):
    """
    Yield ``Requirement`` objects for each specification in `strs`.

    `strs` must be a string, or a (possibly-nested) iterable thereof.
    """
    # Flatten to lines, drop trailing comments, rejoin '\'-continuations,
    # then build a Requirement from each resulting spec.
    lines = yield_lines(strs)
    cleaned = map(drop_comment, lines)
    return map(Requirement, join_continuation(cleaned))

3425 

3426 

# Kept for backward compatibility: code that caught the historical
# pkg_resources exception keeps working, since this subclasses the
# packaging library's InvalidRequirement.
class RequirementParseError(_packaging_requirements.InvalidRequirement):
    "Compatibility wrapper for InvalidRequirement"

3429 

3430 

class Requirement(_packaging_requirements.Requirement):
    """A requirement augmented with pkg_resources' legacy attributes
    (``key``, ``project_name``, ``specs``, ``extras`` as a tuple, ...).
    """

    def __init__(self, requirement_string: str):
        """DO NOT CALL THIS UNDOCUMENTED METHOD; use Requirement.parse()!"""
        super().__init__(requirement_string)
        self.unsafe_name = self.name
        project_name = safe_name(self.name)
        self.project_name, self.key = project_name, project_name.lower()
        # Legacy (operator, version) pairs derived from the specifier set.
        self.specs = [(spec.operator, spec.version) for spec in self.specifier]
        # packaging.requirements.Requirement uses a set for its extras. We use a variable-length tuple
        self.extras: tuple[str, ...] = tuple(map(safe_extra, self.extras))
        # Equality/hash key; the marker is compared by its string form.
        self.hashCmp = (
            self.key,
            self.url,
            self.specifier,
            frozenset(self.extras),
            str(self.marker) if self.marker else None,
        )
        # Precomputed hash (name-mangled to _Requirement__hash).
        self.__hash = hash(self.hashCmp)

    def __eq__(self, other: object):
        return isinstance(other, Requirement) and self.hashCmp == other.hashCmp

    def __ne__(self, other):
        return not self == other

    def __contains__(self, item: Distribution | str | tuple[str, ...]) -> bool:
        # A Distribution is "in" the requirement when its key matches and
        # its version satisfies the specifier.
        if isinstance(item, Distribution):
            if item.key != self.key:
                return False

            item = item.version

        # Allow prereleases always in order to match the previous behavior of
        # this method. In the future this should be smarter and follow PEP 440
        # more accurately.
        return self.specifier.contains(item, prereleases=True)

    def __hash__(self):
        return self.__hash

    def __repr__(self):
        return "Requirement.parse(%r)" % str(self)

    @staticmethod
    def parse(s: str | Iterable[str]):
        # `s` must contain exactly one requirement specification.
        (req,) = parse_requirements(s)
        return req

3478 

3479 

3480def _always_object(classes): 

3481 """ 

3482 Ensure object appears in the mro even 

3483 for old-style classes. 

3484 """ 

3485 if object not in classes: 

3486 return classes + (object,) 

3487 return classes 

3488 

3489 

def _find_adapter(registry: Mapping[type, _AdapterT], ob: object) -> _AdapterT:
    """Return an adapter factory for `ob` from `registry`"""
    # Search the MRO (with `object` guaranteed present) for the first
    # registered type.
    candidates = _always_object(inspect.getmro(getattr(ob, '__class__', type(ob))))
    for candidate in candidates:
        try:
            return registry[candidate]
        except KeyError:
            continue
    # _find_adapter would previously return None, and immediately be called.
    # So we're raising a TypeError to keep backward compatibility if anyone depended on that behaviour.
    raise TypeError(f"Could not find adapter for {registry} and {ob}")

3499 

3500 

def ensure_directory(path: StrOrBytesPath):
    """Ensure that the parent directory of `path` exists.

    A `path` with no directory component (e.g. a bare filename) is a no-op:
    its parent is the current working directory, which always exists.
    Previously such a path raised ``FileNotFoundError`` because
    ``os.makedirs('')`` rejects the empty string.
    """
    dirname = os.path.dirname(path)
    if dirname:
        os.makedirs(dirname, exist_ok=True)

3505 

3506 

def _bypass_ensure_directory(path):
    """Sandbox-bypassing version of ensure_directory()"""
    # Uses the raw os-level helpers (split/isdir/mkdir bound at module level)
    # so the sandboxed, monkey-patched os functions are not invoked.
    if not WRITE_SUPPORT:
        raise OSError('"os.mkdir" not supported on this platform.')
    dirname, filename = split(path)
    if dirname and filename and not isdir(dirname):
        # Recursively create the parent first, mimicking os.makedirs.
        _bypass_ensure_directory(dirname)
        try:
            mkdir(dirname, 0o755)
        except FileExistsError:
            # Another thread/process may have created it concurrently.
            pass

3518 

3519 

def split_sections(s: _NestedStr) -> Iterator[tuple[str | None, list[str]]]:
    """Split a string or iterable thereof into (section, content) pairs

    Each ``section`` is a stripped version of the section header ("[section]")
    and each ``content`` is a list of stripped lines excluding blank lines and
    comment-only lines. If there are any such lines before the first section
    header, they're returned in a first ``section`` of ``None``.
    """
    section = None
    content = []
    for line in yield_lines(s):
        if not line.startswith("["):
            content.append(line)
            continue
        if not line.endswith("]"):
            raise ValueError("Invalid section heading", line)
        # New header: flush the previous section first (skipping the
        # initial empty None section).
        if section or content:
            yield section, content
        section = line[1:-1].strip()
        content = []

    # wrap up last segment
    yield section, content

3544 

3545 

def _mkstemp(*args, **kw):
    """``tempfile.mkstemp`` with pkg_resources' sandboxing bypassed."""
    old_open = os.open
    try:
        # temporarily bypass sandboxing
        os.open = os_open
        return tempfile.mkstemp(*args, **kw)
    finally:
        # and then put it back
        os.open = old_open

3555 

3556 

# Silence the PEP440Warning by default, so that end users don't get hit by it
# randomly just because they use pkg_resources. We want to append the rule
# because we want earlier uses of filterwarnings to take precedence over this
# one.
warnings.filterwarnings("ignore", category=PEP440Warning, append=True)

3562 

3563 

class PkgResourcesDeprecationWarning(Warning):
    """
    Base class for warning about deprecations in ``pkg_resources``

    This class is not derived from ``DeprecationWarning``, and as such is
    visible by default.
    """

3571 

3572 

# Ported from ``setuptools`` to avoid introducing an import inter-dependency:
# "locale" is only accepted as an ``encoding`` argument on Python 3.10+;
# on older versions ``encoding=None`` means the locale's preferred encoding.
_LOCALE_ENCODING = "locale" if sys.version_info >= (3, 10) else None

3575 

3576 

def _read_utf8_with_fallback(file: str, fallback_encoding=_LOCALE_ENCODING) -> str:
    """See setuptools.unicode_utils._read_utf8_with_fallback

    Read `file` as UTF-8; on decode failure, warn (deprecated behaviour)
    and retry with `fallback_encoding`.
    """
    try:
        with open(file, "r", encoding="utf-8") as f:
            return f.read()
    except UnicodeDecodeError:  # pragma: no cover
        msg = f"""\
        ********************************************************************************
        `encoding="utf-8"` fails with {file!r}, trying `encoding={fallback_encoding!r}`.

        This fallback behaviour is considered **deprecated** and future versions of
        `setuptools/pkg_resources` may not implement it.

        Please encode {file!r} with "utf-8" to ensure future builds will succeed.

        If this file was produced by `setuptools` itself, cleaning up the cached files
        and re-building/re-installing the package with a newer version of `setuptools`
        (e.g. by updating `build-system.requires` in its `pyproject.toml`)
        might solve the problem.
        ********************************************************************************
        """
        # TODO: Add a deadline?
        # See comment in setuptools.unicode_utils._Utf8EncodingNeeded
        warnings.warn(msg, PkgResourcesDeprecationWarning, stacklevel=2)
        with open(file, "r", encoding=fallback_encoding) as f:
            return f.read()

3603 

3604 

3605# from jaraco.functools 1.3 

3606def _call_aside(f, *args, **kwargs): 

3607 f(*args, **kwargs) 

3608 return f 

3609 

3610 

@_call_aside
def _initialize(g=globals()):
    "Set up global resource manager (deliberately not state-saved)"
    manager = ResourceManager()
    g['_manager'] = manager
    # Re-export every public ResourceManager method as a module-level
    # function (resource_filename, resource_stream, ...).
    g.update(
        (name, getattr(manager, name))
        for name in dir(manager)
        if not name.startswith('_')
    )

3621 

3622 

@_call_aside
def _initialize_master_working_set():
    """
    Prepare the master working set and make the ``require()``
    API available.

    This function has explicit effects on the global state
    of pkg_resources. It is intended to be invoked once at
    the initialization of this module.

    Invocation by other packages is unsupported and done
    at their own risk.
    """
    working_set = _declare_state('object', 'working_set', WorkingSet._build_master())

    # Module-level aliases for the working set's bound methods.
    require = working_set.require
    iter_entry_points = working_set.iter_entry_points
    add_activation_listener = working_set.subscribe
    run_script = working_set.run_script
    # backward compatibility
    run_main = run_script
    # Activate all distributions already on sys.path with replace=False and
    # ensure that all distributions added to the working set in the future
    # (e.g. by calling ``require()``) will get activated as well,
    # with higher priority (replace=True).
    tuple(dist.activate(replace=False) for dist in working_set)
    add_activation_listener(
        lambda dist: dist.activate(replace=True),
        existing=False,
    )
    working_set.entries = []
    # match order
    list(map(working_set.add_entry, sys.path))
    # Publish all of the above locals as module-level globals.
    globals().update(locals())

3657 

3658 

if TYPE_CHECKING:
    # All of these are set by the @_call_aside methods above
    # (only declared here so static type checkers can see them).
    __resource_manager = ResourceManager()  # Won't exist at runtime
    resource_exists = __resource_manager.resource_exists
    resource_isdir = __resource_manager.resource_isdir
    resource_filename = __resource_manager.resource_filename
    resource_stream = __resource_manager.resource_stream
    resource_string = __resource_manager.resource_string
    resource_listdir = __resource_manager.resource_listdir
    set_extraction_path = __resource_manager.set_extraction_path
    cleanup_resources = __resource_manager.cleanup_resources

    working_set = WorkingSet()
    require = working_set.require
    iter_entry_points = working_set.iter_entry_points
    add_activation_listener = working_set.subscribe
    run_script = working_set.run_script
    run_main = run_script