Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pkg_resources/__init__.py: 1%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1640 statements  

1# TODO: Add Generic type annotations to initialized collections. 

2# For now we'd simply use implicit Any/Unknown which would add redundant annotations 

3# mypy: disable-error-code="var-annotated" 

4""" 

5Package resource API 

6-------------------- 

7 

8A resource is a logical file contained within a package, or a logical 

9subdirectory thereof. The package resource API expects resource names 

10to have their path parts separated with ``/``, *not* whatever the local 

11path separator is. Do not use os.path operations to manipulate resource 

12names being passed into the API. 

13 

14The package resource API is designed to work with normal filesystem packages, 

15.egg files, and unpacked .egg files. It can also work in a limited way with 

16.zip files and with custom PEP 302 loaders that support the ``get_data()`` 

17method. 

18 

19This module is deprecated. Users are directed to :mod:`importlib.resources`, 

20:mod:`importlib.metadata` and :pypi:`packaging` instead. 

21""" 

22 

23from __future__ import annotations 

24 

25import sys 

26 

27if sys.version_info < (3, 8): # noqa: UP036 # Check for unsupported versions 

28 raise RuntimeError("Python 3.8 or later is required") 

29 

30import os 

31import io 

32import time 

33import re 

34import types 

35from typing import ( 

36 Any, 

37 BinaryIO, 

38 Literal, 

39 Dict, 

40 Iterator, 

41 Mapping, 

42 MutableSequence, 

43 NamedTuple, 

44 NoReturn, 

45 Tuple, 

46 Union, 

47 TYPE_CHECKING, 

48 Protocol, 

49 Callable, 

50 Iterable, 

51 TypeVar, 

52 overload, 

53) 

54import zipfile 

55import zipimport 

56import warnings 

57import stat 

58import functools 

59import pkgutil 

60import operator 

61import platform 

62import collections 

63import plistlib 

64import email.parser 

65import errno 

66import tempfile 

67import textwrap 

68import inspect 

69import ntpath 

70import posixpath 

71import importlib 

72import importlib.abc 

73import importlib.machinery 

74from pkgutil import get_importer 

75 

76import _imp 

77 

78sys.path.extend(((vendor_path := os.path.join(os.path.dirname(os.path.dirname(__file__)), 'setuptools', '_vendor')) not in sys.path) * [vendor_path]) # fmt: skip 

79# workaround for #4476 

80sys.modules.pop('backports', None) 

81 

82# capture these to bypass sandboxing 

83from os import utime 

84from os import open as os_open 

85from os.path import isdir, split 

86 

87try: 

88 from os import mkdir, rename, unlink 

89 

90 WRITE_SUPPORT = True 

91except ImportError: 

92 # no write support, probably under GAE 

93 WRITE_SUPPORT = False 

94 

95import packaging.specifiers 

96from jaraco.text import ( 

97 yield_lines, 

98 drop_comment, 

99 join_continuation, 

100) 

101from packaging import markers as _packaging_markers 

102from packaging import requirements as _packaging_requirements 

103from packaging import utils as _packaging_utils 

104from packaging import version as _packaging_version 

105from platformdirs import user_cache_dir as _user_cache_dir 

106 

107if TYPE_CHECKING: 

108 from _typeshed import BytesPath, StrPath, StrOrBytesPath 

109 from typing_extensions import Self 

110 

111warnings.warn( 

112 "pkg_resources is deprecated as an API. " 

113 "See https://setuptools.pypa.io/en/latest/pkg_resources.html", 

114 DeprecationWarning, 

115 stacklevel=2, 

116) 

117 

118_T = TypeVar("_T") 

119_DistributionT = TypeVar("_DistributionT", bound="Distribution") 

120# Type aliases 

121_NestedStr = Union[str, Iterable[Union[str, Iterable["_NestedStr"]]]] 

122_InstallerTypeT = Callable[["Requirement"], "_DistributionT"] 

123_InstallerType = Callable[["Requirement"], Union["Distribution", None]] 

124_PkgReqType = Union[str, "Requirement"] 

125_EPDistType = Union["Distribution", _PkgReqType] 

126_MetadataType = Union["IResourceProvider", None] 

127_ResolvedEntryPoint = Any # Can be any attribute in the module 

128_ResourceStream = Any # TODO / Incomplete: A readable file-like object 

129# Any object works, but let's indicate we expect something like a module (optionally has __loader__ or __file__) 

130_ModuleLike = Union[object, types.ModuleType] 

131# Any: Should be _ModuleLike but we end up with issues where _ModuleLike doesn't have _ZipLoaderModule's __loader__ 

132_ProviderFactoryType = Callable[[Any], "IResourceProvider"] 

133_DistFinderType = Callable[[_T, str, bool], Iterable["Distribution"]] 

134_NSHandlerType = Callable[[_T, str, str, types.ModuleType], Union[str, None]] 

135_AdapterT = TypeVar( 

136 "_AdapterT", _DistFinderType[Any], _ProviderFactoryType, _NSHandlerType[Any] 

137) 

138 

139 

140# Use _typeshed.importlib.LoaderProtocol once available https://github.com/python/typeshed/pull/11890 

141class _LoaderProtocol(Protocol): 

142 def load_module(self, fullname: str, /) -> types.ModuleType: ... 

143 

144 

145class _ZipLoaderModule(Protocol): 

146 __loader__: zipimport.zipimporter 

147 

148 

149_PEP440_FALLBACK = re.compile(r"^v?(?P<safe>(?:[0-9]+!)?[0-9]+(?:\.[0-9]+)*)", re.I) 

150 

151 

152class PEP440Warning(RuntimeWarning): 

153 """ 

154 Used when there is an issue with a version or specifier not complying with 

155 PEP 440. 

156 """ 

157 

158 

159parse_version = _packaging_version.Version 

160 

161_state_vars: dict[str, str] = {} 

162 

163 

164def _declare_state(vartype: str, varname: str, initial_value: _T) -> _T: 

165 _state_vars[varname] = vartype 

166 return initial_value 

167 

168 

169def __getstate__() -> dict[str, Any]: 

170 state = {} 

171 g = globals() 

172 for k, v in _state_vars.items(): 

173 state[k] = g['_sget_' + v](g[k]) 

174 return state 

175 

176 

177def __setstate__(state: dict[str, Any]) -> dict[str, Any]: 

178 g = globals() 

179 for k, v in state.items(): 

180 g['_sset_' + _state_vars[k]](k, g[k], v) 

181 return state 

182 

183 

184def _sget_dict(val): 

185 return val.copy() 

186 

187 

188def _sset_dict(key, ob, state): 

189 ob.clear() 

190 ob.update(state) 

191 

192 

193def _sget_object(val): 

194 return val.__getstate__() 

195 

196 

197def _sset_object(key, ob, state): 

198 ob.__setstate__(state) 

199 

200 

201_sget_none = _sset_none = lambda *args: None 

202 

203 

204def get_supported_platform(): 

205 """Return this platform's maximum compatible version. 

206 

207 distutils.util.get_platform() normally reports the minimum version 

208 of macOS that would be required to *use* extensions produced by 

209 distutils. But what we want when checking compatibility is to know the 

210 version of macOS that we are *running*. To allow usage of packages that 

211 explicitly require a newer version of macOS, we must also know the 

212 current version of the OS. 

213 

214 If this condition occurs for any other platform with a version in its 

215 platform strings, this function should be extended accordingly. 

216 """ 

217 plat = get_build_platform() 

218 m = macosVersionString.match(plat) 

219 if m is not None and sys.platform == "darwin": 

220 try: 

221 plat = 'macosx-%s-%s' % ('.'.join(_macos_vers()[:2]), m.group(3)) 

222 except ValueError: 

223 # not macOS 

224 pass 

225 return plat 

226 

227 

228__all__ = [ 

229 # Basic resource access and distribution/entry point discovery 

230 'require', 

231 'run_script', 

232 'get_provider', 

233 'get_distribution', 

234 'load_entry_point', 

235 'get_entry_map', 

236 'get_entry_info', 

237 'iter_entry_points', 

238 'resource_string', 

239 'resource_stream', 

240 'resource_filename', 

241 'resource_listdir', 

242 'resource_exists', 

243 'resource_isdir', 

244 # Environmental control 

245 'declare_namespace', 

246 'working_set', 

247 'add_activation_listener', 

248 'find_distributions', 

249 'set_extraction_path', 

250 'cleanup_resources', 

251 'get_default_cache', 

252 # Primary implementation classes 

253 'Environment', 

254 'WorkingSet', 

255 'ResourceManager', 

256 'Distribution', 

257 'Requirement', 

258 'EntryPoint', 

259 # Exceptions 

260 'ResolutionError', 

261 'VersionConflict', 

262 'DistributionNotFound', 

263 'UnknownExtra', 

264 'ExtractionError', 

265 # Warnings 

266 'PEP440Warning', 

267 # Parsing functions and string utilities 

268 'parse_requirements', 

269 'parse_version', 

270 'safe_name', 

271 'safe_version', 

272 'get_platform', 

273 'compatible_platforms', 

274 'yield_lines', 

275 'split_sections', 

276 'safe_extra', 

277 'to_filename', 

278 'invalid_marker', 

279 'evaluate_marker', 

280 # filesystem utilities 

281 'ensure_directory', 

282 'normalize_path', 

283 # Distribution "precedence" constants 

284 'EGG_DIST', 

285 'BINARY_DIST', 

286 'SOURCE_DIST', 

287 'CHECKOUT_DIST', 

288 'DEVELOP_DIST', 

289 # "Provider" interfaces, implementations, and registration/lookup APIs 

290 'IMetadataProvider', 

291 'IResourceProvider', 

292 'FileMetadata', 

293 'PathMetadata', 

294 'EggMetadata', 

295 'EmptyProvider', 

296 'empty_provider', 

297 'NullProvider', 

298 'EggProvider', 

299 'DefaultProvider', 

300 'ZipProvider', 

301 'register_finder', 

302 'register_namespace_handler', 

303 'register_loader_type', 

304 'fixup_namespace_packages', 

305 'get_importer', 

306 # Warnings 

307 'PkgResourcesDeprecationWarning', 

308 # Deprecated/backward compatibility only 

309 'run_main', 

310 'AvailableDistributions', 

311] 

312 

313 

314class ResolutionError(Exception): 

315 """Abstract base for dependency resolution errors""" 

316 

317 def __repr__(self): 

318 return self.__class__.__name__ + repr(self.args) 

319 

320 

321class VersionConflict(ResolutionError): 

322 """ 

323 An already-installed version conflicts with the requested version. 

324 

325 Should be initialized with the installed Distribution and the requested 

326 Requirement. 

327 """ 

328 

329 _template = "{self.dist} is installed but {self.req} is required" 

330 

331 @property 

332 def dist(self) -> Distribution: 

333 return self.args[0] 

334 

335 @property 

336 def req(self) -> Requirement: 

337 return self.args[1] 

338 

339 def report(self): 

340 return self._template.format(**locals()) 

341 

342 def with_context( 

343 self, required_by: set[Distribution | str] 

344 ) -> Self | ContextualVersionConflict: 

345 """ 

346 If required_by is non-empty, return a version of self that is a 

347 ContextualVersionConflict. 

348 """ 

349 if not required_by: 

350 return self 

351 args = self.args + (required_by,) 

352 return ContextualVersionConflict(*args) 

353 

354 

355class ContextualVersionConflict(VersionConflict): 

356 """ 

357 A VersionConflict that accepts a third parameter, the set of the 

358 requirements that required the installed Distribution. 

359 """ 

360 

361 _template = VersionConflict._template + ' by {self.required_by}' 

362 

363 @property 

364 def required_by(self) -> set[str]: 

365 return self.args[2] 

366 

367 

368class DistributionNotFound(ResolutionError): 

369 """A requested distribution was not found""" 

370 

371 _template = ( 

372 "The '{self.req}' distribution was not found " 

373 "and is required by {self.requirers_str}" 

374 ) 

375 

376 @property 

377 def req(self) -> Requirement: 

378 return self.args[0] 

379 

380 @property 

381 def requirers(self) -> set[str] | None: 

382 return self.args[1] 

383 

384 @property 

385 def requirers_str(self): 

386 if not self.requirers: 

387 return 'the application' 

388 return ', '.join(self.requirers) 

389 

390 def report(self): 

391 return self._template.format(**locals()) 

392 

393 def __str__(self): 

394 return self.report() 

395 

396 

397class UnknownExtra(ResolutionError): 

398 """Distribution doesn't have an "extra feature" of the given name""" 

399 

400 

401_provider_factories: dict[type[_ModuleLike], _ProviderFactoryType] = {} 

402 

403PY_MAJOR = '{}.{}'.format(*sys.version_info) 

404EGG_DIST = 3 

405BINARY_DIST = 2 

406SOURCE_DIST = 1 

407CHECKOUT_DIST = 0 

408DEVELOP_DIST = -1 

409 

410 

411def register_loader_type( 

412 loader_type: type[_ModuleLike], provider_factory: _ProviderFactoryType 

413) -> None: 

414 """Register `provider_factory` to make providers for `loader_type` 

415 

416 `loader_type` is the type or class of a PEP 302 ``module.__loader__``, 

417 and `provider_factory` is a function that, passed a *module* object, 

418 returns an ``IResourceProvider`` for that module. 

419 """ 

420 _provider_factories[loader_type] = provider_factory 

421 

422 

423@overload 

424def get_provider(moduleOrReq: str) -> IResourceProvider: ... 

425@overload 

426def get_provider(moduleOrReq: Requirement) -> Distribution: ... 

427def get_provider(moduleOrReq: str | Requirement) -> IResourceProvider | Distribution: 

428 """Return an IResourceProvider for the named module or requirement""" 

429 if isinstance(moduleOrReq, Requirement): 

430 return working_set.find(moduleOrReq) or require(str(moduleOrReq))[0] 

431 try: 

432 module = sys.modules[moduleOrReq] 

433 except KeyError: 

434 __import__(moduleOrReq) 

435 module = sys.modules[moduleOrReq] 

436 loader = getattr(module, '__loader__', None) 

437 return _find_adapter(_provider_factories, loader)(module) 

438 

439 

440@functools.lru_cache(maxsize=None) 

441def _macos_vers(): 

442 version = platform.mac_ver()[0] 

443 # fallback for MacPorts 

444 if version == '': 

445 plist = '/System/Library/CoreServices/SystemVersion.plist' 

446 if os.path.exists(plist): 

447 with open(plist, 'rb') as fh: 

448 plist_content = plistlib.load(fh) 

449 if 'ProductVersion' in plist_content: 

450 version = plist_content['ProductVersion'] 

451 return version.split('.') 

452 

453 

454def _macos_arch(machine): 

455 return {'PowerPC': 'ppc', 'Power_Macintosh': 'ppc'}.get(machine, machine) 

456 

457 

458def get_build_platform(): 

459 """Return this platform's string for platform-specific distributions 

460 

461 XXX Currently this is the same as ``distutils.util.get_platform()``, but it 

462 needs some hacks for Linux and macOS. 

463 """ 

464 from sysconfig import get_platform 

465 

466 plat = get_platform() 

467 if sys.platform == "darwin" and not plat.startswith('macosx-'): 

468 try: 

469 version = _macos_vers() 

470 machine = os.uname()[4].replace(" ", "_") 

471 return "macosx-%d.%d-%s" % ( 

472 int(version[0]), 

473 int(version[1]), 

474 _macos_arch(machine), 

475 ) 

476 except ValueError: 

477 # if someone is running a non-Mac darwin system, this will fall 

478 # through to the default implementation 

479 pass 

480 return plat 

481 

482 

483macosVersionString = re.compile(r"macosx-(\d+)\.(\d+)-(.*)") 

484darwinVersionString = re.compile(r"darwin-(\d+)\.(\d+)\.(\d+)-(.*)") 

485# XXX backward compat 

486get_platform = get_build_platform 

487 

488 

489def compatible_platforms(provided: str | None, required: str | None) -> bool: 

490 """Can code for the `provided` platform run on the `required` platform? 

491 

492 Returns true if either platform is ``None``, or the platforms are equal. 

493 

494 XXX Needs compatibility checks for Linux and other unixy OSes. 

495 """ 

496 if provided is None or required is None or provided == required: 

497 # easy case 

498 return True 

499 

500 # macOS special cases 

501 reqMac = macosVersionString.match(required) 

502 if reqMac: 

503 provMac = macosVersionString.match(provided) 

504 

505 # is this a Mac package? 

506 if not provMac: 

507 # this is backwards compatibility for packages built before 

508 # setuptools 0.6. All packages built after this point will 

509 # use the new macOS designation. 

510 provDarwin = darwinVersionString.match(provided) 

511 if provDarwin: 

512 dversion = int(provDarwin.group(1)) 

513 macosversion = "%s.%s" % (reqMac.group(1), reqMac.group(2)) 

514 if ( 

515 dversion == 7 

516 and macosversion >= "10.3" 

517 or dversion == 8 

518 and macosversion >= "10.4" 

519 ): 

520 return True 

521 # egg isn't macOS or legacy darwin 

522 return False 

523 

524 # are they the same major version and machine type? 

525 if provMac.group(1) != reqMac.group(1) or provMac.group(3) != reqMac.group(3): 

526 return False 

527 

528 # is the required OS major update >= the provided one? 

529 if int(provMac.group(2)) > int(reqMac.group(2)): 

530 return False 

531 

532 return True 

533 

534 # XXX Linux and other platforms' special cases should go here 

535 return False 

536 

537 

538@overload 

539def get_distribution(dist: _DistributionT) -> _DistributionT: ... 

540@overload 

541def get_distribution(dist: _PkgReqType) -> Distribution: ... 

542def get_distribution(dist: Distribution | _PkgReqType) -> Distribution: 

543 """Return a current distribution object for a Requirement or string""" 

544 if isinstance(dist, str): 

545 dist = Requirement.parse(dist) 

546 if isinstance(dist, Requirement): 

547 dist = get_provider(dist) 

548 if not isinstance(dist, Distribution): 

549 raise TypeError("Expected str, Requirement, or Distribution", dist) 

550 return dist 

551 

552 

553def load_entry_point(dist: _EPDistType, group: str, name: str) -> _ResolvedEntryPoint: 

554 """Return `name` entry point of `group` for `dist` or raise ImportError""" 

555 return get_distribution(dist).load_entry_point(group, name) 

556 

557 

558@overload 

559def get_entry_map( 

560 dist: _EPDistType, group: None = None 

561) -> dict[str, dict[str, EntryPoint]]: ... 

562@overload 

563def get_entry_map(dist: _EPDistType, group: str) -> dict[str, EntryPoint]: ... 

564def get_entry_map(dist: _EPDistType, group: str | None = None): 

565 """Return the entry point map for `group`, or the full entry map""" 

566 return get_distribution(dist).get_entry_map(group) 

567 

568 

569def get_entry_info(dist: _EPDistType, group: str, name: str) -> EntryPoint | None: 

570 """Return the EntryPoint object for `group`+`name`, or ``None``""" 

571 return get_distribution(dist).get_entry_info(group, name) 

572 

573 

574class IMetadataProvider(Protocol): 

575 def has_metadata(self, name: str) -> bool: 

576 """Does the package's distribution contain the named metadata?""" 

577 

578 def get_metadata(self, name: str) -> str: 

579 """The named metadata resource as a string""" 

580 

581 def get_metadata_lines(self, name: str) -> Iterator[str]: 

582 """Yield named metadata resource as list of non-blank non-comment lines 

583 

584 Leading and trailing whitespace is stripped from each line, and lines 

585 with ``#`` as the first non-blank character are omitted.""" 

586 

587 def metadata_isdir(self, name: str) -> bool: 

588 """Is the named metadata a directory? (like ``os.path.isdir()``)""" 

589 

590 def metadata_listdir(self, name: str) -> list[str]: 

591 """List of metadata names in the directory (like ``os.listdir()``)""" 

592 

593 def run_script(self, script_name: str, namespace: dict[str, Any]) -> None: 

594 """Execute the named script in the supplied namespace dictionary""" 

595 

596 

597class IResourceProvider(IMetadataProvider, Protocol): 

598 """An object that provides access to package resources""" 

599 

600 def get_resource_filename( 

601 self, manager: ResourceManager, resource_name: str 

602 ) -> str: 

603 """Return a true filesystem path for `resource_name` 

604 

605 `manager` must be a ``ResourceManager``""" 

606 

607 def get_resource_stream( 

608 self, manager: ResourceManager, resource_name: str 

609 ) -> _ResourceStream: 

610 """Return a readable file-like object for `resource_name` 

611 

612 `manager` must be a ``ResourceManager``""" 

613 

614 def get_resource_string( 

615 self, manager: ResourceManager, resource_name: str 

616 ) -> bytes: 

617 """Return the contents of `resource_name` as :obj:`bytes` 

618 

619 `manager` must be a ``ResourceManager``""" 

620 

621 def has_resource(self, resource_name: str) -> bool: 

622 """Does the package contain the named resource?""" 

623 

624 def resource_isdir(self, resource_name: str) -> bool: 

625 """Is the named resource a directory? (like ``os.path.isdir()``)""" 

626 

627 def resource_listdir(self, resource_name: str) -> list[str]: 

628 """List of resource names in the directory (like ``os.listdir()``)""" 

629 

630 

631class WorkingSet: 

632 """A collection of active distributions on sys.path (or a similar list)""" 

633 

634 def __init__(self, entries: Iterable[str] | None = None): 

635 """Create working set from list of path entries (default=sys.path)""" 

636 self.entries: list[str] = [] 

637 self.entry_keys = {} 

638 self.by_key = {} 

639 self.normalized_to_canonical_keys = {} 

640 self.callbacks = [] 

641 

642 if entries is None: 

643 entries = sys.path 

644 

645 for entry in entries: 

646 self.add_entry(entry) 

647 

648 @classmethod 

649 def _build_master(cls): 

650 """ 

651 Prepare the master working set. 

652 """ 

653 ws = cls() 

654 try: 

655 from __main__ import __requires__ 

656 except ImportError: 

657 # The main program does not list any requirements 

658 return ws 

659 

660 # ensure the requirements are met 

661 try: 

662 ws.require(__requires__) 

663 except VersionConflict: 

664 return cls._build_from_requirements(__requires__) 

665 

666 return ws 

667 

668 @classmethod 

669 def _build_from_requirements(cls, req_spec): 

670 """ 

671 Build a working set from a requirement spec. Rewrites sys.path. 

672 """ 

673 # try it without defaults already on sys.path 

674 # by starting with an empty path 

675 ws = cls([]) 

676 reqs = parse_requirements(req_spec) 

677 dists = ws.resolve(reqs, Environment()) 

678 for dist in dists: 

679 ws.add(dist) 

680 

681 # add any missing entries from sys.path 

682 for entry in sys.path: 

683 if entry not in ws.entries: 

684 ws.add_entry(entry) 

685 

686 # then copy back to sys.path 

687 sys.path[:] = ws.entries 

688 return ws 

689 

690 def add_entry(self, entry: str) -> None: 

691 """Add a path item to ``.entries``, finding any distributions on it 

692 

693 ``find_distributions(entry, True)`` is used to find distributions 

694 corresponding to the path entry, and they are added. `entry` is 

695 always appended to ``.entries``, even if it is already present. 

696 (This is because ``sys.path`` can contain the same value more than 

697 once, and the ``.entries`` of the ``sys.path`` WorkingSet should always 

698 equal ``sys.path``.) 

699 """ 

700 self.entry_keys.setdefault(entry, []) 

701 self.entries.append(entry) 

702 for dist in find_distributions(entry, True): 

703 self.add(dist, entry, False) 

704 

705 def __contains__(self, dist: Distribution) -> bool: 

706 """True if `dist` is the active distribution for its project""" 

707 return self.by_key.get(dist.key) == dist 

708 

709 def find(self, req: Requirement) -> Distribution | None: 

710 """Find a distribution matching requirement `req` 

711 

712 If there is an active distribution for the requested project, this 

713 returns it as long as it meets the version requirement specified by 

714 `req`. But, if there is an active distribution for the project and it 

715 does *not* meet the `req` requirement, ``VersionConflict`` is raised. 

716 If there is no active distribution for the requested project, ``None`` 

717 is returned. 

718 """ 

719 dist = self.by_key.get(req.key) 

720 

721 if dist is None: 

722 canonical_key = self.normalized_to_canonical_keys.get(req.key) 

723 

724 if canonical_key is not None: 

725 req.key = canonical_key 

726 dist = self.by_key.get(canonical_key) 

727 

728 if dist is not None and dist not in req: 

729 # XXX add more info 

730 raise VersionConflict(dist, req) 

731 return dist 

732 

733 def iter_entry_points( 

734 self, group: str, name: str | None = None 

735 ) -> Iterator[EntryPoint]: 

736 """Yield entry point objects from `group` matching `name` 

737 

738 If `name` is None, yields all entry points in `group` from all 

739 distributions in the working set, otherwise only ones matching 

740 both `group` and `name` are yielded (in distribution order). 

741 """ 

742 return ( 

743 entry 

744 for dist in self 

745 for entry in dist.get_entry_map(group).values() 

746 if name is None or name == entry.name 

747 ) 

748 

749 def run_script(self, requires: str, script_name: str) -> None: 

750 """Locate distribution for `requires` and run `script_name` script""" 

751 ns = sys._getframe(1).f_globals 

752 name = ns['__name__'] 

753 ns.clear() 

754 ns['__name__'] = name 

755 self.require(requires)[0].run_script(script_name, ns) 

756 

757 def __iter__(self) -> Iterator[Distribution]: 

758 """Yield distributions for non-duplicate projects in the working set 

759 

760 The yield order is the order in which the items' path entries were 

761 added to the working set. 

762 """ 

763 seen = set() 

764 for item in self.entries: 

765 if item not in self.entry_keys: 

766 # workaround a cache issue 

767 continue 

768 

769 for key in self.entry_keys[item]: 

770 if key not in seen: 

771 seen.add(key) 

772 yield self.by_key[key] 

773 

774 def add( 

775 self, 

776 dist: Distribution, 

777 entry: str | None = None, 

778 insert: bool = True, 

779 replace: bool = False, 

780 ) -> None: 

781 """Add `dist` to working set, associated with `entry` 

782 

783 If `entry` is unspecified, it defaults to the ``.location`` of `dist`. 

784 On exit from this routine, `entry` is added to the end of the working 

785 set's ``.entries`` (if it wasn't already present). 

786 

787 `dist` is only added to the working set if it's for a project that 

788 doesn't already have a distribution in the set, unless `replace=True`. 

789 If it's added, any callbacks registered with the ``subscribe()`` method 

790 will be called. 

791 """ 

792 if insert: 

793 dist.insert_on(self.entries, entry, replace=replace) 

794 

795 if entry is None: 

796 entry = dist.location 

797 keys = self.entry_keys.setdefault(entry, []) 

798 keys2 = self.entry_keys.setdefault(dist.location, []) 

799 if not replace and dist.key in self.by_key: 

800 # ignore hidden distros 

801 return 

802 

803 self.by_key[dist.key] = dist 

804 normalized_name = _packaging_utils.canonicalize_name(dist.key) 

805 self.normalized_to_canonical_keys[normalized_name] = dist.key 

806 if dist.key not in keys: 

807 keys.append(dist.key) 

808 if dist.key not in keys2: 

809 keys2.append(dist.key) 

810 self._added_new(dist) 

811 

812 @overload 

813 def resolve( 

814 self, 

815 requirements: Iterable[Requirement], 

816 env: Environment | None, 

817 installer: _InstallerTypeT[_DistributionT], 

818 replace_conflicting: bool = False, 

819 extras: tuple[str, ...] | None = None, 

820 ) -> list[_DistributionT]: ... 

821 @overload 

822 def resolve( 

823 self, 

824 requirements: Iterable[Requirement], 

825 env: Environment | None = None, 

826 *, 

827 installer: _InstallerTypeT[_DistributionT], 

828 replace_conflicting: bool = False, 

829 extras: tuple[str, ...] | None = None, 

830 ) -> list[_DistributionT]: ... 

831 @overload 

832 def resolve( 

833 self, 

834 requirements: Iterable[Requirement], 

835 env: Environment | None = None, 

836 installer: _InstallerType | None = None, 

837 replace_conflicting: bool = False, 

838 extras: tuple[str, ...] | None = None, 

839 ) -> list[Distribution]: ... 

840 def resolve( 

841 self, 

842 requirements: Iterable[Requirement], 

843 env: Environment | None = None, 

844 installer: _InstallerType | None | _InstallerTypeT[_DistributionT] = None, 

845 replace_conflicting: bool = False, 

846 extras: tuple[str, ...] | None = None, 

847 ) -> list[Distribution] | list[_DistributionT]: 

848 """List all distributions needed to (recursively) meet `requirements` 

849 

850 `requirements` must be a sequence of ``Requirement`` objects. `env`, 

851 if supplied, should be an ``Environment`` instance. If 

852 not supplied, it defaults to all distributions available within any 

853 entry or distribution in the working set. `installer`, if supplied, 

854 will be invoked with each requirement that cannot be met by an 

855 already-installed distribution; it should return a ``Distribution`` or 

856 ``None``. 

857 

858 Unless `replace_conflicting=True`, raises a VersionConflict exception 

859 if 

860 any requirements are found on the path that have the correct name but 

861 the wrong version. Otherwise, if an `installer` is supplied it will be 

862 invoked to obtain the correct version of the requirement and activate 

863 it. 

864 

865 `extras` is a list of the extras to be used with these requirements. 

866 This is important because extra requirements may look like `my_req; 

867 extra = "my_extra"`, which would otherwise be interpreted as a purely 

868 optional requirement. Instead, we want to be able to assert that these 

869 requirements are truly required. 

870 """ 

871 

872 # set up the stack 

873 requirements = list(requirements)[::-1] 

874 # set of processed requirements 

875 processed = set() 

876 # key -> dist 

877 best = {} 

878 to_activate = [] 

879 

880 req_extras = _ReqExtras() 

881 

882 # Mapping of requirement to set of distributions that required it; 

883 # useful for reporting info about conflicts. 

884 required_by = collections.defaultdict(set) 

885 

886 while requirements: 

887 # process dependencies breadth-first 

888 req = requirements.pop(0) 

889 if req in processed: 

890 # Ignore cyclic or redundant dependencies 

891 continue 

892 

893 if not req_extras.markers_pass(req, extras): 

894 continue 

895 

896 dist = self._resolve_dist( 

897 req, best, replace_conflicting, env, installer, required_by, to_activate 

898 ) 

899 

900 # push the new requirements onto the stack 

901 new_requirements = dist.requires(req.extras)[::-1] 

902 requirements.extend(new_requirements) 

903 

904 # Register the new requirements needed by req 

905 for new_requirement in new_requirements: 

906 required_by[new_requirement].add(req.project_name) 

907 req_extras[new_requirement] = req.extras 

908 

909 processed.add(req) 

910 

911 # return list of distros to activate 

912 return to_activate 

913 

914 def _resolve_dist( 

915 self, req, best, replace_conflicting, env, installer, required_by, to_activate 

916 ) -> Distribution: 

917 dist = best.get(req.key) 

918 if dist is None: 

919 # Find the best distribution and add it to the map 

920 dist = self.by_key.get(req.key) 

921 if dist is None or (dist not in req and replace_conflicting): 

922 ws = self 

923 if env is None: 

924 if dist is None: 

925 env = Environment(self.entries) 

926 else: 

927 # Use an empty environment and workingset to avoid 

928 # any further conflicts with the conflicting 

929 # distribution 

930 env = Environment([]) 

931 ws = WorkingSet([]) 

932 dist = best[req.key] = env.best_match( 

933 req, ws, installer, replace_conflicting=replace_conflicting 

934 ) 

935 if dist is None: 

936 requirers = required_by.get(req, None) 

937 raise DistributionNotFound(req, requirers) 

938 to_activate.append(dist) 

939 if dist not in req: 

940 # Oops, the "best" so far conflicts with a dependency 

941 dependent_req = required_by[req] 

942 raise VersionConflict(dist, req).with_context(dependent_req) 

943 return dist 

944 

945 @overload 

946 def find_plugins( 

947 self, 

948 plugin_env: Environment, 

949 full_env: Environment | None, 

950 installer: _InstallerTypeT[_DistributionT], 

951 fallback: bool = True, 

952 ) -> tuple[list[_DistributionT], dict[Distribution, Exception]]: ... 

953 @overload 

954 def find_plugins( 

955 self, 

956 plugin_env: Environment, 

957 full_env: Environment | None = None, 

958 *, 

959 installer: _InstallerTypeT[_DistributionT], 

960 fallback: bool = True, 

961 ) -> tuple[list[_DistributionT], dict[Distribution, Exception]]: ... 

962 @overload 

963 def find_plugins( 

964 self, 

965 plugin_env: Environment, 

966 full_env: Environment | None = None, 

967 installer: _InstallerType | None = None, 

968 fallback: bool = True, 

969 ) -> tuple[list[Distribution], dict[Distribution, Exception]]: ... 

970 def find_plugins( 

971 self, 

972 plugin_env: Environment, 

973 full_env: Environment | None = None, 

974 installer: _InstallerType | None | _InstallerTypeT[_DistributionT] = None, 

975 fallback: bool = True, 

976 ) -> tuple[ 

977 list[Distribution] | list[_DistributionT], 

978 dict[Distribution, Exception], 

979 ]: 

980 """Find all activatable distributions in `plugin_env` 

981 

982 Example usage:: 

983 

984 distributions, errors = working_set.find_plugins( 

985 Environment(plugin_dirlist) 

986 ) 

987 # add plugins+libs to sys.path 

988 map(working_set.add, distributions) 

989 # display errors 

990 print('Could not load', errors) 

991 

992 The `plugin_env` should be an ``Environment`` instance that contains 

993 only distributions that are in the project's "plugin directory" or 

994 directories. The `full_env`, if supplied, should be an ``Environment`` 

995 contains all currently-available distributions. If `full_env` is not 

996 supplied, one is created automatically from the ``WorkingSet`` this 

997 method is called on, which will typically mean that every directory on 

998 ``sys.path`` will be scanned for distributions. 

999 

1000 `installer` is a standard installer callback as used by the 

1001 ``resolve()`` method. The `fallback` flag indicates whether we should 

1002 attempt to resolve older versions of a plugin if the newest version 

1003 cannot be resolved. 

1004 

1005 This method returns a 2-tuple: (`distributions`, `error_info`), where 

1006 `distributions` is a list of the distributions found in `plugin_env` 

1007 that were loadable, along with any other distributions that are needed 

1008 to resolve their dependencies. `error_info` is a dictionary mapping 

1009 unloadable plugin distributions to an exception instance describing the 

1010 error that occurred. Usually this will be a ``DistributionNotFound`` or 

1011 ``VersionConflict`` instance. 

1012 """ 

1013 

1014 plugin_projects = list(plugin_env) 

1015 # scan project names in alphabetic order 

1016 plugin_projects.sort() 

1017 

1018 error_info: dict[Distribution, Exception] = {} 

1019 distributions: dict[Distribution, Exception | None] = {} 

1020 

1021 if full_env is None: 

1022 env = Environment(self.entries) 

1023 env += plugin_env 

1024 else: 

1025 env = full_env + plugin_env 

1026 

1027 shadow_set = self.__class__([]) 

1028 # put all our entries in shadow_set 

1029 list(map(shadow_set.add, self)) 

1030 

1031 for project_name in plugin_projects: 

1032 for dist in plugin_env[project_name]: 

1033 req = [dist.as_requirement()] 

1034 

1035 try: 

1036 resolvees = shadow_set.resolve(req, env, installer) 

1037 

1038 except ResolutionError as v: 

1039 # save error info 

1040 error_info[dist] = v 

1041 if fallback: 

1042 # try the next older version of project 

1043 continue 

1044 else: 

1045 # give up on this project, keep going 

1046 break 

1047 

1048 else: 

1049 list(map(shadow_set.add, resolvees)) 

1050 distributions.update(dict.fromkeys(resolvees)) 

1051 

1052 # success, no need to try any more versions of this project 

1053 break 

1054 

1055 sorted_distributions = list(distributions) 

1056 sorted_distributions.sort() 

1057 

1058 return sorted_distributions, error_info 

1059 

1060 def require(self, *requirements: _NestedStr) -> list[Distribution]: 

1061 """Ensure that distributions matching `requirements` are activated 

1062 

1063 `requirements` must be a string or a (possibly-nested) sequence 

1064 thereof, specifying the distributions and versions required. The 

1065 return value is a sequence of the distributions that needed to be 

1066 activated to fulfill the requirements; all relevant distributions are 

1067 included, even if they were already activated in this working set. 

1068 """ 

1069 needed = self.resolve(parse_requirements(requirements)) 

1070 

1071 for dist in needed: 

1072 self.add(dist) 

1073 

1074 return needed 

1075 

1076 def subscribe( 

1077 self, callback: Callable[[Distribution], object], existing: bool = True 

1078 ) -> None: 

1079 """Invoke `callback` for all distributions 

1080 

1081 If `existing=True` (default), 

1082 call on all existing ones, as well. 

1083 """ 

1084 if callback in self.callbacks: 

1085 return 

1086 self.callbacks.append(callback) 

1087 if not existing: 

1088 return 

1089 for dist in self: 

1090 callback(dist) 

1091 

1092 def _added_new(self, dist): 

1093 for callback in self.callbacks: 

1094 callback(dist) 

1095 

1096 def __getstate__(self): 

1097 return ( 

1098 self.entries[:], 

1099 self.entry_keys.copy(), 

1100 self.by_key.copy(), 

1101 self.normalized_to_canonical_keys.copy(), 

1102 self.callbacks[:], 

1103 ) 

1104 

1105 def __setstate__(self, e_k_b_n_c): 

1106 entries, keys, by_key, normalized_to_canonical_keys, callbacks = e_k_b_n_c 

1107 self.entries = entries[:] 

1108 self.entry_keys = keys.copy() 

1109 self.by_key = by_key.copy() 

1110 self.normalized_to_canonical_keys = normalized_to_canonical_keys.copy() 

1111 self.callbacks = callbacks[:] 

1112 

1113 

1114class _ReqExtras(Dict["Requirement", Tuple[str, ...]]): 

1115 """ 

1116 Map each requirement to the extras that demanded it. 

1117 """ 

1118 

1119 def markers_pass(self, req: Requirement, extras: tuple[str, ...] | None = None): 

1120 """ 

1121 Evaluate markers for req against each extra that 

1122 demanded it. 

1123 

1124 Return False if the req has a marker and fails 

1125 evaluation. Otherwise, return True. 

1126 """ 

1127 return not req.marker or any( 

1128 req.marker.evaluate({'extra': extra}) 

1129 for extra in self.get(req, ()) + (extras or ("",)) 

1130 ) 

1131 

1132 

1133class Environment: 

1134 """Searchable snapshot of distributions on a search path""" 

1135 

1136 def __init__( 

1137 self, 

1138 search_path: Iterable[str] | None = None, 

1139 platform: str | None = get_supported_platform(), 

1140 python: str | None = PY_MAJOR, 

1141 ): 

1142 """Snapshot distributions available on a search path 

1143 

1144 Any distributions found on `search_path` are added to the environment. 

1145 `search_path` should be a sequence of ``sys.path`` items. If not 

1146 supplied, ``sys.path`` is used. 

1147 

1148 `platform` is an optional string specifying the name of the platform 

1149 that platform-specific distributions must be compatible with. If 

1150 unspecified, it defaults to the current platform. `python` is an 

1151 optional string naming the desired version of Python (e.g. ``'3.6'``); 

1152 it defaults to the current version. 

1153 

1154 You may explicitly set `platform` (and/or `python`) to ``None`` if you 

1155 wish to map *all* distributions, not just those compatible with the 

1156 running platform or Python version. 

1157 """ 

1158 self._distmap = {} 

1159 self.platform = platform 

1160 self.python = python 

1161 self.scan(search_path) 

1162 

1163 def can_add(self, dist: Distribution) -> bool: 

1164 """Is distribution `dist` acceptable for this environment? 

1165 

1166 The distribution must match the platform and python version 

1167 requirements specified when this environment was created, or False 

1168 is returned. 

1169 """ 

1170 py_compat = ( 

1171 self.python is None 

1172 or dist.py_version is None 

1173 or dist.py_version == self.python 

1174 ) 

1175 return py_compat and compatible_platforms(dist.platform, self.platform) 

1176 

1177 def remove(self, dist: Distribution) -> None: 

1178 """Remove `dist` from the environment""" 

1179 self._distmap[dist.key].remove(dist) 

1180 

1181 def scan(self, search_path: Iterable[str] | None = None) -> None: 

1182 """Scan `search_path` for distributions usable in this environment 

1183 

1184 Any distributions found are added to the environment. 

1185 `search_path` should be a sequence of ``sys.path`` items. If not 

1186 supplied, ``sys.path`` is used. Only distributions conforming to 

1187 the platform/python version defined at initialization are added. 

1188 """ 

1189 if search_path is None: 

1190 search_path = sys.path 

1191 

1192 for item in search_path: 

1193 for dist in find_distributions(item): 

1194 self.add(dist) 

1195 

1196 def __getitem__(self, project_name: str) -> list[Distribution]: 

1197 """Return a newest-to-oldest list of distributions for `project_name` 

1198 

1199 Uses case-insensitive `project_name` comparison, assuming all the 

1200 project's distributions use their project's name converted to all 

1201 lowercase as their key. 

1202 

1203 """ 

1204 distribution_key = project_name.lower() 

1205 return self._distmap.get(distribution_key, []) 

1206 

1207 def add(self, dist: Distribution) -> None: 

1208 """Add `dist` if we ``can_add()`` it and it has not already been added""" 

1209 if self.can_add(dist) and dist.has_version(): 

1210 dists = self._distmap.setdefault(dist.key, []) 

1211 if dist not in dists: 

1212 dists.append(dist) 

1213 dists.sort(key=operator.attrgetter('hashcmp'), reverse=True) 

1214 

1215 @overload 

1216 def best_match( 

1217 self, 

1218 req: Requirement, 

1219 working_set: WorkingSet, 

1220 installer: _InstallerTypeT[_DistributionT], 

1221 replace_conflicting: bool = False, 

1222 ) -> _DistributionT: ... 

1223 @overload 

1224 def best_match( 

1225 self, 

1226 req: Requirement, 

1227 working_set: WorkingSet, 

1228 installer: _InstallerType | None = None, 

1229 replace_conflicting: bool = False, 

1230 ) -> Distribution | None: ... 

1231 def best_match( 

1232 self, 

1233 req: Requirement, 

1234 working_set: WorkingSet, 

1235 installer: _InstallerType | None | _InstallerTypeT[_DistributionT] = None, 

1236 replace_conflicting: bool = False, 

1237 ) -> Distribution | None: 

1238 """Find distribution best matching `req` and usable on `working_set` 

1239 

1240 This calls the ``find(req)`` method of the `working_set` to see if a 

1241 suitable distribution is already active. (This may raise 

1242 ``VersionConflict`` if an unsuitable version of the project is already 

1243 active in the specified `working_set`.) If a suitable distribution 

1244 isn't active, this method returns the newest distribution in the 

1245 environment that meets the ``Requirement`` in `req`. If no suitable 

1246 distribution is found, and `installer` is supplied, then the result of 

1247 calling the environment's ``obtain(req, installer)`` method will be 

1248 returned. 

1249 """ 

1250 try: 

1251 dist = working_set.find(req) 

1252 except VersionConflict: 

1253 if not replace_conflicting: 

1254 raise 

1255 dist = None 

1256 if dist is not None: 

1257 return dist 

1258 for dist in self[req.key]: 

1259 if dist in req: 

1260 return dist 

1261 # try to download/install 

1262 return self.obtain(req, installer) 

1263 

1264 @overload 

1265 def obtain( 

1266 self, 

1267 requirement: Requirement, 

1268 installer: _InstallerTypeT[_DistributionT], 

1269 ) -> _DistributionT: ... 

1270 @overload 

1271 def obtain( 

1272 self, 

1273 requirement: Requirement, 

1274 installer: Callable[[Requirement], None] | None = None, 

1275 ) -> None: ... 

1276 @overload 

1277 def obtain( 

1278 self, 

1279 requirement: Requirement, 

1280 installer: _InstallerType | None = None, 

1281 ) -> Distribution | None: ... 

1282 def obtain( 

1283 self, 

1284 requirement: Requirement, 

1285 installer: Callable[[Requirement], None] 

1286 | _InstallerType 

1287 | None 

1288 | _InstallerTypeT[_DistributionT] = None, 

1289 ) -> Distribution | None: 

1290 """Obtain a distribution matching `requirement` (e.g. via download) 

1291 

1292 Obtain a distro that matches requirement (e.g. via download). In the 

1293 base ``Environment`` class, this routine just returns 

1294 ``installer(requirement)``, unless `installer` is None, in which case 

1295 None is returned instead. This method is a hook that allows subclasses 

1296 to attempt other ways of obtaining a distribution before falling back 

1297 to the `installer` argument.""" 

1298 return installer(requirement) if installer else None 

1299 

1300 def __iter__(self) -> Iterator[str]: 

1301 """Yield the unique project names of the available distributions""" 

1302 for key in self._distmap.keys(): 

1303 if self[key]: 

1304 yield key 

1305 

1306 def __iadd__(self, other: Distribution | Environment): 

1307 """In-place addition of a distribution or environment""" 

1308 if isinstance(other, Distribution): 

1309 self.add(other) 

1310 elif isinstance(other, Environment): 

1311 for project in other: 

1312 for dist in other[project]: 

1313 self.add(dist) 

1314 else: 

1315 raise TypeError("Can't add %r to environment" % (other,)) 

1316 return self 

1317 

1318 def __add__(self, other: Distribution | Environment): 

1319 """Add an environment or distribution to an environment""" 

1320 new = self.__class__([], platform=None, python=None) 

1321 for env in self, other: 

1322 new += env 

1323 return new 

1324 

1325 

1326# XXX backward compatibility 

1327AvailableDistributions = Environment 

1328 

1329 

1330class ExtractionError(RuntimeError): 

1331 """An error occurred extracting a resource 

1332 

1333 The following attributes are available from instances of this exception: 

1334 

1335 manager 

1336 The resource manager that raised this exception 

1337 

1338 cache_path 

1339 The base directory for resource extraction 

1340 

1341 original_error 

1342 The exception instance that caused extraction to fail 

1343 """ 

1344 

1345 manager: ResourceManager 

1346 cache_path: str 

1347 original_error: BaseException | None 

1348 

1349 

1350class ResourceManager: 

1351 """Manage resource extraction and packages""" 

1352 

1353 extraction_path: str | None = None 

1354 

1355 def __init__(self): 

1356 self.cached_files = {} 

1357 

1358 def resource_exists( 

1359 self, package_or_requirement: _PkgReqType, resource_name: str 

1360 ) -> bool: 

1361 """Does the named resource exist?""" 

1362 return get_provider(package_or_requirement).has_resource(resource_name) 

1363 

1364 def resource_isdir( 

1365 self, package_or_requirement: _PkgReqType, resource_name: str 

1366 ) -> bool: 

1367 """Is the named resource an existing directory?""" 

1368 return get_provider(package_or_requirement).resource_isdir(resource_name) 

1369 

1370 def resource_filename( 

1371 self, package_or_requirement: _PkgReqType, resource_name: str 

1372 ) -> str: 

1373 """Return a true filesystem path for specified resource""" 

1374 return get_provider(package_or_requirement).get_resource_filename( 

1375 self, resource_name 

1376 ) 

1377 

1378 def resource_stream( 

1379 self, package_or_requirement: _PkgReqType, resource_name: str 

1380 ) -> _ResourceStream: 

1381 """Return a readable file-like object for specified resource""" 

1382 return get_provider(package_or_requirement).get_resource_stream( 

1383 self, resource_name 

1384 ) 

1385 

1386 def resource_string( 

1387 self, package_or_requirement: _PkgReqType, resource_name: str 

1388 ) -> bytes: 

1389 """Return specified resource as :obj:`bytes`""" 

1390 return get_provider(package_or_requirement).get_resource_string( 

1391 self, resource_name 

1392 ) 

1393 

1394 def resource_listdir( 

1395 self, package_or_requirement: _PkgReqType, resource_name: str 

1396 ) -> list[str]: 

1397 """List the contents of the named resource directory""" 

1398 return get_provider(package_or_requirement).resource_listdir(resource_name) 

1399 

1400 def extraction_error(self) -> NoReturn: 

1401 """Give an error message for problems extracting file(s)""" 

1402 

1403 old_exc = sys.exc_info()[1] 

1404 cache_path = self.extraction_path or get_default_cache() 

1405 

1406 tmpl = textwrap.dedent( 

1407 """ 

1408 Can't extract file(s) to egg cache 

1409 

1410 The following error occurred while trying to extract file(s) 

1411 to the Python egg cache: 

1412 

1413 {old_exc} 

1414 

1415 The Python egg cache directory is currently set to: 

1416 

1417 {cache_path} 

1418 

1419 Perhaps your account does not have write access to this directory? 

1420 You can change the cache directory by setting the PYTHON_EGG_CACHE 

1421 environment variable to point to an accessible directory. 

1422 """ 

1423 ).lstrip() 

1424 err = ExtractionError(tmpl.format(**locals())) 

1425 err.manager = self 

1426 err.cache_path = cache_path 

1427 err.original_error = old_exc 

1428 raise err 

1429 

1430 def get_cache_path(self, archive_name: str, names: Iterable[StrPath] = ()) -> str: 

1431 """Return absolute location in cache for `archive_name` and `names` 

1432 

1433 The parent directory of the resulting path will be created if it does 

1434 not already exist. `archive_name` should be the base filename of the 

1435 enclosing egg (which may not be the name of the enclosing zipfile!), 

1436 including its ".egg" extension. `names`, if provided, should be a 

1437 sequence of path name parts "under" the egg's extraction location. 

1438 

1439 This method should only be called by resource providers that need to 

1440 obtain an extraction location, and only for names they intend to 

1441 extract, as it tracks the generated names for possible cleanup later. 

1442 """ 

1443 extract_path = self.extraction_path or get_default_cache() 

1444 target_path = os.path.join(extract_path, archive_name + '-tmp', *names) 

1445 try: 

1446 _bypass_ensure_directory(target_path) 

1447 except Exception: 

1448 self.extraction_error() 

1449 

1450 self._warn_unsafe_extraction_path(extract_path) 

1451 

1452 self.cached_files[target_path] = True 

1453 return target_path 

1454 

1455 @staticmethod 

1456 def _warn_unsafe_extraction_path(path): 

1457 """ 

1458 If the default extraction path is overridden and set to an insecure 

1459 location, such as /tmp, it opens up an opportunity for an attacker to 

1460 replace an extracted file with an unauthorized payload. Warn the user 

1461 if a known insecure location is used. 

1462 

1463 See Distribute #375 for more details. 

1464 """ 

1465 if os.name == 'nt' and not path.startswith(os.environ['windir']): 

1466 # On Windows, permissions are generally restrictive by default 

1467 # and temp directories are not writable by other users, so 

1468 # bypass the warning. 

1469 return 

1470 mode = os.stat(path).st_mode 

1471 if mode & stat.S_IWOTH or mode & stat.S_IWGRP: 

1472 msg = ( 

1473 "Extraction path is writable by group/others " 

1474 "and vulnerable to attack when " 

1475 "used with get_resource_filename ({path}). " 

1476 "Consider a more secure " 

1477 "location (set with .set_extraction_path or the " 

1478 "PYTHON_EGG_CACHE environment variable)." 

1479 ).format(**locals()) 

1480 warnings.warn(msg, UserWarning) 

1481 

1482 def postprocess(self, tempname: StrOrBytesPath, filename: StrOrBytesPath) -> None: 

1483 """Perform any platform-specific postprocessing of `tempname` 

1484 

1485 This is where Mac header rewrites should be done; other platforms don't 

1486 have anything special they should do. 

1487 

1488 Resource providers should call this method ONLY after successfully 

1489 extracting a compressed resource. They must NOT call it on resources 

1490 that are already in the filesystem. 

1491 

1492 `tempname` is the current (temporary) name of the file, and `filename` 

1493 is the name it will be renamed to by the caller after this routine 

1494 returns. 

1495 """ 

1496 

1497 if os.name == 'posix': 

1498 # Make the resource executable 

1499 mode = ((os.stat(tempname).st_mode) | 0o555) & 0o7777 

1500 os.chmod(tempname, mode) 

1501 

1502 def set_extraction_path(self, path: str) -> None: 

1503 """Set the base path where resources will be extracted to, if needed. 

1504 

1505 If you do not call this routine before any extractions take place, the 

1506 path defaults to the return value of ``get_default_cache()``. (Which 

1507 is based on the ``PYTHON_EGG_CACHE`` environment variable, with various 

1508 platform-specific fallbacks. See that routine's documentation for more 

1509 details.) 

1510 

1511 Resources are extracted to subdirectories of this path based upon 

1512 information given by the ``IResourceProvider``. You may set this to a 

1513 temporary directory, but then you must call ``cleanup_resources()`` to 

1514 delete the extracted files when done. There is no guarantee that 

1515 ``cleanup_resources()`` will be able to remove all extracted files. 

1516 

1517 (Note: you may not change the extraction path for a given resource 

1518 manager once resources have been extracted, unless you first call 

1519 ``cleanup_resources()``.) 

1520 """ 

1521 if self.cached_files: 

1522 raise ValueError("Can't change extraction path, files already extracted") 

1523 

1524 self.extraction_path = path 

1525 

1526 def cleanup_resources(self, force: bool = False) -> list[str]: 

1527 """ 

1528 Delete all extracted resource files and directories, returning a list 

1529 of the file and directory names that could not be successfully removed. 

1530 This function does not have any concurrency protection, so it should 

1531 generally only be called when the extraction path is a temporary 

1532 directory exclusive to a single process. This method is not 

1533 automatically called; you must call it explicitly or register it as an 

1534 ``atexit`` function if you wish to ensure cleanup of a temporary 

1535 directory used for extractions. 

1536 """ 

1537 # XXX 

1538 return [] 

1539 

1540 

1541def get_default_cache() -> str: 

1542 """ 

1543 Return the ``PYTHON_EGG_CACHE`` environment variable 

1544 or a platform-relevant user cache dir for an app 

1545 named "Python-Eggs". 

1546 """ 

1547 return os.environ.get('PYTHON_EGG_CACHE') or _user_cache_dir(appname='Python-Eggs') 

1548 

1549 

1550def safe_name(name: str) -> str: 

1551 """Convert an arbitrary string to a standard distribution name 

1552 

1553 Any runs of non-alphanumeric/. characters are replaced with a single '-'. 

1554 """ 

1555 return re.sub('[^A-Za-z0-9.]+', '-', name) 

1556 

1557 

1558def safe_version(version: str) -> str: 

1559 """ 

1560 Convert an arbitrary string to a standard version string 

1561 """ 

1562 try: 

1563 # normalize the version 

1564 return str(_packaging_version.Version(version)) 

1565 except _packaging_version.InvalidVersion: 

1566 version = version.replace(' ', '.') 

1567 return re.sub('[^A-Za-z0-9.]+', '-', version) 

1568 

1569 

1570def _forgiving_version(version): 

1571 """Fallback when ``safe_version`` is not safe enough 

1572 >>> parse_version(_forgiving_version('0.23ubuntu1')) 

1573 <Version('0.23.dev0+sanitized.ubuntu1')> 

1574 >>> parse_version(_forgiving_version('0.23-')) 

1575 <Version('0.23.dev0+sanitized')> 

1576 >>> parse_version(_forgiving_version('0.-_')) 

1577 <Version('0.dev0+sanitized')> 

1578 >>> parse_version(_forgiving_version('42.+?1')) 

1579 <Version('42.dev0+sanitized.1')> 

1580 >>> parse_version(_forgiving_version('hello world')) 

1581 <Version('0.dev0+sanitized.hello.world')> 

1582 """ 

1583 version = version.replace(' ', '.') 

1584 match = _PEP440_FALLBACK.search(version) 

1585 if match: 

1586 safe = match["safe"] 

1587 rest = version[len(safe) :] 

1588 else: 

1589 safe = "0" 

1590 rest = version 

1591 local = f"sanitized.{_safe_segment(rest)}".strip(".") 

1592 return f"{safe}.dev0+{local}" 

1593 

1594 

1595def _safe_segment(segment): 

1596 """Convert an arbitrary string into a safe segment""" 

1597 segment = re.sub('[^A-Za-z0-9.]+', '-', segment) 

1598 segment = re.sub('-[^A-Za-z0-9]+', '-', segment) 

1599 return re.sub(r'\.[^A-Za-z0-9]+', '.', segment).strip(".-") 

1600 

1601 

1602def safe_extra(extra: str) -> str: 

1603 """Convert an arbitrary string to a standard 'extra' name 

1604 

1605 Any runs of non-alphanumeric characters are replaced with a single '_', 

1606 and the result is always lowercased. 

1607 """ 

1608 return re.sub('[^A-Za-z0-9.-]+', '_', extra).lower() 

1609 

1610 

1611def to_filename(name: str) -> str: 

1612 """Convert a project or version name to its filename-escaped form 

1613 

1614 Any '-' characters are currently replaced with '_'. 

1615 """ 

1616 return name.replace('-', '_') 

1617 

1618 

1619def invalid_marker(text: str) -> SyntaxError | Literal[False]: 

1620 """ 

1621 Validate text as a PEP 508 environment marker; return an exception 

1622 if invalid or False otherwise. 

1623 """ 

1624 try: 

1625 evaluate_marker(text) 

1626 except SyntaxError as e: 

1627 e.filename = None 

1628 e.lineno = None 

1629 return e 

1630 return False 

1631 

1632 

1633def evaluate_marker(text: str, extra: str | None = None) -> bool: 

1634 """ 

1635 Evaluate a PEP 508 environment marker. 

1636 Return a boolean indicating the marker result in this environment. 

1637 Raise SyntaxError if marker is invalid. 

1638 

1639 This implementation uses the 'pyparsing' module. 

1640 """ 

1641 try: 

1642 marker = _packaging_markers.Marker(text) 

1643 return marker.evaluate() 

1644 except _packaging_markers.InvalidMarker as e: 

1645 raise SyntaxError(e) from e 

1646 

1647 

1648class NullProvider: 

1649 """Try to implement resources and metadata for arbitrary PEP 302 loaders""" 

1650 

1651 egg_name: str | None = None 

1652 egg_info: str | None = None 

1653 loader: _LoaderProtocol | None = None 

1654 

1655 def __init__(self, module: _ModuleLike): 

1656 self.loader = getattr(module, '__loader__', None) 

1657 self.module_path = os.path.dirname(getattr(module, '__file__', '')) 

1658 

1659 def get_resource_filename( 

1660 self, manager: ResourceManager, resource_name: str 

1661 ) -> str: 

1662 return self._fn(self.module_path, resource_name) 

1663 

1664 def get_resource_stream( 

1665 self, manager: ResourceManager, resource_name: str 

1666 ) -> BinaryIO: 

1667 return io.BytesIO(self.get_resource_string(manager, resource_name)) 

1668 

1669 def get_resource_string( 

1670 self, manager: ResourceManager, resource_name: str 

1671 ) -> bytes: 

1672 return self._get(self._fn(self.module_path, resource_name)) 

1673 

1674 def has_resource(self, resource_name: str) -> bool: 

1675 return self._has(self._fn(self.module_path, resource_name)) 

1676 

1677 def _get_metadata_path(self, name): 

1678 return self._fn(self.egg_info, name) 

1679 

1680 def has_metadata(self, name: str) -> bool: 

1681 if not self.egg_info: 

1682 return False 

1683 

1684 path = self._get_metadata_path(name) 

1685 return self._has(path) 

1686 

1687 def get_metadata(self, name: str) -> str: 

1688 if not self.egg_info: 

1689 return "" 

1690 path = self._get_metadata_path(name) 

1691 value = self._get(path) 

1692 try: 

1693 return value.decode('utf-8') 

1694 except UnicodeDecodeError as exc: 

1695 # Include the path in the error message to simplify 

1696 # troubleshooting, and without changing the exception type. 

1697 exc.reason += ' in {} file at path: {}'.format(name, path) 

1698 raise 

1699 

1700 def get_metadata_lines(self, name: str) -> Iterator[str]: 

1701 return yield_lines(self.get_metadata(name)) 

1702 

1703 def resource_isdir(self, resource_name: str) -> bool: 

1704 return self._isdir(self._fn(self.module_path, resource_name)) 

1705 

1706 def metadata_isdir(self, name: str) -> bool: 

1707 return bool(self.egg_info and self._isdir(self._fn(self.egg_info, name))) 

1708 

1709 def resource_listdir(self, resource_name: str) -> list[str]: 

1710 return self._listdir(self._fn(self.module_path, resource_name)) 

1711 

1712 def metadata_listdir(self, name: str) -> list[str]: 

1713 if self.egg_info: 

1714 return self._listdir(self._fn(self.egg_info, name)) 

1715 return [] 

1716 

1717 def run_script(self, script_name: str, namespace: dict[str, Any]) -> None: 

1718 script = 'scripts/' + script_name 

1719 if not self.has_metadata(script): 

1720 raise ResolutionError( 

1721 "Script {script!r} not found in metadata at {self.egg_info!r}".format( 

1722 **locals() 

1723 ), 

1724 ) 

1725 

1726 script_text = self.get_metadata(script).replace('\r\n', '\n') 

1727 script_text = script_text.replace('\r', '\n') 

1728 script_filename = self._fn(self.egg_info, script) 

1729 namespace['__file__'] = script_filename 

1730 if os.path.exists(script_filename): 

1731 source = _read_utf8_with_fallback(script_filename) 

1732 code = compile(source, script_filename, 'exec') 

1733 exec(code, namespace, namespace) 

1734 else: 

1735 from linecache import cache 

1736 

1737 cache[script_filename] = ( 

1738 len(script_text), 

1739 0, 

1740 script_text.split('\n'), 

1741 script_filename, 

1742 ) 

1743 script_code = compile(script_text, script_filename, 'exec') 

1744 exec(script_code, namespace, namespace) 

1745 

1746 def _has(self, path) -> bool: 

1747 raise NotImplementedError( 

1748 "Can't perform this operation for unregistered loader type" 

1749 ) 

1750 

1751 def _isdir(self, path) -> bool: 

1752 raise NotImplementedError( 

1753 "Can't perform this operation for unregistered loader type" 

1754 ) 

1755 

1756 def _listdir(self, path) -> list[str]: 

1757 raise NotImplementedError( 

1758 "Can't perform this operation for unregistered loader type" 

1759 ) 

1760 

1761 def _fn(self, base: str | None, resource_name: str): 

1762 if base is None: 

1763 raise TypeError( 

1764 "`base` parameter in `_fn` is `None`. Either override this method or check the parameter first." 

1765 ) 

1766 self._validate_resource_path(resource_name) 

1767 if resource_name: 

1768 return os.path.join(base, *resource_name.split('/')) 

1769 return base 

1770 

1771 @staticmethod 

1772 def _validate_resource_path(path): 

1773 """ 

1774 Validate the resource paths according to the docs. 

1775 https://setuptools.pypa.io/en/latest/pkg_resources.html#basic-resource-access 

1776 

1777 >>> warned = getfixture('recwarn') 

1778 >>> warnings.simplefilter('always') 

1779 >>> vrp = NullProvider._validate_resource_path 

1780 >>> vrp('foo/bar.txt') 

1781 >>> bool(warned) 

1782 False 

1783 >>> vrp('../foo/bar.txt') 

1784 >>> bool(warned) 

1785 True 

1786 >>> warned.clear() 

1787 >>> vrp('/foo/bar.txt') 

1788 >>> bool(warned) 

1789 True 

1790 >>> vrp('foo/../../bar.txt') 

1791 >>> bool(warned) 

1792 True 

1793 >>> warned.clear() 

1794 >>> vrp('foo/f../bar.txt') 

1795 >>> bool(warned) 

1796 False 

1797 

1798 Windows path separators are straight-up disallowed. 

1799 >>> vrp(r'\\foo/bar.txt') 

1800 Traceback (most recent call last): 

1801 ... 

1802 ValueError: Use of .. or absolute path in a resource path \ 

1803is not allowed. 

1804 

1805 >>> vrp(r'C:\\foo/bar.txt') 

1806 Traceback (most recent call last): 

1807 ... 

1808 ValueError: Use of .. or absolute path in a resource path \ 

1809is not allowed. 

1810 

1811 Blank values are allowed 

1812 

1813 >>> vrp('') 

1814 >>> bool(warned) 

1815 False 

1816 

1817 Non-string values are not. 

1818 

1819 >>> vrp(None) 

1820 Traceback (most recent call last): 

1821 ... 

1822 AttributeError: ... 

1823 """ 

1824 invalid = ( 

1825 os.path.pardir in path.split(posixpath.sep) 

1826 or posixpath.isabs(path) 

1827 or ntpath.isabs(path) 

1828 or path.startswith("\\") 

1829 ) 

1830 if not invalid: 

1831 return 

1832 

1833 msg = "Use of .. or absolute path in a resource path is not allowed." 

1834 

1835 # Aggressively disallow Windows absolute paths 

1836 if (path.startswith("\\") or ntpath.isabs(path)) and not posixpath.isabs(path): 

1837 raise ValueError(msg) 

1838 

1839 # for compatibility, warn; in future 

1840 # raise ValueError(msg) 

1841 issue_warning( 

1842 msg[:-1] + " and will raise exceptions in a future release.", 

1843 DeprecationWarning, 

1844 ) 

1845 

1846 def _get(self, path) -> bytes: 

1847 if hasattr(self.loader, 'get_data') and self.loader: 

1848 # Already checked get_data exists 

1849 return self.loader.get_data(path) # type: ignore[attr-defined] 

1850 raise NotImplementedError( 

1851 "Can't perform this operation for loaders without 'get_data()'" 

1852 ) 

1853 

1854 

1855register_loader_type(object, NullProvider) 

1856 

1857 

1858def _parents(path): 

1859 """ 

1860 yield all parents of path including path 

1861 """ 

1862 last = None 

1863 while path != last: 

1864 yield path 

1865 last = path 

1866 path, _ = os.path.split(path) 

1867 

1868 

1869class EggProvider(NullProvider): 

1870 """Provider based on a virtual filesystem""" 

1871 

1872 def __init__(self, module: _ModuleLike): 

1873 super().__init__(module) 

1874 self._setup_prefix() 

1875 

1876 def _setup_prefix(self): 

1877 # Assume that metadata may be nested inside a "basket" 

1878 # of multiple eggs and use module_path instead of .archive. 

1879 eggs = filter(_is_egg_path, _parents(self.module_path)) 

1880 egg = next(eggs, None) 

1881 egg and self._set_egg(egg) 

1882 

1883 def _set_egg(self, path: str): 

1884 self.egg_name = os.path.basename(path) 

1885 self.egg_info = os.path.join(path, 'EGG-INFO') 

1886 self.egg_root = path 

1887 

1888 

1889class DefaultProvider(EggProvider): 

1890 """Provides access to package resources in the filesystem""" 

1891 

1892 def _has(self, path) -> bool: 

1893 return os.path.exists(path) 

1894 

1895 def _isdir(self, path) -> bool: 

1896 return os.path.isdir(path) 

1897 

1898 def _listdir(self, path): 

1899 return os.listdir(path) 

1900 

1901 def get_resource_stream( 

1902 self, manager: object, resource_name: str 

1903 ) -> io.BufferedReader: 

1904 return open(self._fn(self.module_path, resource_name), 'rb') 

1905 

1906 def _get(self, path) -> bytes: 

1907 with open(path, 'rb') as stream: 

1908 return stream.read() 

1909 

1910 @classmethod 

1911 def _register(cls): 

1912 loader_names = ( 

1913 'SourceFileLoader', 

1914 'SourcelessFileLoader', 

1915 ) 

1916 for name in loader_names: 

1917 loader_cls = getattr(importlib.machinery, name, type(None)) 

1918 register_loader_type(loader_cls, cls) 

1919 

1920 

1921DefaultProvider._register() 

1922 

1923 

1924class EmptyProvider(NullProvider): 

1925 """Provider that returns nothing for all requests""" 

1926 

1927 # A special case, we don't want all Providers inheriting from NullProvider to have a potentially None module_path 

1928 module_path: str | None = None # type: ignore[assignment] 

1929 

1930 _isdir = _has = lambda self, path: False 

1931 

1932 def _get(self, path) -> bytes: 

1933 return b'' 

1934 

1935 def _listdir(self, path): 

1936 return [] 

1937 

1938 def __init__(self): 

1939 pass 

1940 

1941 

1942empty_provider = EmptyProvider() 

1943 

1944 

1945class ZipManifests(Dict[str, "MemoizedZipManifests.manifest_mod"]): 

1946 """ 

1947 zip manifest builder 

1948 """ 

1949 

1950 # `path` could be `StrPath | IO[bytes]` but that violates the LSP for `MemoizedZipManifests.load` 

1951 @classmethod 

1952 def build(cls, path: str) -> dict[str, zipfile.ZipInfo]: 

1953 """ 

1954 Build a dictionary similar to the zipimport directory 

1955 caches, except instead of tuples, store ZipInfo objects. 

1956 

1957 Use a platform-specific path separator (os.sep) for the path keys 

1958 for compatibility with pypy on Windows. 

1959 """ 

1960 with zipfile.ZipFile(path) as zfile: 

1961 items = ( 

1962 ( 

1963 name.replace('/', os.sep), 

1964 zfile.getinfo(name), 

1965 ) 

1966 for name in zfile.namelist() 

1967 ) 

1968 return dict(items) 

1969 

1970 load = build 

1971 

1972 

1973class MemoizedZipManifests(ZipManifests): 

1974 """ 

1975 Memoized zipfile manifests. 

1976 """ 

1977 

1978 class manifest_mod(NamedTuple): 

1979 manifest: dict[str, zipfile.ZipInfo] 

1980 mtime: float 

1981 

1982 def load(self, path: str) -> dict[str, zipfile.ZipInfo]: # type: ignore[override] # ZipManifests.load is a classmethod 

1983 """ 

1984 Load a manifest at path or return a suitable manifest already loaded. 

1985 """ 

1986 path = os.path.normpath(path) 

1987 mtime = os.stat(path).st_mtime 

1988 

1989 if path not in self or self[path].mtime != mtime: 

1990 manifest = self.build(path) 

1991 self[path] = self.manifest_mod(manifest, mtime) 

1992 

1993 return self[path].manifest 

1994 

1995 

1996class ZipProvider(EggProvider): 

1997 """Resource support for zips and eggs""" 

1998 

1999 eagers: list[str] | None = None 

2000 _zip_manifests = MemoizedZipManifests() 

2001 # ZipProvider's loader should always be a zipimporter or equivalent 

2002 loader: zipimport.zipimporter 

2003 

2004 def __init__(self, module: _ZipLoaderModule): 

2005 super().__init__(module) 

2006 self.zip_pre = self.loader.archive + os.sep 

2007 

2008 def _zipinfo_name(self, fspath): 

2009 # Convert a virtual filename (full path to file) into a zipfile subpath 

2010 # usable with the zipimport directory cache for our target archive 

2011 fspath = fspath.rstrip(os.sep) 

2012 if fspath == self.loader.archive: 

2013 return '' 

2014 if fspath.startswith(self.zip_pre): 

2015 return fspath[len(self.zip_pre) :] 

2016 raise AssertionError("%s is not a subpath of %s" % (fspath, self.zip_pre)) 

2017 

2018 def _parts(self, zip_path): 

2019 # Convert a zipfile subpath into an egg-relative path part list. 

2020 # pseudo-fs path 

2021 fspath = self.zip_pre + zip_path 

2022 if fspath.startswith(self.egg_root + os.sep): 

2023 return fspath[len(self.egg_root) + 1 :].split(os.sep) 

2024 raise AssertionError("%s is not a subpath of %s" % (fspath, self.egg_root)) 

2025 

2026 @property 

2027 def zipinfo(self): 

2028 return self._zip_manifests.load(self.loader.archive) 

2029 

2030 def get_resource_filename( 

2031 self, manager: ResourceManager, resource_name: str 

2032 ) -> str: 

2033 if not self.egg_name: 

2034 raise NotImplementedError( 

2035 "resource_filename() only supported for .egg, not .zip" 

2036 ) 

2037 # no need to lock for extraction, since we use temp names 

2038 zip_path = self._resource_to_zip(resource_name) 

2039 eagers = self._get_eager_resources() 

2040 if '/'.join(self._parts(zip_path)) in eagers: 

2041 for name in eagers: 

2042 self._extract_resource(manager, self._eager_to_zip(name)) 

2043 return self._extract_resource(manager, zip_path) 

2044 

2045 @staticmethod 

2046 def _get_date_and_size(zip_stat): 

2047 size = zip_stat.file_size 

2048 # ymdhms+wday, yday, dst 

2049 date_time = zip_stat.date_time + (0, 0, -1) 

2050 # 1980 offset already done 

2051 timestamp = time.mktime(date_time) 

2052 return timestamp, size 

2053 

2054 # FIXME: 'ZipProvider._extract_resource' is too complex (12) 

2055 def _extract_resource(self, manager: ResourceManager, zip_path) -> str: # noqa: C901 

2056 if zip_path in self._index(): 

2057 for name in self._index()[zip_path]: 

2058 last = self._extract_resource(manager, os.path.join(zip_path, name)) 

2059 # return the extracted directory name 

2060 return os.path.dirname(last) 

2061 

2062 timestamp, size = self._get_date_and_size(self.zipinfo[zip_path]) 

2063 

2064 if not WRITE_SUPPORT: 

2065 raise OSError( 

2066 '"os.rename" and "os.unlink" are not supported on this platform' 

2067 ) 

2068 try: 

2069 if not self.egg_name: 

2070 raise OSError( 

2071 '"egg_name" is empty. This likely means no egg could be found from the "module_path".' 

2072 ) 

2073 real_path = manager.get_cache_path(self.egg_name, self._parts(zip_path)) 

2074 

2075 if self._is_current(real_path, zip_path): 

2076 return real_path 

2077 

2078 outf, tmpnam = _mkstemp( 

2079 ".$extract", 

2080 dir=os.path.dirname(real_path), 

2081 ) 

2082 os.write(outf, self.loader.get_data(zip_path)) 

2083 os.close(outf) 

2084 utime(tmpnam, (timestamp, timestamp)) 

2085 manager.postprocess(tmpnam, real_path) 

2086 

2087 try: 

2088 rename(tmpnam, real_path) 

2089 

2090 except OSError: 

2091 if os.path.isfile(real_path): 

2092 if self._is_current(real_path, zip_path): 

2093 # the file became current since it was checked above, 

2094 # so proceed. 

2095 return real_path 

2096 # Windows, del old file and retry 

2097 elif os.name == 'nt': 

2098 unlink(real_path) 

2099 rename(tmpnam, real_path) 

2100 return real_path 

2101 raise 

2102 

2103 except OSError: 

2104 # report a user-friendly error 

2105 manager.extraction_error() 

2106 

2107 return real_path 

2108 

2109 def _is_current(self, file_path, zip_path): 

2110 """ 

2111 Return True if the file_path is current for this zip_path 

2112 """ 

2113 timestamp, size = self._get_date_and_size(self.zipinfo[zip_path]) 

2114 if not os.path.isfile(file_path): 

2115 return False 

2116 stat = os.stat(file_path) 

2117 if stat.st_size != size or stat.st_mtime != timestamp: 

2118 return False 

2119 # check that the contents match 

2120 zip_contents = self.loader.get_data(zip_path) 

2121 with open(file_path, 'rb') as f: 

2122 file_contents = f.read() 

2123 return zip_contents == file_contents 

2124 

2125 def _get_eager_resources(self): 

2126 if self.eagers is None: 

2127 eagers = [] 

2128 for name in ('native_libs.txt', 'eager_resources.txt'): 

2129 if self.has_metadata(name): 

2130 eagers.extend(self.get_metadata_lines(name)) 

2131 self.eagers = eagers 

2132 return self.eagers 

2133 

2134 def _index(self): 

2135 try: 

2136 return self._dirindex 

2137 except AttributeError: 

2138 ind = {} 

2139 for path in self.zipinfo: 

2140 parts = path.split(os.sep) 

2141 while parts: 

2142 parent = os.sep.join(parts[:-1]) 

2143 if parent in ind: 

2144 ind[parent].append(parts[-1]) 

2145 break 

2146 else: 

2147 ind[parent] = [parts.pop()] 

2148 self._dirindex = ind 

2149 return ind 

2150 

2151 def _has(self, fspath) -> bool: 

2152 zip_path = self._zipinfo_name(fspath) 

2153 return zip_path in self.zipinfo or zip_path in self._index() 

2154 

2155 def _isdir(self, fspath) -> bool: 

2156 return self._zipinfo_name(fspath) in self._index() 

2157 

2158 def _listdir(self, fspath): 

2159 return list(self._index().get(self._zipinfo_name(fspath), ())) 

2160 

2161 def _eager_to_zip(self, resource_name: str): 

2162 return self._zipinfo_name(self._fn(self.egg_root, resource_name)) 

2163 

2164 def _resource_to_zip(self, resource_name: str): 

2165 return self._zipinfo_name(self._fn(self.module_path, resource_name)) 

2166 

2167 

2168register_loader_type(zipimport.zipimporter, ZipProvider) 

2169 

2170 

2171class FileMetadata(EmptyProvider): 

2172 """Metadata handler for standalone PKG-INFO files 

2173 

2174 Usage:: 

2175 

2176 metadata = FileMetadata("/path/to/PKG-INFO") 

2177 

2178 This provider rejects all data and metadata requests except for PKG-INFO, 

2179 which is treated as existing, and will be the contents of the file at 

2180 the provided location. 

2181 """ 

2182 

2183 def __init__(self, path: StrPath): 

2184 self.path = path 

2185 

2186 def _get_metadata_path(self, name): 

2187 return self.path 

2188 

2189 def has_metadata(self, name: str) -> bool: 

2190 return name == 'PKG-INFO' and os.path.isfile(self.path) 

2191 

2192 def get_metadata(self, name: str) -> str: 

2193 if name != 'PKG-INFO': 

2194 raise KeyError("No metadata except PKG-INFO is available") 

2195 

2196 with open(self.path, encoding='utf-8', errors="replace") as f: 

2197 metadata = f.read() 

2198 self._warn_on_replacement(metadata) 

2199 return metadata 

2200 

2201 def _warn_on_replacement(self, metadata): 

2202 replacement_char = '�' 

2203 if replacement_char in metadata: 

2204 tmpl = "{self.path} could not be properly decoded in UTF-8" 

2205 msg = tmpl.format(**locals()) 

2206 warnings.warn(msg) 

2207 

2208 def get_metadata_lines(self, name: str) -> Iterator[str]: 

2209 return yield_lines(self.get_metadata(name)) 

2210 

2211 

2212class PathMetadata(DefaultProvider): 

2213 """Metadata provider for egg directories 

2214 

2215 Usage:: 

2216 

2217 # Development eggs: 

2218 

2219 egg_info = "/path/to/PackageName.egg-info" 

2220 base_dir = os.path.dirname(egg_info) 

2221 metadata = PathMetadata(base_dir, egg_info) 

2222 dist_name = os.path.splitext(os.path.basename(egg_info))[0] 

2223 dist = Distribution(basedir, project_name=dist_name, metadata=metadata) 

2224 

2225 # Unpacked egg directories: 

2226 

2227 egg_path = "/path/to/PackageName-ver-pyver-etc.egg" 

2228 metadata = PathMetadata(egg_path, os.path.join(egg_path,'EGG-INFO')) 

2229 dist = Distribution.from_filename(egg_path, metadata=metadata) 

2230 """ 

2231 

2232 def __init__(self, path: str, egg_info: str): 

2233 self.module_path = path 

2234 self.egg_info = egg_info 

2235 

2236 

2237class EggMetadata(ZipProvider): 

2238 """Metadata provider for .egg files""" 

2239 

2240 def __init__(self, importer: zipimport.zipimporter): 

2241 """Create a metadata provider from a zipimporter""" 

2242 

2243 self.zip_pre = importer.archive + os.sep 

2244 self.loader = importer 

2245 if importer.prefix: 

2246 self.module_path = os.path.join(importer.archive, importer.prefix) 

2247 else: 

2248 self.module_path = importer.archive 

2249 self._setup_prefix() 

2250 

2251 

2252_distribution_finders: dict[type, _DistFinderType[Any]] = _declare_state( 

2253 'dict', '_distribution_finders', {} 

2254) 

2255 

2256 

2257def register_finder( 

2258 importer_type: type[_T], distribution_finder: _DistFinderType[_T] 

2259) -> None: 

2260 """Register `distribution_finder` to find distributions in sys.path items 

2261 

2262 `importer_type` is the type or class of a PEP 302 "Importer" (sys.path item 

2263 handler), and `distribution_finder` is a callable that, passed a path 

2264 item and the importer instance, yields ``Distribution`` instances found on 

2265 that path item. See ``pkg_resources.find_on_path`` for an example.""" 

2266 _distribution_finders[importer_type] = distribution_finder 

2267 

2268 

2269def find_distributions(path_item: str, only: bool = False) -> Iterable[Distribution]: 

2270 """Yield distributions accessible via `path_item`""" 

2271 importer = get_importer(path_item) 

2272 finder = _find_adapter(_distribution_finders, importer) 

2273 return finder(importer, path_item, only) 

2274 

2275 

2276def find_eggs_in_zip( 

2277 importer: zipimport.zipimporter, path_item: str, only: bool = False 

2278) -> Iterator[Distribution]: 

2279 """ 

2280 Find eggs in zip files; possibly multiple nested eggs. 

2281 """ 

2282 if importer.archive.endswith('.whl'): 

2283 # wheels are not supported with this finder 

2284 # they don't have PKG-INFO metadata, and won't ever contain eggs 

2285 return 

2286 metadata = EggMetadata(importer) 

2287 if metadata.has_metadata('PKG-INFO'): 

2288 yield Distribution.from_filename(path_item, metadata=metadata) 

2289 if only: 

2290 # don't yield nested distros 

2291 return 

2292 for subitem in metadata.resource_listdir(''): 

2293 if _is_egg_path(subitem): 

2294 subpath = os.path.join(path_item, subitem) 

2295 dists = find_eggs_in_zip(zipimport.zipimporter(subpath), subpath) 

2296 yield from dists 

2297 elif subitem.lower().endswith(('.dist-info', '.egg-info')): 

2298 subpath = os.path.join(path_item, subitem) 

2299 submeta = EggMetadata(zipimport.zipimporter(subpath)) 

2300 submeta.egg_info = subpath 

2301 yield Distribution.from_location(path_item, subitem, submeta) 

2302 

2303 

2304register_finder(zipimport.zipimporter, find_eggs_in_zip) 

2305 

2306 

2307def find_nothing( 

2308 importer: object | None, path_item: str | None, only: bool | None = False 

2309): 

2310 return () 

2311 

2312 

2313register_finder(object, find_nothing) 

2314 

2315 

2316def find_on_path(importer: object | None, path_item, only=False): 

2317 """Yield distributions accessible on a sys.path directory""" 

2318 path_item = _normalize_cached(path_item) 

2319 

2320 if _is_unpacked_egg(path_item): 

2321 yield Distribution.from_filename( 

2322 path_item, 

2323 metadata=PathMetadata(path_item, os.path.join(path_item, 'EGG-INFO')), 

2324 ) 

2325 return 

2326 

2327 entries = (os.path.join(path_item, child) for child in safe_listdir(path_item)) 

2328 

2329 # scan for .egg and .egg-info in directory 

2330 for entry in sorted(entries): 

2331 fullpath = os.path.join(path_item, entry) 

2332 factory = dist_factory(path_item, entry, only) 

2333 yield from factory(fullpath) 

2334 

2335 

2336def dist_factory(path_item, entry, only): 

2337 """Return a dist_factory for the given entry.""" 

2338 lower = entry.lower() 

2339 is_egg_info = lower.endswith('.egg-info') 

2340 is_dist_info = lower.endswith('.dist-info') and os.path.isdir( 

2341 os.path.join(path_item, entry) 

2342 ) 

2343 is_meta = is_egg_info or is_dist_info 

2344 return ( 

2345 distributions_from_metadata 

2346 if is_meta 

2347 else find_distributions 

2348 if not only and _is_egg_path(entry) 

2349 else resolve_egg_link 

2350 if not only and lower.endswith('.egg-link') 

2351 else NoDists() 

2352 ) 

2353 

2354 

2355class NoDists: 

2356 """ 

2357 >>> bool(NoDists()) 

2358 False 

2359 

2360 >>> list(NoDists()('anything')) 

2361 [] 

2362 """ 

2363 

2364 def __bool__(self): 

2365 return False 

2366 

2367 def __call__(self, fullpath): 

2368 return iter(()) 

2369 

2370 

2371def safe_listdir(path: StrOrBytesPath): 

2372 """ 

2373 Attempt to list contents of path, but suppress some exceptions. 

2374 """ 

2375 try: 

2376 return os.listdir(path) 

2377 except (PermissionError, NotADirectoryError): 

2378 pass 

2379 except OSError as e: 

2380 # Ignore the directory if does not exist, not a directory or 

2381 # permission denied 

2382 if e.errno not in (errno.ENOTDIR, errno.EACCES, errno.ENOENT): 

2383 raise 

2384 return () 

2385 

2386 

2387def distributions_from_metadata(path: str): 

2388 root = os.path.dirname(path) 

2389 if os.path.isdir(path): 

2390 if len(os.listdir(path)) == 0: 

2391 # empty metadata dir; skip 

2392 return 

2393 metadata: _MetadataType = PathMetadata(root, path) 

2394 else: 

2395 metadata = FileMetadata(path) 

2396 entry = os.path.basename(path) 

2397 yield Distribution.from_location( 

2398 root, 

2399 entry, 

2400 metadata, 

2401 precedence=DEVELOP_DIST, 

2402 ) 

2403 

2404 

2405def non_empty_lines(path): 

2406 """ 

2407 Yield non-empty lines from file at path 

2408 """ 

2409 for line in _read_utf8_with_fallback(path).splitlines(): 

2410 line = line.strip() 

2411 if line: 

2412 yield line 

2413 

2414 

2415def resolve_egg_link(path): 

2416 """ 

2417 Given a path to an .egg-link, resolve distributions 

2418 present in the referenced path. 

2419 """ 

2420 referenced_paths = non_empty_lines(path) 

2421 resolved_paths = ( 

2422 os.path.join(os.path.dirname(path), ref) for ref in referenced_paths 

2423 ) 

2424 dist_groups = map(find_distributions, resolved_paths) 

2425 return next(dist_groups, ()) 

2426 

2427 

2428if hasattr(pkgutil, 'ImpImporter'): 

2429 register_finder(pkgutil.ImpImporter, find_on_path) 

2430 

2431register_finder(importlib.machinery.FileFinder, find_on_path) 

2432 

2433_namespace_handlers: dict[type, _NSHandlerType[Any]] = _declare_state( 

2434 'dict', '_namespace_handlers', {} 

2435) 

2436_namespace_packages: dict[str | None, list[str]] = _declare_state( 

2437 'dict', '_namespace_packages', {} 

2438) 

2439 

2440 

2441def register_namespace_handler( 

2442 importer_type: type[_T], namespace_handler: _NSHandlerType[_T] 

2443) -> None: 

2444 """Register `namespace_handler` to declare namespace packages 

2445 

2446 `importer_type` is the type or class of a PEP 302 "Importer" (sys.path item 

2447 handler), and `namespace_handler` is a callable like this:: 

2448 

2449 def namespace_handler(importer, path_entry, moduleName, module): 

2450 # return a path_entry to use for child packages 

2451 

2452 Namespace handlers are only called if the importer object has already 

2453 agreed that it can handle the relevant path item, and they should only 

2454 return a subpath if the module __path__ does not already contain an 

2455 equivalent subpath. For an example namespace handler, see 

2456 ``pkg_resources.file_ns_handler``. 

2457 """ 

2458 _namespace_handlers[importer_type] = namespace_handler 

2459 

2460 

2461def _handle_ns(packageName, path_item): 

2462 """Ensure that named package includes a subpath of path_item (if needed)""" 

2463 

2464 importer = get_importer(path_item) 

2465 if importer is None: 

2466 return None 

2467 

2468 # use find_spec (PEP 451) and fall-back to find_module (PEP 302) 

2469 try: 

2470 spec = importer.find_spec(packageName) 

2471 except AttributeError: 

2472 # capture warnings due to #1111 

2473 with warnings.catch_warnings(): 

2474 warnings.simplefilter("ignore") 

2475 loader = importer.find_module(packageName) 

2476 else: 

2477 loader = spec.loader if spec else None 

2478 

2479 if loader is None: 

2480 return None 

2481 module = sys.modules.get(packageName) 

2482 if module is None: 

2483 module = sys.modules[packageName] = types.ModuleType(packageName) 

2484 module.__path__ = [] 

2485 _set_parent_ns(packageName) 

2486 elif not hasattr(module, '__path__'): 

2487 raise TypeError("Not a package:", packageName) 

2488 handler = _find_adapter(_namespace_handlers, importer) 

2489 subpath = handler(importer, path_item, packageName, module) 

2490 if subpath is not None: 

2491 path = module.__path__ 

2492 path.append(subpath) 

2493 importlib.import_module(packageName) 

2494 _rebuild_mod_path(path, packageName, module) 

2495 return subpath 

2496 

2497 

2498def _rebuild_mod_path(orig_path, package_name, module: types.ModuleType): 

2499 """ 

2500 Rebuild module.__path__ ensuring that all entries are ordered 

2501 corresponding to their sys.path order 

2502 """ 

2503 sys_path = [_normalize_cached(p) for p in sys.path] 

2504 

2505 def safe_sys_path_index(entry): 

2506 """ 

2507 Workaround for #520 and #513. 

2508 """ 

2509 try: 

2510 return sys_path.index(entry) 

2511 except ValueError: 

2512 return float('inf') 

2513 

2514 def position_in_sys_path(path): 

2515 """ 

2516 Return the ordinal of the path based on its position in sys.path 

2517 """ 

2518 path_parts = path.split(os.sep) 

2519 module_parts = package_name.count('.') + 1 

2520 parts = path_parts[:-module_parts] 

2521 return safe_sys_path_index(_normalize_cached(os.sep.join(parts))) 

2522 

2523 new_path = sorted(orig_path, key=position_in_sys_path) 

2524 new_path = [_normalize_cached(p) for p in new_path] 

2525 

2526 if isinstance(module.__path__, list): 

2527 module.__path__[:] = new_path 

2528 else: 

2529 module.__path__ = new_path 

2530 

2531 

2532def declare_namespace(packageName: str) -> None: 

2533 """Declare that package 'packageName' is a namespace package""" 

2534 

2535 msg = ( 

2536 f"Deprecated call to `pkg_resources.declare_namespace({packageName!r})`.\n" 

2537 "Implementing implicit namespace packages (as specified in PEP 420) " 

2538 "is preferred to `pkg_resources.declare_namespace`. " 

2539 "See https://setuptools.pypa.io/en/latest/references/" 

2540 "keywords.html#keyword-namespace-packages" 

2541 ) 

2542 warnings.warn(msg, DeprecationWarning, stacklevel=2) 

2543 

2544 _imp.acquire_lock() 

2545 try: 

2546 if packageName in _namespace_packages: 

2547 return 

2548 

2549 path: MutableSequence[str] = sys.path 

2550 parent, _, _ = packageName.rpartition('.') 

2551 

2552 if parent: 

2553 declare_namespace(parent) 

2554 if parent not in _namespace_packages: 

2555 __import__(parent) 

2556 try: 

2557 path = sys.modules[parent].__path__ 

2558 except AttributeError as e: 

2559 raise TypeError("Not a package:", parent) from e 

2560 

2561 # Track what packages are namespaces, so when new path items are added, 

2562 # they can be updated 

2563 _namespace_packages.setdefault(parent or None, []).append(packageName) 

2564 _namespace_packages.setdefault(packageName, []) 

2565 

2566 for path_item in path: 

2567 # Ensure all the parent's path items are reflected in the child, 

2568 # if they apply 

2569 _handle_ns(packageName, path_item) 

2570 

2571 finally: 

2572 _imp.release_lock() 

2573 

2574 

2575def fixup_namespace_packages(path_item: str, parent: str | None = None) -> None: 

2576 """Ensure that previously-declared namespace packages include path_item""" 

2577 _imp.acquire_lock() 

2578 try: 

2579 for package in _namespace_packages.get(parent, ()): 

2580 subpath = _handle_ns(package, path_item) 

2581 if subpath: 

2582 fixup_namespace_packages(subpath, package) 

2583 finally: 

2584 _imp.release_lock() 

2585 

2586 

2587def file_ns_handler( 

2588 importer: object, 

2589 path_item: StrPath, 

2590 packageName: str, 

2591 module: types.ModuleType, 

2592): 

2593 """Compute an ns-package subpath for a filesystem or zipfile importer""" 

2594 

2595 subpath = os.path.join(path_item, packageName.split('.')[-1]) 

2596 normalized = _normalize_cached(subpath) 

2597 for item in module.__path__: 

2598 if _normalize_cached(item) == normalized: 

2599 break 

2600 else: 

2601 # Only return the path if it's not already there 

2602 return subpath 

2603 

2604 

2605if hasattr(pkgutil, 'ImpImporter'): 

2606 register_namespace_handler(pkgutil.ImpImporter, file_ns_handler) 

2607 

2608register_namespace_handler(zipimport.zipimporter, file_ns_handler) 

2609register_namespace_handler(importlib.machinery.FileFinder, file_ns_handler) 

2610 

2611 

2612def null_ns_handler( 

2613 importer: object, 

2614 path_item: str | None, 

2615 packageName: str | None, 

2616 module: _ModuleLike | None, 

2617): 

2618 return None 

2619 

2620 

2621register_namespace_handler(object, null_ns_handler) 

2622 

2623 

2624@overload 

2625def normalize_path(filename: StrPath) -> str: ... 

2626@overload 

2627def normalize_path(filename: BytesPath) -> bytes: ... 

2628def normalize_path(filename: StrOrBytesPath): 

2629 """Normalize a file/dir name for comparison purposes""" 

2630 return os.path.normcase(os.path.realpath(os.path.normpath(_cygwin_patch(filename)))) 

2631 

2632 

2633def _cygwin_patch(filename: StrOrBytesPath): # pragma: nocover 

2634 """ 

2635 Contrary to POSIX 2008, on Cygwin, getcwd (3) contains 

2636 symlink components. Using 

2637 os.path.abspath() works around this limitation. A fix in os.getcwd() 

2638 would probably better, in Cygwin even more so, except 

2639 that this seems to be by design... 

2640 """ 

2641 return os.path.abspath(filename) if sys.platform == 'cygwin' else filename 

2642 

2643 

2644if TYPE_CHECKING: 

2645 # https://github.com/python/mypy/issues/16261 

2646 # https://github.com/python/typeshed/issues/6347 

2647 @overload 

2648 def _normalize_cached(filename: StrPath) -> str: ... 

2649 @overload 

2650 def _normalize_cached(filename: BytesPath) -> bytes: ... 

2651 def _normalize_cached(filename: StrOrBytesPath) -> str | bytes: ... 

2652 

2653else: 

2654 

2655 @functools.lru_cache(maxsize=None) 

2656 def _normalize_cached(filename): 

2657 return normalize_path(filename) 

2658 

2659 

2660def _is_egg_path(path): 

2661 """ 

2662 Determine if given path appears to be an egg. 

2663 """ 

2664 return _is_zip_egg(path) or _is_unpacked_egg(path) 

2665 

2666 

2667def _is_zip_egg(path): 

2668 return ( 

2669 path.lower().endswith('.egg') 

2670 and os.path.isfile(path) 

2671 and zipfile.is_zipfile(path) 

2672 ) 

2673 

2674 

2675def _is_unpacked_egg(path): 

2676 """ 

2677 Determine if given path appears to be an unpacked egg. 

2678 """ 

2679 return path.lower().endswith('.egg') and os.path.isfile( 

2680 os.path.join(path, 'EGG-INFO', 'PKG-INFO') 

2681 ) 

2682 

2683 

2684def _set_parent_ns(packageName): 

2685 parts = packageName.split('.') 

2686 name = parts.pop() 

2687 if parts: 

2688 parent = '.'.join(parts) 

2689 setattr(sys.modules[parent], name, sys.modules[packageName]) 

2690 

2691 

2692MODULE = re.compile(r"\w+(\.\w+)*$").match 

2693EGG_NAME = re.compile( 

2694 r""" 

2695 (?P<name>[^-]+) ( 

2696 -(?P<ver>[^-]+) ( 

2697 -py(?P<pyver>[^-]+) ( 

2698 -(?P<plat>.+) 

2699 )? 

2700 )? 

2701 )? 

2702 """, 

2703 re.VERBOSE | re.IGNORECASE, 

2704).match 

2705 

2706 

2707class EntryPoint: 

2708 """Object representing an advertised importable object""" 

2709 

2710 def __init__( 

2711 self, 

2712 name: str, 

2713 module_name: str, 

2714 attrs: Iterable[str] = (), 

2715 extras: Iterable[str] = (), 

2716 dist: Distribution | None = None, 

2717 ): 

2718 if not MODULE(module_name): 

2719 raise ValueError("Invalid module name", module_name) 

2720 self.name = name 

2721 self.module_name = module_name 

2722 self.attrs = tuple(attrs) 

2723 self.extras = tuple(extras) 

2724 self.dist = dist 

2725 

2726 def __str__(self): 

2727 s = "%s = %s" % (self.name, self.module_name) 

2728 if self.attrs: 

2729 s += ':' + '.'.join(self.attrs) 

2730 if self.extras: 

2731 s += ' [%s]' % ','.join(self.extras) 

2732 return s 

2733 

2734 def __repr__(self): 

2735 return "EntryPoint.parse(%r)" % str(self) 

2736 

2737 @overload 

2738 def load( 

2739 self, 

2740 require: Literal[True] = True, 

2741 env: Environment | None = None, 

2742 installer: _InstallerType | None = None, 

2743 ) -> _ResolvedEntryPoint: ... 

2744 @overload 

2745 def load( 

2746 self, 

2747 require: Literal[False], 

2748 *args: Any, 

2749 **kwargs: Any, 

2750 ) -> _ResolvedEntryPoint: ... 

2751 def load( 

2752 self, 

2753 require: bool = True, 

2754 *args: Environment | _InstallerType | None, 

2755 **kwargs: Environment | _InstallerType | None, 

2756 ) -> _ResolvedEntryPoint: 

2757 """ 

2758 Require packages for this EntryPoint, then resolve it. 

2759 """ 

2760 if not require or args or kwargs: 

2761 warnings.warn( 

2762 "Parameters to load are deprecated. Call .resolve and " 

2763 ".require separately.", 

2764 PkgResourcesDeprecationWarning, 

2765 stacklevel=2, 

2766 ) 

2767 if require: 

2768 # We could pass `env` and `installer` directly, 

2769 # but keeping `*args` and `**kwargs` for backwards compatibility 

2770 self.require(*args, **kwargs) # type: ignore 

2771 return self.resolve() 

2772 

2773 def resolve(self) -> _ResolvedEntryPoint: 

2774 """ 

2775 Resolve the entry point from its module and attrs. 

2776 """ 

2777 module = __import__(self.module_name, fromlist=['__name__'], level=0) 

2778 try: 

2779 return functools.reduce(getattr, self.attrs, module) 

2780 except AttributeError as exc: 

2781 raise ImportError(str(exc)) from exc 

2782 

2783 def require( 

2784 self, 

2785 env: Environment | None = None, 

2786 installer: _InstallerType | None = None, 

2787 ) -> None: 

2788 if not self.dist: 

2789 error_cls = UnknownExtra if self.extras else AttributeError 

2790 raise error_cls("Can't require() without a distribution", self) 

2791 

2792 # Get the requirements for this entry point with all its extras and 

2793 # then resolve them. We have to pass `extras` along when resolving so 

2794 # that the working set knows what extras we want. Otherwise, for 

2795 # dist-info distributions, the working set will assume that the 

2796 # requirements for that extra are purely optional and skip over them. 

2797 reqs = self.dist.requires(self.extras) 

2798 items = working_set.resolve(reqs, env, installer, extras=self.extras) 

2799 list(map(working_set.add, items)) 

2800 

2801 pattern = re.compile( 

2802 r'\s*' 

2803 r'(?P<name>.+?)\s*' 

2804 r'=\s*' 

2805 r'(?P<module>[\w.]+)\s*' 

2806 r'(:\s*(?P<attr>[\w.]+))?\s*' 

2807 r'(?P<extras>\[.*\])?\s*$' 

2808 ) 

2809 

2810 @classmethod 

2811 def parse(cls, src: str, dist: Distribution | None = None) -> Self: 

2812 """Parse a single entry point from string `src` 

2813 

2814 Entry point syntax follows the form:: 

2815 

2816 name = some.module:some.attr [extra1, extra2] 

2817 

2818 The entry name and module name are required, but the ``:attrs`` and 

2819 ``[extras]`` parts are optional 

2820 """ 

2821 m = cls.pattern.match(src) 

2822 if not m: 

2823 msg = "EntryPoint must be in 'name=module:attrs [extras]' format" 

2824 raise ValueError(msg, src) 

2825 res = m.groupdict() 

2826 extras = cls._parse_extras(res['extras']) 

2827 attrs = res['attr'].split('.') if res['attr'] else () 

2828 return cls(res['name'], res['module'], attrs, extras, dist) 

2829 

2830 @classmethod 

2831 def _parse_extras(cls, extras_spec): 

2832 if not extras_spec: 

2833 return () 

2834 req = Requirement.parse('x' + extras_spec) 

2835 if req.specs: 

2836 raise ValueError 

2837 return req.extras 

2838 

2839 @classmethod 

2840 def parse_group( 

2841 cls, 

2842 group: str, 

2843 lines: _NestedStr, 

2844 dist: Distribution | None = None, 

2845 ) -> dict[str, Self]: 

2846 """Parse an entry point group""" 

2847 if not MODULE(group): 

2848 raise ValueError("Invalid group name", group) 

2849 this: dict[str, Self] = {} 

2850 for line in yield_lines(lines): 

2851 ep = cls.parse(line, dist) 

2852 if ep.name in this: 

2853 raise ValueError("Duplicate entry point", group, ep.name) 

2854 this[ep.name] = ep 

2855 return this 

2856 

2857 @classmethod 

2858 def parse_map( 

2859 cls, 

2860 data: str | Iterable[str] | dict[str, str | Iterable[str]], 

2861 dist: Distribution | None = None, 

2862 ) -> dict[str, dict[str, Self]]: 

2863 """Parse a map of entry point groups""" 

2864 _data: Iterable[tuple[str | None, str | Iterable[str]]] 

2865 if isinstance(data, dict): 

2866 _data = data.items() 

2867 else: 

2868 _data = split_sections(data) 

2869 maps: dict[str, dict[str, Self]] = {} 

2870 for group, lines in _data: 

2871 if group is None: 

2872 if not lines: 

2873 continue 

2874 raise ValueError("Entry points must be listed in groups") 

2875 group = group.strip() 

2876 if group in maps: 

2877 raise ValueError("Duplicate group name", group) 

2878 maps[group] = cls.parse_group(group, lines, dist) 

2879 return maps 

2880 

2881 

2882def _version_from_file(lines): 

2883 """ 

2884 Given an iterable of lines from a Metadata file, return 

2885 the value of the Version field, if present, or None otherwise. 

2886 """ 

2887 

2888 def is_version_line(line): 

2889 return line.lower().startswith('version:') 

2890 

2891 version_lines = filter(is_version_line, lines) 

2892 line = next(iter(version_lines), '') 

2893 _, _, value = line.partition(':') 

2894 return safe_version(value.strip()) or None 

2895 

2896 

2897class Distribution: 

2898 """Wrap an actual or potential sys.path entry w/metadata""" 

2899 

2900 PKG_INFO = 'PKG-INFO' 

2901 

2902 def __init__( 

2903 self, 

2904 location: str | None = None, 

2905 metadata: _MetadataType = None, 

2906 project_name: str | None = None, 

2907 version: str | None = None, 

2908 py_version: str | None = PY_MAJOR, 

2909 platform: str | None = None, 

2910 precedence: int = EGG_DIST, 

2911 ): 

2912 self.project_name = safe_name(project_name or 'Unknown') 

2913 if version is not None: 

2914 self._version = safe_version(version) 

2915 self.py_version = py_version 

2916 self.platform = platform 

2917 self.location = location 

2918 self.precedence = precedence 

2919 self._provider = metadata or empty_provider 

2920 

2921 @classmethod 

2922 def from_location( 

2923 cls, 

2924 location: str, 

2925 basename: StrPath, 

2926 metadata: _MetadataType = None, 

2927 **kw: int, # We could set `precedence` explicitly, but keeping this as `**kw` for full backwards and subclassing compatibility 

2928 ) -> Distribution: 

2929 project_name, version, py_version, platform = [None] * 4 

2930 basename, ext = os.path.splitext(basename) 

2931 if ext.lower() in _distributionImpl: 

2932 cls = _distributionImpl[ext.lower()] 

2933 

2934 match = EGG_NAME(basename) 

2935 if match: 

2936 project_name, version, py_version, platform = match.group( 

2937 'name', 'ver', 'pyver', 'plat' 

2938 ) 

2939 return cls( 

2940 location, 

2941 metadata, 

2942 project_name=project_name, 

2943 version=version, 

2944 py_version=py_version, 

2945 platform=platform, 

2946 **kw, 

2947 )._reload_version() 

2948 

2949 def _reload_version(self): 

2950 return self 

2951 

2952 @property 

2953 def hashcmp(self): 

2954 return ( 

2955 self._forgiving_parsed_version, 

2956 self.precedence, 

2957 self.key, 

2958 self.location, 

2959 self.py_version or '', 

2960 self.platform or '', 

2961 ) 

2962 

2963 def __hash__(self): 

2964 return hash(self.hashcmp) 

2965 

2966 def __lt__(self, other: Distribution): 

2967 return self.hashcmp < other.hashcmp 

2968 

2969 def __le__(self, other: Distribution): 

2970 return self.hashcmp <= other.hashcmp 

2971 

2972 def __gt__(self, other: Distribution): 

2973 return self.hashcmp > other.hashcmp 

2974 

2975 def __ge__(self, other: Distribution): 

2976 return self.hashcmp >= other.hashcmp 

2977 

2978 def __eq__(self, other: object): 

2979 if not isinstance(other, self.__class__): 

2980 # It's not a Distribution, so they are not equal 

2981 return False 

2982 return self.hashcmp == other.hashcmp 

2983 

2984 def __ne__(self, other: object): 

2985 return not self == other 

2986 

2987 # These properties have to be lazy so that we don't have to load any 

2988 # metadata until/unless it's actually needed. (i.e., some distributions 

2989 # may not know their name or version without loading PKG-INFO) 

2990 

2991 @property 

2992 def key(self): 

2993 try: 

2994 return self._key 

2995 except AttributeError: 

2996 self._key = key = self.project_name.lower() 

2997 return key 

2998 

2999 @property 

3000 def parsed_version(self): 

3001 if not hasattr(self, "_parsed_version"): 

3002 try: 

3003 self._parsed_version = parse_version(self.version) 

3004 except _packaging_version.InvalidVersion as ex: 

3005 info = f"(package: {self.project_name})" 

3006 if hasattr(ex, "add_note"): 

3007 ex.add_note(info) # PEP 678 

3008 raise 

3009 raise _packaging_version.InvalidVersion(f"{str(ex)} {info}") from None 

3010 

3011 return self._parsed_version 

3012 

3013 @property 

3014 def _forgiving_parsed_version(self): 

3015 try: 

3016 return self.parsed_version 

3017 except _packaging_version.InvalidVersion as ex: 

3018 self._parsed_version = parse_version(_forgiving_version(self.version)) 

3019 

3020 notes = "\n".join(getattr(ex, "__notes__", [])) # PEP 678 

3021 msg = f"""!!\n\n 

3022 ************************************************************************* 

3023 {str(ex)}\n{notes} 

3024 

3025 This is a long overdue deprecation. 

3026 For the time being, `pkg_resources` will use `{self._parsed_version}` 

3027 as a replacement to avoid breaking existing environments, 

3028 but no future compatibility is guaranteed. 

3029 

3030 If you maintain package {self.project_name} you should implement 

3031 the relevant changes to adequate the project to PEP 440 immediately. 

3032 ************************************************************************* 

3033 \n\n!! 

3034 """ 

3035 warnings.warn(msg, DeprecationWarning) 

3036 

3037 return self._parsed_version 

3038 

3039 @property 

3040 def version(self): 

3041 try: 

3042 return self._version 

3043 except AttributeError as e: 

3044 version = self._get_version() 

3045 if version is None: 

3046 path = self._get_metadata_path_for_display(self.PKG_INFO) 

3047 msg = ("Missing 'Version:' header and/or {} file at path: {}").format( 

3048 self.PKG_INFO, path 

3049 ) 

3050 raise ValueError(msg, self) from e 

3051 

3052 return version 

3053 

3054 @property 

3055 def _dep_map(self): 

3056 """ 

3057 A map of extra to its list of (direct) requirements 

3058 for this distribution, including the null extra. 

3059 """ 

3060 try: 

3061 return self.__dep_map 

3062 except AttributeError: 

3063 self.__dep_map = self._filter_extras(self._build_dep_map()) 

3064 return self.__dep_map 

3065 

3066 @staticmethod 

3067 def _filter_extras( 

3068 dm: dict[str | None, list[Requirement]], 

3069 ) -> dict[str | None, list[Requirement]]: 

3070 """ 

3071 Given a mapping of extras to dependencies, strip off 

3072 environment markers and filter out any dependencies 

3073 not matching the markers. 

3074 """ 

3075 for extra in list(filter(None, dm)): 

3076 new_extra: str | None = extra 

3077 reqs = dm.pop(extra) 

3078 new_extra, _, marker = extra.partition(':') 

3079 fails_marker = marker and ( 

3080 invalid_marker(marker) or not evaluate_marker(marker) 

3081 ) 

3082 if fails_marker: 

3083 reqs = [] 

3084 new_extra = safe_extra(new_extra) or None 

3085 

3086 dm.setdefault(new_extra, []).extend(reqs) 

3087 return dm 

3088 

3089 def _build_dep_map(self): 

3090 dm = {} 

3091 for name in 'requires.txt', 'depends.txt': 

3092 for extra, reqs in split_sections(self._get_metadata(name)): 

3093 dm.setdefault(extra, []).extend(parse_requirements(reqs)) 

3094 return dm 

3095 

3096 def requires(self, extras: Iterable[str] = ()) -> list[Requirement]: 

3097 """List of Requirements needed for this distro if `extras` are used""" 

3098 dm = self._dep_map 

3099 deps: list[Requirement] = [] 

3100 deps.extend(dm.get(None, ())) 

3101 for ext in extras: 

3102 try: 

3103 deps.extend(dm[safe_extra(ext)]) 

3104 except KeyError as e: 

3105 raise UnknownExtra( 

3106 "%s has no such extra feature %r" % (self, ext) 

3107 ) from e 

3108 return deps 

3109 

3110 def _get_metadata_path_for_display(self, name): 

3111 """ 

3112 Return the path to the given metadata file, if available. 

3113 """ 

3114 try: 

3115 # We need to access _get_metadata_path() on the provider object 

3116 # directly rather than through this class's __getattr__() 

3117 # since _get_metadata_path() is marked private. 

3118 path = self._provider._get_metadata_path(name) 

3119 

3120 # Handle exceptions e.g. in case the distribution's metadata 

3121 # provider doesn't support _get_metadata_path(). 

3122 except Exception: 

3123 return '[could not detect]' 

3124 

3125 return path 

3126 

3127 def _get_metadata(self, name): 

3128 if self.has_metadata(name): 

3129 yield from self.get_metadata_lines(name) 

3130 

3131 def _get_version(self): 

3132 lines = self._get_metadata(self.PKG_INFO) 

3133 return _version_from_file(lines) 

3134 

3135 def activate(self, path: list[str] | None = None, replace: bool = False) -> None: 

3136 """Ensure distribution is importable on `path` (default=sys.path)""" 

3137 if path is None: 

3138 path = sys.path 

3139 self.insert_on(path, replace=replace) 

3140 if path is sys.path and self.location is not None: 

3141 fixup_namespace_packages(self.location) 

3142 for pkg in self._get_metadata('namespace_packages.txt'): 

3143 if pkg in sys.modules: 

3144 declare_namespace(pkg) 

3145 

3146 def egg_name(self): 

3147 """Return what this distribution's standard .egg filename should be""" 

3148 filename = "%s-%s-py%s" % ( 

3149 to_filename(self.project_name), 

3150 to_filename(self.version), 

3151 self.py_version or PY_MAJOR, 

3152 ) 

3153 

3154 if self.platform: 

3155 filename += '-' + self.platform 

3156 return filename 

3157 

3158 def __repr__(self): 

3159 if self.location: 

3160 return "%s (%s)" % (self, self.location) 

3161 else: 

3162 return str(self) 

3163 

3164 def __str__(self): 

3165 try: 

3166 version = getattr(self, 'version', None) 

3167 except ValueError: 

3168 version = None 

3169 version = version or "[unknown version]" 

3170 return "%s %s" % (self.project_name, version) 

3171 

3172 def __getattr__(self, attr): 

3173 """Delegate all unrecognized public attributes to .metadata provider""" 

3174 if attr.startswith('_'): 

3175 raise AttributeError(attr) 

3176 return getattr(self._provider, attr) 

3177 

3178 def __dir__(self): 

3179 return list( 

3180 set(super().__dir__()) 

3181 | set(attr for attr in self._provider.__dir__() if not attr.startswith('_')) 

3182 ) 

3183 

3184 @classmethod 

3185 def from_filename( 

3186 cls, 

3187 filename: StrPath, 

3188 metadata: _MetadataType = None, 

3189 **kw: int, # We could set `precedence` explicitly, but keeping this as `**kw` for full backwards and subclassing compatibility 

3190 ) -> Distribution: 

3191 return cls.from_location( 

3192 _normalize_cached(filename), os.path.basename(filename), metadata, **kw 

3193 ) 

3194 

3195 def as_requirement(self): 

3196 """Return a ``Requirement`` that matches this distribution exactly""" 

3197 if isinstance(self.parsed_version, _packaging_version.Version): 

3198 spec = "%s==%s" % (self.project_name, self.parsed_version) 

3199 else: 

3200 spec = "%s===%s" % (self.project_name, self.parsed_version) 

3201 

3202 return Requirement.parse(spec) 

3203 

3204 def load_entry_point(self, group: str, name: str) -> _ResolvedEntryPoint: 

3205 """Return the `name` entry point of `group` or raise ImportError""" 

3206 ep = self.get_entry_info(group, name) 

3207 if ep is None: 

3208 raise ImportError("Entry point %r not found" % ((group, name),)) 

3209 return ep.load() 

3210 

3211 @overload 

3212 def get_entry_map(self, group: None = None) -> dict[str, dict[str, EntryPoint]]: ... 

3213 @overload 

3214 def get_entry_map(self, group: str) -> dict[str, EntryPoint]: ... 

3215 def get_entry_map(self, group: str | None = None): 

3216 """Return the entry point map for `group`, or the full entry map""" 

3217 if not hasattr(self, "_ep_map"): 

3218 self._ep_map = EntryPoint.parse_map( 

3219 self._get_metadata('entry_points.txt'), self 

3220 ) 

3221 if group is not None: 

3222 return self._ep_map.get(group, {}) 

3223 return self._ep_map 

3224 

3225 def get_entry_info(self, group: str, name: str) -> EntryPoint | None: 

3226 """Return the EntryPoint object for `group`+`name`, or ``None``""" 

3227 return self.get_entry_map(group).get(name) 

3228 

3229 # FIXME: 'Distribution.insert_on' is too complex (13) 

3230 def insert_on( # noqa: C901 

3231 self, 

3232 path: list[str], 

3233 loc=None, 

3234 replace: bool = False, 

3235 ) -> None: 

3236 """Ensure self.location is on path 

3237 

3238 If replace=False (default): 

3239 - If location is already in path anywhere, do nothing. 

3240 - Else: 

3241 - If it's an egg and its parent directory is on path, 

3242 insert just ahead of the parent. 

3243 - Else: add to the end of path. 

3244 If replace=True: 

3245 - If location is already on path anywhere (not eggs) 

3246 or higher priority than its parent (eggs) 

3247 do nothing. 

3248 - Else: 

3249 - If it's an egg and its parent directory is on path, 

3250 insert just ahead of the parent, 

3251 removing any lower-priority entries. 

3252 - Else: add it to the front of path. 

3253 """ 

3254 

3255 loc = loc or self.location 

3256 if not loc: 

3257 return 

3258 

3259 nloc = _normalize_cached(loc) 

3260 bdir = os.path.dirname(nloc) 

3261 npath = [(p and _normalize_cached(p) or p) for p in path] 

3262 

3263 for p, item in enumerate(npath): 

3264 if item == nloc: 

3265 if replace: 

3266 break 

3267 else: 

3268 # don't modify path (even removing duplicates) if 

3269 # found and not replace 

3270 return 

3271 elif item == bdir and self.precedence == EGG_DIST: 

3272 # if it's an .egg, give it precedence over its directory 

3273 # UNLESS it's already been added to sys.path and replace=False 

3274 if (not replace) and nloc in npath[p:]: 

3275 return 

3276 if path is sys.path: 

3277 self.check_version_conflict() 

3278 path.insert(p, loc) 

3279 npath.insert(p, nloc) 

3280 break 

3281 else: 

3282 if path is sys.path: 

3283 self.check_version_conflict() 

3284 if replace: 

3285 path.insert(0, loc) 

3286 else: 

3287 path.append(loc) 

3288 return 

3289 

3290 # p is the spot where we found or inserted loc; now remove duplicates 

3291 while True: 

3292 try: 

3293 np = npath.index(nloc, p + 1) 

3294 except ValueError: 

3295 break 

3296 else: 

3297 del npath[np], path[np] 

3298 # ha! 

3299 p = np 

3300 

3301 return 

3302 

3303 def check_version_conflict(self): 

3304 if self.key == 'setuptools': 

3305 # ignore the inevitable setuptools self-conflicts :( 

3306 return 

3307 

3308 nsp = dict.fromkeys(self._get_metadata('namespace_packages.txt')) 

3309 loc = normalize_path(self.location) 

3310 for modname in self._get_metadata('top_level.txt'): 

3311 if ( 

3312 modname not in sys.modules 

3313 or modname in nsp 

3314 or modname in _namespace_packages 

3315 ): 

3316 continue 

3317 if modname in ('pkg_resources', 'setuptools', 'site'): 

3318 continue 

3319 fn = getattr(sys.modules[modname], '__file__', None) 

3320 if fn and ( 

3321 normalize_path(fn).startswith(loc) or fn.startswith(self.location) 

3322 ): 

3323 continue 

3324 issue_warning( 

3325 "Module %s was already imported from %s, but %s is being added" 

3326 " to sys.path" % (modname, fn, self.location), 

3327 ) 

3328 

3329 def has_version(self): 

3330 try: 

3331 self.version 

3332 except ValueError: 

3333 issue_warning("Unbuilt egg for " + repr(self)) 

3334 return False 

3335 except SystemError: 

3336 # TODO: remove this except clause when python/cpython#103632 is fixed. 

3337 return False 

3338 return True 

3339 

3340 def clone(self, **kw: str | int | IResourceProvider | None) -> Self: 

3341 """Copy this distribution, substituting in any changed keyword args""" 

3342 names = 'project_name version py_version platform location precedence' 

3343 for attr in names.split(): 

3344 kw.setdefault(attr, getattr(self, attr, None)) 

3345 kw.setdefault('metadata', self._provider) 

3346 # Unsafely unpacking. But keeping **kw for backwards and subclassing compatibility 

3347 return self.__class__(**kw) # type:ignore[arg-type] 

3348 

3349 @property 

3350 def extras(self): 

3351 return [dep for dep in self._dep_map if dep] 

3352 

3353 

3354class EggInfoDistribution(Distribution): 

3355 def _reload_version(self): 

3356 """ 

3357 Packages installed by distutils (e.g. numpy or scipy), 

3358 which uses an old safe_version, and so 

3359 their version numbers can get mangled when 

3360 converted to filenames (e.g., 1.11.0.dev0+2329eae to 

3361 1.11.0.dev0_2329eae). These distributions will not be 

3362 parsed properly 

3363 downstream by Distribution and safe_version, so 

3364 take an extra step and try to get the version number from 

3365 the metadata file itself instead of the filename. 

3366 """ 

3367 md_version = self._get_version() 

3368 if md_version: 

3369 self._version = md_version 

3370 return self 

3371 

3372 

3373class DistInfoDistribution(Distribution): 

3374 """ 

3375 Wrap an actual or potential sys.path entry 

3376 w/metadata, .dist-info style. 

3377 """ 

3378 

3379 PKG_INFO = 'METADATA' 

3380 EQEQ = re.compile(r"([\(,])\s*(\d.*?)\s*([,\)])") 

3381 

3382 @property 

3383 def _parsed_pkg_info(self): 

3384 """Parse and cache metadata""" 

3385 try: 

3386 return self._pkg_info 

3387 except AttributeError: 

3388 metadata = self.get_metadata(self.PKG_INFO) 

3389 self._pkg_info = email.parser.Parser().parsestr(metadata) 

3390 return self._pkg_info 

3391 

3392 @property 

3393 def _dep_map(self): 

3394 try: 

3395 return self.__dep_map 

3396 except AttributeError: 

3397 self.__dep_map = self._compute_dependencies() 

3398 return self.__dep_map 

3399 

3400 def _compute_dependencies(self) -> dict[str | None, list[Requirement]]: 

3401 """Recompute this distribution's dependencies.""" 

3402 self.__dep_map: dict[str | None, list[Requirement]] = {None: []} 

3403 

3404 reqs: list[Requirement] = [] 

3405 # Including any condition expressions 

3406 for req in self._parsed_pkg_info.get_all('Requires-Dist') or []: 

3407 reqs.extend(parse_requirements(req)) 

3408 

3409 def reqs_for_extra(extra): 

3410 for req in reqs: 

3411 if not req.marker or req.marker.evaluate({'extra': extra}): 

3412 yield req 

3413 

3414 common = types.MappingProxyType(dict.fromkeys(reqs_for_extra(None))) 

3415 self.__dep_map[None].extend(common) 

3416 

3417 for extra in self._parsed_pkg_info.get_all('Provides-Extra') or []: 

3418 s_extra = safe_extra(extra.strip()) 

3419 self.__dep_map[s_extra] = [ 

3420 r for r in reqs_for_extra(extra) if r not in common 

3421 ] 

3422 

3423 return self.__dep_map 

3424 

3425 

3426_distributionImpl = { 

3427 '.egg': Distribution, 

3428 '.egg-info': EggInfoDistribution, 

3429 '.dist-info': DistInfoDistribution, 

3430} 

3431 

3432 

3433def issue_warning(*args, **kw): 

3434 level = 1 

3435 g = globals() 

3436 try: 

3437 # find the first stack frame that is *not* code in 

3438 # the pkg_resources module, to use for the warning 

3439 while sys._getframe(level).f_globals is g: 

3440 level += 1 

3441 except ValueError: 

3442 pass 

3443 warnings.warn(stacklevel=level + 1, *args, **kw) 

3444 

3445 

3446def parse_requirements(strs: _NestedStr) -> map[Requirement]: 

3447 """ 

3448 Yield ``Requirement`` objects for each specification in `strs`. 

3449 

3450 `strs` must be a string, or a (possibly-nested) iterable thereof. 

3451 """ 

3452 return map(Requirement, join_continuation(map(drop_comment, yield_lines(strs)))) 

3453 

3454 

3455class RequirementParseError(_packaging_requirements.InvalidRequirement): 

3456 "Compatibility wrapper for InvalidRequirement" 

3457 

3458 

3459class Requirement(_packaging_requirements.Requirement): 

3460 # prefer variable length tuple to set (as found in 

3461 # packaging.requirements.Requirement) 

3462 extras: tuple[str, ...] # type: ignore[assignment] 

3463 

3464 def __init__(self, requirement_string: str): 

3465 """DO NOT CALL THIS UNDOCUMENTED METHOD; use Requirement.parse()!""" 

3466 super().__init__(requirement_string) 

3467 self.unsafe_name = self.name 

3468 project_name = safe_name(self.name) 

3469 self.project_name, self.key = project_name, project_name.lower() 

3470 self.specs = [(spec.operator, spec.version) for spec in self.specifier] 

3471 self.extras = tuple(map(safe_extra, self.extras)) 

3472 self.hashCmp = ( 

3473 self.key, 

3474 self.url, 

3475 self.specifier, 

3476 frozenset(self.extras), 

3477 str(self.marker) if self.marker else None, 

3478 ) 

3479 self.__hash = hash(self.hashCmp) 

3480 

3481 def __eq__(self, other: object): 

3482 return isinstance(other, Requirement) and self.hashCmp == other.hashCmp 

3483 

3484 def __ne__(self, other): 

3485 return not self == other 

3486 

3487 def __contains__( 

3488 self, item: Distribution | packaging.specifiers.UnparsedVersion 

3489 ) -> bool: 

3490 if isinstance(item, Distribution): 

3491 if item.key != self.key: 

3492 return False 

3493 

3494 version = item.version 

3495 else: 

3496 version = item 

3497 

3498 # Allow prereleases always in order to match the previous behavior of 

3499 # this method. In the future this should be smarter and follow PEP 440 

3500 # more accurately. 

3501 return self.specifier.contains( 

3502 version, 

3503 prereleases=True, 

3504 ) 

3505 

3506 def __hash__(self): 

3507 return self.__hash 

3508 

3509 def __repr__(self): 

3510 return "Requirement.parse(%r)" % str(self) 

3511 

3512 @staticmethod 

3513 def parse(s: str | Iterable[str]) -> Requirement: 

3514 (req,) = parse_requirements(s) 

3515 return req 

3516 

3517 

3518def _always_object(classes): 

3519 """ 

3520 Ensure object appears in the mro even 

3521 for old-style classes. 

3522 """ 

3523 if object not in classes: 

3524 return classes + (object,) 

3525 return classes 

3526 

3527 

3528def _find_adapter(registry: Mapping[type, _AdapterT], ob: object) -> _AdapterT: 

3529 """Return an adapter factory for `ob` from `registry`""" 

3530 types = _always_object(inspect.getmro(getattr(ob, '__class__', type(ob)))) 

3531 for t in types: 

3532 if t in registry: 

3533 return registry[t] 

3534 # _find_adapter would previously return None, and immediately be called. 

3535 # So we're raising a TypeError to keep backward compatibility if anyone depended on that behaviour. 

3536 raise TypeError(f"Could not find adapter for {registry} and {ob}") 

3537 

3538 

3539def ensure_directory(path: StrOrBytesPath) -> None: 

3540 """Ensure that the parent directory of `path` exists""" 

3541 dirname = os.path.dirname(path) 

3542 os.makedirs(dirname, exist_ok=True) 

3543 

3544 

3545def _bypass_ensure_directory(path): 

3546 """Sandbox-bypassing version of ensure_directory()""" 

3547 if not WRITE_SUPPORT: 

3548 raise OSError('"os.mkdir" not supported on this platform.') 

3549 dirname, filename = split(path) 

3550 if dirname and filename and not isdir(dirname): 

3551 _bypass_ensure_directory(dirname) 

3552 try: 

3553 mkdir(dirname, 0o755) 

3554 except FileExistsError: 

3555 pass 

3556 

3557 

3558def split_sections(s: _NestedStr) -> Iterator[tuple[str | None, list[str]]]: 

3559 """Split a string or iterable thereof into (section, content) pairs 

3560 

3561 Each ``section`` is a stripped version of the section header ("[section]") 

3562 and each ``content`` is a list of stripped lines excluding blank lines and 

3563 comment-only lines. If there are any such lines before the first section 

3564 header, they're returned in a first ``section`` of ``None``. 

3565 """ 

3566 section = None 

3567 content = [] 

3568 for line in yield_lines(s): 

3569 if line.startswith("["): 

3570 if line.endswith("]"): 

3571 if section or content: 

3572 yield section, content 

3573 section = line[1:-1].strip() 

3574 content = [] 

3575 else: 

3576 raise ValueError("Invalid section heading", line) 

3577 else: 

3578 content.append(line) 

3579 

3580 # wrap up last segment 

3581 yield section, content 

3582 

3583 

3584def _mkstemp(*args, **kw): 

3585 old_open = os.open 

3586 try: 

3587 # temporarily bypass sandboxing 

3588 os.open = os_open 

3589 return tempfile.mkstemp(*args, **kw) 

3590 finally: 

3591 # and then put it back 

3592 os.open = old_open 

3593 

3594 

3595# Silence the PEP440Warning by default, so that end users don't get hit by it 

3596# randomly just because they use pkg_resources. We want to append the rule 

3597# because we want earlier uses of filterwarnings to take precedence over this 

3598# one. 

3599warnings.filterwarnings("ignore", category=PEP440Warning, append=True) 

3600 

3601 

3602class PkgResourcesDeprecationWarning(Warning): 

3603 """ 

3604 Base class for warning about deprecations in ``pkg_resources`` 

3605 

3606 This class is not derived from ``DeprecationWarning``, and as such is 

3607 visible by default. 

3608 """ 

3609 

3610 

3611# Ported from ``setuptools`` to avoid introducing an import inter-dependency: 

3612_LOCALE_ENCODING = "locale" if sys.version_info >= (3, 10) else None 

3613 

3614 

3615# This must go before calls to `_call_aside`. See https://github.com/pypa/setuptools/pull/4422 

3616def _read_utf8_with_fallback(file: str, fallback_encoding=_LOCALE_ENCODING) -> str: 

3617 """See setuptools.unicode_utils._read_utf8_with_fallback""" 

3618 try: 

3619 with open(file, "r", encoding="utf-8") as f: 

3620 return f.read() 

3621 except UnicodeDecodeError: # pragma: no cover 

3622 msg = f"""\ 

3623 ******************************************************************************** 

3624 `encoding="utf-8"` fails with {file!r}, trying `encoding={fallback_encoding!r}`. 

3625 

3626 This fallback behaviour is considered **deprecated** and future versions of 

3627 `setuptools/pkg_resources` may not implement it. 

3628 

3629 Please encode {file!r} with "utf-8" to ensure future builds will succeed. 

3630 

3631 If this file was produced by `setuptools` itself, cleaning up the cached files 

3632 and re-building/re-installing the package with a newer version of `setuptools` 

3633 (e.g. by updating `build-system.requires` in its `pyproject.toml`) 

3634 might solve the problem. 

3635 ******************************************************************************** 

3636 """ 

3637 # TODO: Add a deadline? 

3638 # See comment in setuptools.unicode_utils._Utf8EncodingNeeded 

3639 warnings.warn(msg, PkgResourcesDeprecationWarning, stacklevel=2) 

3640 with open(file, "r", encoding=fallback_encoding) as f: 

3641 return f.read() 

3642 

3643 

3644# from jaraco.functools 1.3 

3645def _call_aside(f, *args, **kwargs): 

3646 f(*args, **kwargs) 

3647 return f 

3648 

3649 

3650@_call_aside 

3651def _initialize(g=globals()): 

3652 "Set up global resource manager (deliberately not state-saved)" 

3653 manager = ResourceManager() 

3654 g['_manager'] = manager 

3655 g.update( 

3656 (name, getattr(manager, name)) 

3657 for name in dir(manager) 

3658 if not name.startswith('_') 

3659 ) 

3660 

3661 

3662@_call_aside 

3663def _initialize_master_working_set(): 

3664 """ 

3665 Prepare the master working set and make the ``require()`` 

3666 API available. 

3667 

3668 This function has explicit effects on the global state 

3669 of pkg_resources. It is intended to be invoked once at 

3670 the initialization of this module. 

3671 

3672 Invocation by other packages is unsupported and done 

3673 at their own risk. 

3674 """ 

3675 working_set = _declare_state('object', 'working_set', WorkingSet._build_master()) 

3676 

3677 require = working_set.require 

3678 iter_entry_points = working_set.iter_entry_points 

3679 add_activation_listener = working_set.subscribe 

3680 run_script = working_set.run_script 

3681 # backward compatibility 

3682 run_main = run_script 

3683 # Activate all distributions already on sys.path with replace=False and 

3684 # ensure that all distributions added to the working set in the future 

3685 # (e.g. by calling ``require()``) will get activated as well, 

3686 # with higher priority (replace=True). 

3687 tuple(dist.activate(replace=False) for dist in working_set) 

3688 add_activation_listener( 

3689 lambda dist: dist.activate(replace=True), 

3690 existing=False, 

3691 ) 

3692 working_set.entries = [] 

3693 # match order 

3694 list(map(working_set.add_entry, sys.path)) 

3695 globals().update(locals()) 

3696 

3697 

3698if TYPE_CHECKING: 

3699 # All of these are set by the @_call_aside methods above 

3700 __resource_manager = ResourceManager() # Won't exist at runtime 

3701 resource_exists = __resource_manager.resource_exists 

3702 resource_isdir = __resource_manager.resource_isdir 

3703 resource_filename = __resource_manager.resource_filename 

3704 resource_stream = __resource_manager.resource_stream 

3705 resource_string = __resource_manager.resource_string 

3706 resource_listdir = __resource_manager.resource_listdir 

3707 set_extraction_path = __resource_manager.set_extraction_path 

3708 cleanup_resources = __resource_manager.cleanup_resources 

3709 

3710 working_set = WorkingSet() 

3711 require = working_set.require 

3712 iter_entry_points = working_set.iter_entry_points 

3713 add_activation_listener = working_set.subscribe 

3714 run_script = working_set.run_script 

3715 run_main = run_script