1# TODO: Add Generic type annotations to initialized collections.
2# For now we'd simply use implicit Any/Unknown which would add redundant annotations
3# mypy: disable-error-code="var-annotated"
4"""
5Package resource API
6--------------------
7
8A resource is a logical file contained within a package, or a logical
9subdirectory thereof. The package resource API expects resource names
10to have their path parts separated with ``/``, *not* whatever the local
11path separator is. Do not use os.path operations to manipulate resource
12names being passed into the API.
13
14The package resource API is designed to work with normal filesystem packages,
15.egg files, and unpacked .egg files. It can also work in a limited way with
16.zip files and with custom PEP 302 loaders that support the ``get_data()``
17method.
18
19This module is deprecated. Users are directed to :mod:`importlib.resources`,
20:mod:`importlib.metadata` and :pypi:`packaging` instead.
21"""
22
23from __future__ import annotations
24
25import sys
26
27if sys.version_info < (3, 8): # noqa: UP036 # Check for unsupported versions
28 raise RuntimeError("Python 3.8 or later is required")
29
30import os
31import io
32import time
33import re
34import types
35from typing import (
36 Any,
37 Literal,
38 Dict,
39 Iterator,
40 Mapping,
41 MutableSequence,
42 NamedTuple,
43 NoReturn,
44 Tuple,
45 Union,
46 TYPE_CHECKING,
47 Protocol,
48 Callable,
49 Iterable,
50 TypeVar,
51 overload,
52)
53import zipfile
54import zipimport
55import warnings
56import stat
57import functools
58import pkgutil
59import operator
60import platform
61import collections
62import plistlib
63import email.parser
64import errno
65import tempfile
66import textwrap
67import inspect
68import ntpath
69import posixpath
70import importlib
71import importlib.abc
72import importlib.machinery
73from pkgutil import get_importer
74
75import _imp
76
77# capture these to bypass sandboxing
78from os import utime
79from os import open as os_open
80from os.path import isdir, split
81
82try:
83 from os import mkdir, rename, unlink
84
85 WRITE_SUPPORT = True
86except ImportError:
87 # no write support, probably under GAE
88 WRITE_SUPPORT = False
89
90from pip._internal.utils._jaraco_text import (
91 yield_lines,
92 drop_comment,
93 join_continuation,
94)
95from pip._vendor.packaging import markers as _packaging_markers
96from pip._vendor.packaging import requirements as _packaging_requirements
97from pip._vendor.packaging import utils as _packaging_utils
98from pip._vendor.packaging import version as _packaging_version
99from pip._vendor.platformdirs import user_cache_dir as _user_cache_dir
100
101if TYPE_CHECKING:
102 from _typeshed import BytesPath, StrPath, StrOrBytesPath
103 from typing_extensions import Self
104
105
106# Patch: Remove deprecation warning from vendored pkg_resources.
107# Setting PYTHONWARNINGS=error to verify builds produce no warnings
108# causes immediate exceptions.
109# See https://github.com/pypa/pip/issues/12243
110
111
112_T = TypeVar("_T")
113_DistributionT = TypeVar("_DistributionT", bound="Distribution")
114# Type aliases
115_NestedStr = Union[str, Iterable[Union[str, Iterable["_NestedStr"]]]]
116_InstallerTypeT = Callable[["Requirement"], "_DistributionT"]
117_InstallerType = Callable[["Requirement"], Union["Distribution", None]]
118_PkgReqType = Union[str, "Requirement"]
119_EPDistType = Union["Distribution", _PkgReqType]
120_MetadataType = Union["IResourceProvider", None]
121_ResolvedEntryPoint = Any # Can be any attribute in the module
122_ResourceStream = Any # TODO / Incomplete: A readable file-like object
123# Any object works, but let's indicate we expect something like a module (optionally has __loader__ or __file__)
124_ModuleLike = Union[object, types.ModuleType]
125# Any: Should be _ModuleLike but we end up with issues where _ModuleLike doesn't have _ZipLoaderModule's __loader__
126_ProviderFactoryType = Callable[[Any], "IResourceProvider"]
127_DistFinderType = Callable[[_T, str, bool], Iterable["Distribution"]]
128_NSHandlerType = Callable[[_T, str, str, types.ModuleType], Union[str, None]]
129_AdapterT = TypeVar(
130 "_AdapterT", _DistFinderType[Any], _ProviderFactoryType, _NSHandlerType[Any]
131)
132
133
134# Use _typeshed.importlib.LoaderProtocol once available https://github.com/python/typeshed/pull/11890
135class _LoaderProtocol(Protocol):
136 def load_module(self, fullname: str, /) -> types.ModuleType: ...
137
138
139class _ZipLoaderModule(Protocol):
140 __loader__: zipimport.zipimporter
141
142
143_PEP440_FALLBACK = re.compile(r"^v?(?P<safe>(?:[0-9]+!)?[0-9]+(?:\.[0-9]+)*)", re.I)
144
145
146class PEP440Warning(RuntimeWarning):
147 """
148 Used when there is an issue with a version or specifier not complying with
149 PEP 440.
150 """
151
152
153parse_version = _packaging_version.Version
154
155
156_state_vars: dict[str, str] = {}
157
158
159def _declare_state(vartype: str, varname: str, initial_value: _T) -> _T:
160 _state_vars[varname] = vartype
161 return initial_value
162
163
164def __getstate__() -> dict[str, Any]:
165 state = {}
166 g = globals()
167 for k, v in _state_vars.items():
168 state[k] = g['_sget_' + v](g[k])
169 return state
170
171
172def __setstate__(state: dict[str, Any]) -> dict[str, Any]:
173 g = globals()
174 for k, v in state.items():
175 g['_sset_' + _state_vars[k]](k, g[k], v)
176 return state
177
178
179def _sget_dict(val):
180 return val.copy()
181
182
183def _sset_dict(key, ob, state):
184 ob.clear()
185 ob.update(state)
186
187
188def _sget_object(val):
189 return val.__getstate__()
190
191
192def _sset_object(key, ob, state):
193 ob.__setstate__(state)
194
195
196_sget_none = _sset_none = lambda *args: None
197
198
199def get_supported_platform():
200 """Return this platform's maximum compatible version.
201
202 distutils.util.get_platform() normally reports the minimum version
203 of macOS that would be required to *use* extensions produced by
204 distutils. But what we want when checking compatibility is to know the
205 version of macOS that we are *running*. To allow usage of packages that
206 explicitly require a newer version of macOS, we must also know the
207 current version of the OS.
208
209 If this condition occurs for any other platform with a version in its
210 platform strings, this function should be extended accordingly.
211 """
212 plat = get_build_platform()
213 m = macosVersionString.match(plat)
214 if m is not None and sys.platform == "darwin":
215 try:
216 plat = 'macosx-%s-%s' % ('.'.join(_macos_vers()[:2]), m.group(3))
217 except ValueError:
218 # not macOS
219 pass
220 return plat
221
222
223__all__ = [
224 # Basic resource access and distribution/entry point discovery
225 'require',
226 'run_script',
227 'get_provider',
228 'get_distribution',
229 'load_entry_point',
230 'get_entry_map',
231 'get_entry_info',
232 'iter_entry_points',
233 'resource_string',
234 'resource_stream',
235 'resource_filename',
236 'resource_listdir',
237 'resource_exists',
238 'resource_isdir',
239 # Environmental control
240 'declare_namespace',
241 'working_set',
242 'add_activation_listener',
243 'find_distributions',
244 'set_extraction_path',
245 'cleanup_resources',
246 'get_default_cache',
247 # Primary implementation classes
248 'Environment',
249 'WorkingSet',
250 'ResourceManager',
251 'Distribution',
252 'Requirement',
253 'EntryPoint',
254 # Exceptions
255 'ResolutionError',
256 'VersionConflict',
257 'DistributionNotFound',
258 'UnknownExtra',
259 'ExtractionError',
260 # Warnings
261 'PEP440Warning',
262 # Parsing functions and string utilities
263 'parse_requirements',
264 'parse_version',
265 'safe_name',
266 'safe_version',
267 'get_platform',
268 'compatible_platforms',
269 'yield_lines',
270 'split_sections',
271 'safe_extra',
272 'to_filename',
273 'invalid_marker',
274 'evaluate_marker',
275 # filesystem utilities
276 'ensure_directory',
277 'normalize_path',
278 # Distribution "precedence" constants
279 'EGG_DIST',
280 'BINARY_DIST',
281 'SOURCE_DIST',
282 'CHECKOUT_DIST',
283 'DEVELOP_DIST',
284 # "Provider" interfaces, implementations, and registration/lookup APIs
285 'IMetadataProvider',
286 'IResourceProvider',
287 'FileMetadata',
288 'PathMetadata',
289 'EggMetadata',
290 'EmptyProvider',
291 'empty_provider',
292 'NullProvider',
293 'EggProvider',
294 'DefaultProvider',
295 'ZipProvider',
296 'register_finder',
297 'register_namespace_handler',
298 'register_loader_type',
299 'fixup_namespace_packages',
300 'get_importer',
301 # Warnings
302 'PkgResourcesDeprecationWarning',
303 # Deprecated/backward compatibility only
304 'run_main',
305 'AvailableDistributions',
306]
307
308
309class ResolutionError(Exception):
310 """Abstract base for dependency resolution errors"""
311
312 def __repr__(self):
313 return self.__class__.__name__ + repr(self.args)
314
315
316class VersionConflict(ResolutionError):
317 """
318 An already-installed version conflicts with the requested version.
319
320 Should be initialized with the installed Distribution and the requested
321 Requirement.
322 """
323
324 _template = "{self.dist} is installed but {self.req} is required"
325
326 @property
327 def dist(self) -> Distribution:
328 return self.args[0]
329
330 @property
331 def req(self) -> Requirement:
332 return self.args[1]
333
334 def report(self):
335 return self._template.format(**locals())
336
337 def with_context(self, required_by: set[Distribution | str]):
338 """
339 If required_by is non-empty, return a version of self that is a
340 ContextualVersionConflict.
341 """
342 if not required_by:
343 return self
344 args = self.args + (required_by,)
345 return ContextualVersionConflict(*args)
346
347
348class ContextualVersionConflict(VersionConflict):
349 """
350 A VersionConflict that accepts a third parameter, the set of the
351 requirements that required the installed Distribution.
352 """
353
354 _template = VersionConflict._template + ' by {self.required_by}'
355
356 @property
357 def required_by(self) -> set[str]:
358 return self.args[2]
359
360
361class DistributionNotFound(ResolutionError):
362 """A requested distribution was not found"""
363
364 _template = (
365 "The '{self.req}' distribution was not found "
366 "and is required by {self.requirers_str}"
367 )
368
369 @property
370 def req(self) -> Requirement:
371 return self.args[0]
372
373 @property
374 def requirers(self) -> set[str] | None:
375 return self.args[1]
376
377 @property
378 def requirers_str(self):
379 if not self.requirers:
380 return 'the application'
381 return ', '.join(self.requirers)
382
383 def report(self):
384 return self._template.format(**locals())
385
386 def __str__(self):
387 return self.report()
388
389
390class UnknownExtra(ResolutionError):
391 """Distribution doesn't have an "extra feature" of the given name"""
392
393
394_provider_factories: dict[type[_ModuleLike], _ProviderFactoryType] = {}
395
396PY_MAJOR = '{}.{}'.format(*sys.version_info)
397EGG_DIST = 3
398BINARY_DIST = 2
399SOURCE_DIST = 1
400CHECKOUT_DIST = 0
401DEVELOP_DIST = -1
402
403
404def register_loader_type(
405 loader_type: type[_ModuleLike], provider_factory: _ProviderFactoryType
406):
407 """Register `provider_factory` to make providers for `loader_type`
408
409 `loader_type` is the type or class of a PEP 302 ``module.__loader__``,
410 and `provider_factory` is a function that, passed a *module* object,
411 returns an ``IResourceProvider`` for that module.
412 """
413 _provider_factories[loader_type] = provider_factory
414
415
416@overload
417def get_provider(moduleOrReq: str) -> IResourceProvider: ...
418@overload
419def get_provider(moduleOrReq: Requirement) -> Distribution: ...
420def get_provider(moduleOrReq: str | Requirement) -> IResourceProvider | Distribution:
421 """Return an IResourceProvider for the named module or requirement"""
422 if isinstance(moduleOrReq, Requirement):
423 return working_set.find(moduleOrReq) or require(str(moduleOrReq))[0]
424 try:
425 module = sys.modules[moduleOrReq]
426 except KeyError:
427 __import__(moduleOrReq)
428 module = sys.modules[moduleOrReq]
429 loader = getattr(module, '__loader__', None)
430 return _find_adapter(_provider_factories, loader)(module)
431
432
433@functools.lru_cache(maxsize=None)
434def _macos_vers():
435 version = platform.mac_ver()[0]
436 # fallback for MacPorts
437 if version == '':
438 plist = '/System/Library/CoreServices/SystemVersion.plist'
439 if os.path.exists(plist):
440 with open(plist, 'rb') as fh:
441 plist_content = plistlib.load(fh)
442 if 'ProductVersion' in plist_content:
443 version = plist_content['ProductVersion']
444 return version.split('.')
445
446
447def _macos_arch(machine):
448 return {'PowerPC': 'ppc', 'Power_Macintosh': 'ppc'}.get(machine, machine)
449
450
451def get_build_platform():
452 """Return this platform's string for platform-specific distributions
453
454 XXX Currently this is the same as ``distutils.util.get_platform()``, but it
455 needs some hacks for Linux and macOS.
456 """
457 from sysconfig import get_platform
458
459 plat = get_platform()
460 if sys.platform == "darwin" and not plat.startswith('macosx-'):
461 try:
462 version = _macos_vers()
463 machine = os.uname()[4].replace(" ", "_")
464 return "macosx-%d.%d-%s" % (
465 int(version[0]),
466 int(version[1]),
467 _macos_arch(machine),
468 )
469 except ValueError:
470 # if someone is running a non-Mac darwin system, this will fall
471 # through to the default implementation
472 pass
473 return plat
474
475
476macosVersionString = re.compile(r"macosx-(\d+)\.(\d+)-(.*)")
477darwinVersionString = re.compile(r"darwin-(\d+)\.(\d+)\.(\d+)-(.*)")
478# XXX backward compat
479get_platform = get_build_platform
480
481
482def compatible_platforms(provided: str | None, required: str | None):
483 """Can code for the `provided` platform run on the `required` platform?
484
485 Returns true if either platform is ``None``, or the platforms are equal.
486
487 XXX Needs compatibility checks for Linux and other unixy OSes.
488 """
489 if provided is None or required is None or provided == required:
490 # easy case
491 return True
492
493 # macOS special cases
494 reqMac = macosVersionString.match(required)
495 if reqMac:
496 provMac = macosVersionString.match(provided)
497
498 # is this a Mac package?
499 if not provMac:
500 # this is backwards compatibility for packages built before
501 # setuptools 0.6. All packages built after this point will
502 # use the new macOS designation.
503 provDarwin = darwinVersionString.match(provided)
504 if provDarwin:
505 dversion = int(provDarwin.group(1))
506 macosversion = "%s.%s" % (reqMac.group(1), reqMac.group(2))
507 if (
508 dversion == 7
509 and macosversion >= "10.3"
510 or dversion == 8
511 and macosversion >= "10.4"
512 ):
513 return True
514 # egg isn't macOS or legacy darwin
515 return False
516
517 # are they the same major version and machine type?
518 if provMac.group(1) != reqMac.group(1) or provMac.group(3) != reqMac.group(3):
519 return False
520
521 # is the required OS major update >= the provided one?
522 if int(provMac.group(2)) > int(reqMac.group(2)):
523 return False
524
525 return True
526
527 # XXX Linux and other platforms' special cases should go here
528 return False
529
530
531@overload
532def get_distribution(dist: _DistributionT) -> _DistributionT: ...
533@overload
534def get_distribution(dist: _PkgReqType) -> Distribution: ...
535def get_distribution(dist: Distribution | _PkgReqType) -> Distribution:
536 """Return a current distribution object for a Requirement or string"""
537 if isinstance(dist, str):
538 dist = Requirement.parse(dist)
539 if isinstance(dist, Requirement):
540 # Bad type narrowing, dist has to be a Requirement here, so get_provider has to return Distribution
541 dist = get_provider(dist) # type: ignore[assignment]
542 if not isinstance(dist, Distribution):
543 raise TypeError("Expected str, Requirement, or Distribution", dist)
544 return dist
545
546
547def load_entry_point(dist: _EPDistType, group: str, name: str) -> _ResolvedEntryPoint:
548 """Return `name` entry point of `group` for `dist` or raise ImportError"""
549 return get_distribution(dist).load_entry_point(group, name)
550
551
552@overload
553def get_entry_map(
554 dist: _EPDistType, group: None = None
555) -> dict[str, dict[str, EntryPoint]]: ...
556@overload
557def get_entry_map(dist: _EPDistType, group: str) -> dict[str, EntryPoint]: ...
558def get_entry_map(dist: _EPDistType, group: str | None = None):
559 """Return the entry point map for `group`, or the full entry map"""
560 return get_distribution(dist).get_entry_map(group)
561
562
563def get_entry_info(dist: _EPDistType, group: str, name: str):
564 """Return the EntryPoint object for `group`+`name`, or ``None``"""
565 return get_distribution(dist).get_entry_info(group, name)
566
567
568class IMetadataProvider(Protocol):
569 def has_metadata(self, name: str) -> bool:
570 """Does the package's distribution contain the named metadata?"""
571
572 def get_metadata(self, name: str) -> str:
573 """The named metadata resource as a string"""
574
575 def get_metadata_lines(self, name: str) -> Iterator[str]:
576 """Yield named metadata resource as list of non-blank non-comment lines
577
578 Leading and trailing whitespace is stripped from each line, and lines
579 with ``#`` as the first non-blank character are omitted."""
580
581 def metadata_isdir(self, name: str) -> bool:
582 """Is the named metadata a directory? (like ``os.path.isdir()``)"""
583
584 def metadata_listdir(self, name: str) -> list[str]:
585 """List of metadata names in the directory (like ``os.listdir()``)"""
586
587 def run_script(self, script_name: str, namespace: dict[str, Any]) -> None:
588 """Execute the named script in the supplied namespace dictionary"""
589
590
591class IResourceProvider(IMetadataProvider, Protocol):
592 """An object that provides access to package resources"""
593
594 def get_resource_filename(
595 self, manager: ResourceManager, resource_name: str
596 ) -> str:
597 """Return a true filesystem path for `resource_name`
598
599 `manager` must be a ``ResourceManager``"""
600
601 def get_resource_stream(
602 self, manager: ResourceManager, resource_name: str
603 ) -> _ResourceStream:
604 """Return a readable file-like object for `resource_name`
605
606 `manager` must be a ``ResourceManager``"""
607
608 def get_resource_string(
609 self, manager: ResourceManager, resource_name: str
610 ) -> bytes:
611 """Return the contents of `resource_name` as :obj:`bytes`
612
613 `manager` must be a ``ResourceManager``"""
614
615 def has_resource(self, resource_name: str) -> bool:
616 """Does the package contain the named resource?"""
617
618 def resource_isdir(self, resource_name: str) -> bool:
619 """Is the named resource a directory? (like ``os.path.isdir()``)"""
620
621 def resource_listdir(self, resource_name: str) -> list[str]:
622 """List of resource names in the directory (like ``os.listdir()``)"""
623
624
625class WorkingSet:
626 """A collection of active distributions on sys.path (or a similar list)"""
627
628 def __init__(self, entries: Iterable[str] | None = None):
629 """Create working set from list of path entries (default=sys.path)"""
630 self.entries: list[str] = []
631 self.entry_keys = {}
632 self.by_key = {}
633 self.normalized_to_canonical_keys = {}
634 self.callbacks = []
635
636 if entries is None:
637 entries = sys.path
638
639 for entry in entries:
640 self.add_entry(entry)
641
642 @classmethod
643 def _build_master(cls):
644 """
645 Prepare the master working set.
646 """
647 ws = cls()
648 try:
649 from __main__ import __requires__
650 except ImportError:
651 # The main program does not list any requirements
652 return ws
653
654 # ensure the requirements are met
655 try:
656 ws.require(__requires__)
657 except VersionConflict:
658 return cls._build_from_requirements(__requires__)
659
660 return ws
661
662 @classmethod
663 def _build_from_requirements(cls, req_spec):
664 """
665 Build a working set from a requirement spec. Rewrites sys.path.
666 """
667 # try it without defaults already on sys.path
668 # by starting with an empty path
669 ws = cls([])
670 reqs = parse_requirements(req_spec)
671 dists = ws.resolve(reqs, Environment())
672 for dist in dists:
673 ws.add(dist)
674
675 # add any missing entries from sys.path
676 for entry in sys.path:
677 if entry not in ws.entries:
678 ws.add_entry(entry)
679
680 # then copy back to sys.path
681 sys.path[:] = ws.entries
682 return ws
683
684 def add_entry(self, entry: str):
685 """Add a path item to ``.entries``, finding any distributions on it
686
687 ``find_distributions(entry, True)`` is used to find distributions
688 corresponding to the path entry, and they are added. `entry` is
689 always appended to ``.entries``, even if it is already present.
690 (This is because ``sys.path`` can contain the same value more than
691 once, and the ``.entries`` of the ``sys.path`` WorkingSet should always
692 equal ``sys.path``.)
693 """
694 self.entry_keys.setdefault(entry, [])
695 self.entries.append(entry)
696 for dist in find_distributions(entry, True):
697 self.add(dist, entry, False)
698
699 def __contains__(self, dist: Distribution) -> bool:
700 """True if `dist` is the active distribution for its project"""
701 return self.by_key.get(dist.key) == dist
702
703 def find(self, req: Requirement) -> Distribution | None:
704 """Find a distribution matching requirement `req`
705
706 If there is an active distribution for the requested project, this
707 returns it as long as it meets the version requirement specified by
708 `req`. But, if there is an active distribution for the project and it
709 does *not* meet the `req` requirement, ``VersionConflict`` is raised.
710 If there is no active distribution for the requested project, ``None``
711 is returned.
712 """
713 dist = self.by_key.get(req.key)
714
715 if dist is None:
716 canonical_key = self.normalized_to_canonical_keys.get(req.key)
717
718 if canonical_key is not None:
719 req.key = canonical_key
720 dist = self.by_key.get(canonical_key)
721
722 if dist is not None and dist not in req:
723 # XXX add more info
724 raise VersionConflict(dist, req)
725 return dist
726
727 def iter_entry_points(self, group: str, name: str | None = None):
728 """Yield entry point objects from `group` matching `name`
729
730 If `name` is None, yields all entry points in `group` from all
731 distributions in the working set, otherwise only ones matching
732 both `group` and `name` are yielded (in distribution order).
733 """
734 return (
735 entry
736 for dist in self
737 for entry in dist.get_entry_map(group).values()
738 if name is None or name == entry.name
739 )
740
741 def run_script(self, requires: str, script_name: str):
742 """Locate distribution for `requires` and run `script_name` script"""
743 ns = sys._getframe(1).f_globals
744 name = ns['__name__']
745 ns.clear()
746 ns['__name__'] = name
747 self.require(requires)[0].run_script(script_name, ns)
748
749 def __iter__(self) -> Iterator[Distribution]:
750 """Yield distributions for non-duplicate projects in the working set
751
752 The yield order is the order in which the items' path entries were
753 added to the working set.
754 """
755 seen = set()
756 for item in self.entries:
757 if item not in self.entry_keys:
758 # workaround a cache issue
759 continue
760
761 for key in self.entry_keys[item]:
762 if key not in seen:
763 seen.add(key)
764 yield self.by_key[key]
765
766 def add(
767 self,
768 dist: Distribution,
769 entry: str | None = None,
770 insert: bool = True,
771 replace: bool = False,
772 ):
773 """Add `dist` to working set, associated with `entry`
774
775 If `entry` is unspecified, it defaults to the ``.location`` of `dist`.
776 On exit from this routine, `entry` is added to the end of the working
777 set's ``.entries`` (if it wasn't already present).
778
779 `dist` is only added to the working set if it's for a project that
780 doesn't already have a distribution in the set, unless `replace=True`.
781 If it's added, any callbacks registered with the ``subscribe()`` method
782 will be called.
783 """
784 if insert:
785 dist.insert_on(self.entries, entry, replace=replace)
786
787 if entry is None:
788 entry = dist.location
789 keys = self.entry_keys.setdefault(entry, [])
790 keys2 = self.entry_keys.setdefault(dist.location, [])
791 if not replace and dist.key in self.by_key:
792 # ignore hidden distros
793 return
794
795 self.by_key[dist.key] = dist
796 normalized_name = _packaging_utils.canonicalize_name(dist.key)
797 self.normalized_to_canonical_keys[normalized_name] = dist.key
798 if dist.key not in keys:
799 keys.append(dist.key)
800 if dist.key not in keys2:
801 keys2.append(dist.key)
802 self._added_new(dist)
803
804 @overload
805 def resolve(
806 self,
807 requirements: Iterable[Requirement],
808 env: Environment | None,
809 installer: _InstallerTypeT[_DistributionT],
810 replace_conflicting: bool = False,
811 extras: tuple[str, ...] | None = None,
812 ) -> list[_DistributionT]: ...
813 @overload
814 def resolve(
815 self,
816 requirements: Iterable[Requirement],
817 env: Environment | None = None,
818 *,
819 installer: _InstallerTypeT[_DistributionT],
820 replace_conflicting: bool = False,
821 extras: tuple[str, ...] | None = None,
822 ) -> list[_DistributionT]: ...
823 @overload
824 def resolve(
825 self,
826 requirements: Iterable[Requirement],
827 env: Environment | None = None,
828 installer: _InstallerType | None = None,
829 replace_conflicting: bool = False,
830 extras: tuple[str, ...] | None = None,
831 ) -> list[Distribution]: ...
832 def resolve(
833 self,
834 requirements: Iterable[Requirement],
835 env: Environment | None = None,
836 installer: _InstallerType | None | _InstallerTypeT[_DistributionT] = None,
837 replace_conflicting: bool = False,
838 extras: tuple[str, ...] | None = None,
839 ) -> list[Distribution] | list[_DistributionT]:
840 """List all distributions needed to (recursively) meet `requirements`
841
842 `requirements` must be a sequence of ``Requirement`` objects. `env`,
843 if supplied, should be an ``Environment`` instance. If
844 not supplied, it defaults to all distributions available within any
845 entry or distribution in the working set. `installer`, if supplied,
846 will be invoked with each requirement that cannot be met by an
847 already-installed distribution; it should return a ``Distribution`` or
848 ``None``.
849
850 Unless `replace_conflicting=True`, raises a VersionConflict exception
851 if
852 any requirements are found on the path that have the correct name but
853 the wrong version. Otherwise, if an `installer` is supplied it will be
854 invoked to obtain the correct version of the requirement and activate
855 it.
856
857 `extras` is a list of the extras to be used with these requirements.
858 This is important because extra requirements may look like `my_req;
859 extra = "my_extra"`, which would otherwise be interpreted as a purely
860 optional requirement. Instead, we want to be able to assert that these
861 requirements are truly required.
862 """
863
864 # set up the stack
865 requirements = list(requirements)[::-1]
866 # set of processed requirements
867 processed = set()
868 # key -> dist
869 best = {}
870 to_activate = []
871
872 req_extras = _ReqExtras()
873
874 # Mapping of requirement to set of distributions that required it;
875 # useful for reporting info about conflicts.
876 required_by = collections.defaultdict(set)
877
878 while requirements:
879 # process dependencies breadth-first
880 req = requirements.pop(0)
881 if req in processed:
882 # Ignore cyclic or redundant dependencies
883 continue
884
885 if not req_extras.markers_pass(req, extras):
886 continue
887
888 dist = self._resolve_dist(
889 req, best, replace_conflicting, env, installer, required_by, to_activate
890 )
891
892 # push the new requirements onto the stack
893 new_requirements = dist.requires(req.extras)[::-1]
894 requirements.extend(new_requirements)
895
896 # Register the new requirements needed by req
897 for new_requirement in new_requirements:
898 required_by[new_requirement].add(req.project_name)
899 req_extras[new_requirement] = req.extras
900
901 processed.add(req)
902
903 # return list of distros to activate
904 return to_activate
905
906 def _resolve_dist(
907 self, req, best, replace_conflicting, env, installer, required_by, to_activate
908 ) -> Distribution:
909 dist = best.get(req.key)
910 if dist is None:
911 # Find the best distribution and add it to the map
912 dist = self.by_key.get(req.key)
913 if dist is None or (dist not in req and replace_conflicting):
914 ws = self
915 if env is None:
916 if dist is None:
917 env = Environment(self.entries)
918 else:
919 # Use an empty environment and workingset to avoid
920 # any further conflicts with the conflicting
921 # distribution
922 env = Environment([])
923 ws = WorkingSet([])
924 dist = best[req.key] = env.best_match(
925 req, ws, installer, replace_conflicting=replace_conflicting
926 )
927 if dist is None:
928 requirers = required_by.get(req, None)
929 raise DistributionNotFound(req, requirers)
930 to_activate.append(dist)
931 if dist not in req:
932 # Oops, the "best" so far conflicts with a dependency
933 dependent_req = required_by[req]
934 raise VersionConflict(dist, req).with_context(dependent_req)
935 return dist
936
937 @overload
938 def find_plugins(
939 self,
940 plugin_env: Environment,
941 full_env: Environment | None,
942 installer: _InstallerTypeT[_DistributionT],
943 fallback: bool = True,
944 ) -> tuple[list[_DistributionT], dict[Distribution, Exception]]: ...
945 @overload
946 def find_plugins(
947 self,
948 plugin_env: Environment,
949 full_env: Environment | None = None,
950 *,
951 installer: _InstallerTypeT[_DistributionT],
952 fallback: bool = True,
953 ) -> tuple[list[_DistributionT], dict[Distribution, Exception]]: ...
954 @overload
955 def find_plugins(
956 self,
957 plugin_env: Environment,
958 full_env: Environment | None = None,
959 installer: _InstallerType | None = None,
960 fallback: bool = True,
961 ) -> tuple[list[Distribution], dict[Distribution, Exception]]: ...
962 def find_plugins(
963 self,
964 plugin_env: Environment,
965 full_env: Environment | None = None,
966 installer: _InstallerType | None | _InstallerTypeT[_DistributionT] = None,
967 fallback: bool = True,
968 ) -> tuple[
969 list[Distribution] | list[_DistributionT],
970 dict[Distribution, Exception],
971 ]:
972 """Find all activatable distributions in `plugin_env`
973
974 Example usage::
975
976 distributions, errors = working_set.find_plugins(
977 Environment(plugin_dirlist)
978 )
979 # add plugins+libs to sys.path
980 map(working_set.add, distributions)
981 # display errors
982 print('Could not load', errors)
983
984 The `plugin_env` should be an ``Environment`` instance that contains
985 only distributions that are in the project's "plugin directory" or
986 directories. The `full_env`, if supplied, should be an ``Environment``
987 contains all currently-available distributions. If `full_env` is not
988 supplied, one is created automatically from the ``WorkingSet`` this
989 method is called on, which will typically mean that every directory on
990 ``sys.path`` will be scanned for distributions.
991
992 `installer` is a standard installer callback as used by the
993 ``resolve()`` method. The `fallback` flag indicates whether we should
994 attempt to resolve older versions of a plugin if the newest version
995 cannot be resolved.
996
997 This method returns a 2-tuple: (`distributions`, `error_info`), where
998 `distributions` is a list of the distributions found in `plugin_env`
999 that were loadable, along with any other distributions that are needed
1000 to resolve their dependencies. `error_info` is a dictionary mapping
1001 unloadable plugin distributions to an exception instance describing the
1002 error that occurred. Usually this will be a ``DistributionNotFound`` or
1003 ``VersionConflict`` instance.
1004 """
1005
1006 plugin_projects = list(plugin_env)
1007 # scan project names in alphabetic order
1008 plugin_projects.sort()
1009
1010 error_info: dict[Distribution, Exception] = {}
1011 distributions: dict[Distribution, Exception | None] = {}
1012
1013 if full_env is None:
1014 env = Environment(self.entries)
1015 env += plugin_env
1016 else:
1017 env = full_env + plugin_env
1018
1019 shadow_set = self.__class__([])
1020 # put all our entries in shadow_set
1021 list(map(shadow_set.add, self))
1022
1023 for project_name in plugin_projects:
1024 for dist in plugin_env[project_name]:
1025 req = [dist.as_requirement()]
1026
1027 try:
1028 resolvees = shadow_set.resolve(req, env, installer)
1029
1030 except ResolutionError as v:
1031 # save error info
1032 error_info[dist] = v
1033 if fallback:
1034 # try the next older version of project
1035 continue
1036 else:
1037 # give up on this project, keep going
1038 break
1039
1040 else:
1041 list(map(shadow_set.add, resolvees))
1042 distributions.update(dict.fromkeys(resolvees))
1043
1044 # success, no need to try any more versions of this project
1045 break
1046
1047 sorted_distributions = list(distributions)
1048 sorted_distributions.sort()
1049
1050 return sorted_distributions, error_info
1051
1052 def require(self, *requirements: _NestedStr):
1053 """Ensure that distributions matching `requirements` are activated
1054
1055 `requirements` must be a string or a (possibly-nested) sequence
1056 thereof, specifying the distributions and versions required. The
1057 return value is a sequence of the distributions that needed to be
1058 activated to fulfill the requirements; all relevant distributions are
1059 included, even if they were already activated in this working set.
1060 """
1061 needed = self.resolve(parse_requirements(requirements))
1062
1063 for dist in needed:
1064 self.add(dist)
1065
1066 return needed
1067
1068 def subscribe(
1069 self, callback: Callable[[Distribution], object], existing: bool = True
1070 ):
1071 """Invoke `callback` for all distributions
1072
1073 If `existing=True` (default),
1074 call on all existing ones, as well.
1075 """
1076 if callback in self.callbacks:
1077 return
1078 self.callbacks.append(callback)
1079 if not existing:
1080 return
1081 for dist in self:
1082 callback(dist)
1083
1084 def _added_new(self, dist):
1085 for callback in self.callbacks:
1086 callback(dist)
1087
1088 def __getstate__(self):
1089 return (
1090 self.entries[:],
1091 self.entry_keys.copy(),
1092 self.by_key.copy(),
1093 self.normalized_to_canonical_keys.copy(),
1094 self.callbacks[:],
1095 )
1096
1097 def __setstate__(self, e_k_b_n_c):
1098 entries, keys, by_key, normalized_to_canonical_keys, callbacks = e_k_b_n_c
1099 self.entries = entries[:]
1100 self.entry_keys = keys.copy()
1101 self.by_key = by_key.copy()
1102 self.normalized_to_canonical_keys = normalized_to_canonical_keys.copy()
1103 self.callbacks = callbacks[:]
1104
1105
1106class _ReqExtras(Dict["Requirement", Tuple[str, ...]]):
1107 """
1108 Map each requirement to the extras that demanded it.
1109 """
1110
1111 def markers_pass(self, req: Requirement, extras: tuple[str, ...] | None = None):
1112 """
1113 Evaluate markers for req against each extra that
1114 demanded it.
1115
1116 Return False if the req has a marker and fails
1117 evaluation. Otherwise, return True.
1118 """
1119 extra_evals = (
1120 req.marker.evaluate({'extra': extra})
1121 for extra in self.get(req, ()) + (extras or (None,))
1122 )
1123 return not req.marker or any(extra_evals)
1124
1125
1126class Environment:
1127 """Searchable snapshot of distributions on a search path"""
1128
1129 def __init__(
1130 self,
1131 search_path: Iterable[str] | None = None,
1132 platform: str | None = get_supported_platform(),
1133 python: str | None = PY_MAJOR,
1134 ):
1135 """Snapshot distributions available on a search path
1136
1137 Any distributions found on `search_path` are added to the environment.
1138 `search_path` should be a sequence of ``sys.path`` items. If not
1139 supplied, ``sys.path`` is used.
1140
1141 `platform` is an optional string specifying the name of the platform
1142 that platform-specific distributions must be compatible with. If
1143 unspecified, it defaults to the current platform. `python` is an
1144 optional string naming the desired version of Python (e.g. ``'3.6'``);
1145 it defaults to the current version.
1146
1147 You may explicitly set `platform` (and/or `python`) to ``None`` if you
1148 wish to map *all* distributions, not just those compatible with the
1149 running platform or Python version.
1150 """
1151 self._distmap = {}
1152 self.platform = platform
1153 self.python = python
1154 self.scan(search_path)
1155
1156 def can_add(self, dist: Distribution):
1157 """Is distribution `dist` acceptable for this environment?
1158
1159 The distribution must match the platform and python version
1160 requirements specified when this environment was created, or False
1161 is returned.
1162 """
1163 py_compat = (
1164 self.python is None
1165 or dist.py_version is None
1166 or dist.py_version == self.python
1167 )
1168 return py_compat and compatible_platforms(dist.platform, self.platform)
1169
1170 def remove(self, dist: Distribution):
1171 """Remove `dist` from the environment"""
1172 self._distmap[dist.key].remove(dist)
1173
1174 def scan(self, search_path: Iterable[str] | None = None):
1175 """Scan `search_path` for distributions usable in this environment
1176
1177 Any distributions found are added to the environment.
1178 `search_path` should be a sequence of ``sys.path`` items. If not
1179 supplied, ``sys.path`` is used. Only distributions conforming to
1180 the platform/python version defined at initialization are added.
1181 """
1182 if search_path is None:
1183 search_path = sys.path
1184
1185 for item in search_path:
1186 for dist in find_distributions(item):
1187 self.add(dist)
1188
1189 def __getitem__(self, project_name: str) -> list[Distribution]:
1190 """Return a newest-to-oldest list of distributions for `project_name`
1191
1192 Uses case-insensitive `project_name` comparison, assuming all the
1193 project's distributions use their project's name converted to all
1194 lowercase as their key.
1195
1196 """
1197 distribution_key = project_name.lower()
1198 return self._distmap.get(distribution_key, [])
1199
1200 def add(self, dist: Distribution):
1201 """Add `dist` if we ``can_add()`` it and it has not already been added"""
1202 if self.can_add(dist) and dist.has_version():
1203 dists = self._distmap.setdefault(dist.key, [])
1204 if dist not in dists:
1205 dists.append(dist)
1206 dists.sort(key=operator.attrgetter('hashcmp'), reverse=True)
1207
1208 @overload
1209 def best_match(
1210 self,
1211 req: Requirement,
1212 working_set: WorkingSet,
1213 installer: _InstallerTypeT[_DistributionT],
1214 replace_conflicting: bool = False,
1215 ) -> _DistributionT: ...
1216 @overload
1217 def best_match(
1218 self,
1219 req: Requirement,
1220 working_set: WorkingSet,
1221 installer: _InstallerType | None = None,
1222 replace_conflicting: bool = False,
1223 ) -> Distribution | None: ...
1224 def best_match(
1225 self,
1226 req: Requirement,
1227 working_set: WorkingSet,
1228 installer: _InstallerType | None | _InstallerTypeT[_DistributionT] = None,
1229 replace_conflicting: bool = False,
1230 ) -> Distribution | None:
1231 """Find distribution best matching `req` and usable on `working_set`
1232
1233 This calls the ``find(req)`` method of the `working_set` to see if a
1234 suitable distribution is already active. (This may raise
1235 ``VersionConflict`` if an unsuitable version of the project is already
1236 active in the specified `working_set`.) If a suitable distribution
1237 isn't active, this method returns the newest distribution in the
1238 environment that meets the ``Requirement`` in `req`. If no suitable
1239 distribution is found, and `installer` is supplied, then the result of
1240 calling the environment's ``obtain(req, installer)`` method will be
1241 returned.
1242 """
1243 try:
1244 dist = working_set.find(req)
1245 except VersionConflict:
1246 if not replace_conflicting:
1247 raise
1248 dist = None
1249 if dist is not None:
1250 return dist
1251 for dist in self[req.key]:
1252 if dist in req:
1253 return dist
1254 # try to download/install
1255 return self.obtain(req, installer)
1256
1257 @overload
1258 def obtain(
1259 self,
1260 requirement: Requirement,
1261 installer: _InstallerTypeT[_DistributionT],
1262 ) -> _DistributionT: ...
1263 @overload
1264 def obtain(
1265 self,
1266 requirement: Requirement,
1267 installer: Callable[[Requirement], None] | None = None,
1268 ) -> None: ...
1269 @overload
1270 def obtain(
1271 self,
1272 requirement: Requirement,
1273 installer: _InstallerType | None = None,
1274 ) -> Distribution | None: ...
1275 def obtain(
1276 self,
1277 requirement: Requirement,
1278 installer: Callable[[Requirement], None]
1279 | _InstallerType
1280 | None
1281 | _InstallerTypeT[_DistributionT] = None,
1282 ) -> Distribution | None:
1283 """Obtain a distribution matching `requirement` (e.g. via download)
1284
1285 Obtain a distro that matches requirement (e.g. via download). In the
1286 base ``Environment`` class, this routine just returns
1287 ``installer(requirement)``, unless `installer` is None, in which case
1288 None is returned instead. This method is a hook that allows subclasses
1289 to attempt other ways of obtaining a distribution before falling back
1290 to the `installer` argument."""
1291 return installer(requirement) if installer else None
1292
1293 def __iter__(self) -> Iterator[str]:
1294 """Yield the unique project names of the available distributions"""
1295 for key in self._distmap.keys():
1296 if self[key]:
1297 yield key
1298
1299 def __iadd__(self, other: Distribution | Environment):
1300 """In-place addition of a distribution or environment"""
1301 if isinstance(other, Distribution):
1302 self.add(other)
1303 elif isinstance(other, Environment):
1304 for project in other:
1305 for dist in other[project]:
1306 self.add(dist)
1307 else:
1308 raise TypeError("Can't add %r to environment" % (other,))
1309 return self
1310
1311 def __add__(self, other: Distribution | Environment):
1312 """Add an environment or distribution to an environment"""
1313 new = self.__class__([], platform=None, python=None)
1314 for env in self, other:
1315 new += env
1316 return new
1317
1318
1319# XXX backward compatibility
1320AvailableDistributions = Environment
1321
1322
1323class ExtractionError(RuntimeError):
1324 """An error occurred extracting a resource
1325
1326 The following attributes are available from instances of this exception:
1327
1328 manager
1329 The resource manager that raised this exception
1330
1331 cache_path
1332 The base directory for resource extraction
1333
1334 original_error
1335 The exception instance that caused extraction to fail
1336 """
1337
1338 manager: ResourceManager
1339 cache_path: str
1340 original_error: BaseException | None
1341
1342
1343class ResourceManager:
1344 """Manage resource extraction and packages"""
1345
1346 extraction_path: str | None = None
1347
1348 def __init__(self):
1349 self.cached_files = {}
1350
1351 def resource_exists(self, package_or_requirement: _PkgReqType, resource_name: str):
1352 """Does the named resource exist?"""
1353 return get_provider(package_or_requirement).has_resource(resource_name)
1354
1355 def resource_isdir(self, package_or_requirement: _PkgReqType, resource_name: str):
1356 """Is the named resource an existing directory?"""
1357 return get_provider(package_or_requirement).resource_isdir(resource_name)
1358
1359 def resource_filename(
1360 self, package_or_requirement: _PkgReqType, resource_name: str
1361 ):
1362 """Return a true filesystem path for specified resource"""
1363 return get_provider(package_or_requirement).get_resource_filename(
1364 self, resource_name
1365 )
1366
1367 def resource_stream(self, package_or_requirement: _PkgReqType, resource_name: str):
1368 """Return a readable file-like object for specified resource"""
1369 return get_provider(package_or_requirement).get_resource_stream(
1370 self, resource_name
1371 )
1372
1373 def resource_string(
1374 self, package_or_requirement: _PkgReqType, resource_name: str
1375 ) -> bytes:
1376 """Return specified resource as :obj:`bytes`"""
1377 return get_provider(package_or_requirement).get_resource_string(
1378 self, resource_name
1379 )
1380
1381 def resource_listdir(self, package_or_requirement: _PkgReqType, resource_name: str):
1382 """List the contents of the named resource directory"""
1383 return get_provider(package_or_requirement).resource_listdir(resource_name)
1384
1385 def extraction_error(self) -> NoReturn:
1386 """Give an error message for problems extracting file(s)"""
1387
1388 old_exc = sys.exc_info()[1]
1389 cache_path = self.extraction_path or get_default_cache()
1390
1391 tmpl = textwrap.dedent(
1392 """
1393 Can't extract file(s) to egg cache
1394
1395 The following error occurred while trying to extract file(s)
1396 to the Python egg cache:
1397
1398 {old_exc}
1399
1400 The Python egg cache directory is currently set to:
1401
1402 {cache_path}
1403
1404 Perhaps your account does not have write access to this directory?
1405 You can change the cache directory by setting the PYTHON_EGG_CACHE
1406 environment variable to point to an accessible directory.
1407 """
1408 ).lstrip()
1409 err = ExtractionError(tmpl.format(**locals()))
1410 err.manager = self
1411 err.cache_path = cache_path
1412 err.original_error = old_exc
1413 raise err
1414
1415 def get_cache_path(self, archive_name: str, names: Iterable[StrPath] = ()):
1416 """Return absolute location in cache for `archive_name` and `names`
1417
1418 The parent directory of the resulting path will be created if it does
1419 not already exist. `archive_name` should be the base filename of the
1420 enclosing egg (which may not be the name of the enclosing zipfile!),
1421 including its ".egg" extension. `names`, if provided, should be a
1422 sequence of path name parts "under" the egg's extraction location.
1423
1424 This method should only be called by resource providers that need to
1425 obtain an extraction location, and only for names they intend to
1426 extract, as it tracks the generated names for possible cleanup later.
1427 """
1428 extract_path = self.extraction_path or get_default_cache()
1429 target_path = os.path.join(extract_path, archive_name + '-tmp', *names)
1430 try:
1431 _bypass_ensure_directory(target_path)
1432 except Exception:
1433 self.extraction_error()
1434
1435 self._warn_unsafe_extraction_path(extract_path)
1436
1437 self.cached_files[target_path] = True
1438 return target_path
1439
1440 @staticmethod
1441 def _warn_unsafe_extraction_path(path):
1442 """
1443 If the default extraction path is overridden and set to an insecure
1444 location, such as /tmp, it opens up an opportunity for an attacker to
1445 replace an extracted file with an unauthorized payload. Warn the user
1446 if a known insecure location is used.
1447
1448 See Distribute #375 for more details.
1449 """
1450 if os.name == 'nt' and not path.startswith(os.environ['windir']):
1451 # On Windows, permissions are generally restrictive by default
1452 # and temp directories are not writable by other users, so
1453 # bypass the warning.
1454 return
1455 mode = os.stat(path).st_mode
1456 if mode & stat.S_IWOTH or mode & stat.S_IWGRP:
1457 msg = (
1458 "Extraction path is writable by group/others "
1459 "and vulnerable to attack when "
1460 "used with get_resource_filename ({path}). "
1461 "Consider a more secure "
1462 "location (set with .set_extraction_path or the "
1463 "PYTHON_EGG_CACHE environment variable)."
1464 ).format(**locals())
1465 warnings.warn(msg, UserWarning)
1466
1467 def postprocess(self, tempname: StrOrBytesPath, filename: StrOrBytesPath):
1468 """Perform any platform-specific postprocessing of `tempname`
1469
1470 This is where Mac header rewrites should be done; other platforms don't
1471 have anything special they should do.
1472
1473 Resource providers should call this method ONLY after successfully
1474 extracting a compressed resource. They must NOT call it on resources
1475 that are already in the filesystem.
1476
1477 `tempname` is the current (temporary) name of the file, and `filename`
1478 is the name it will be renamed to by the caller after this routine
1479 returns.
1480 """
1481
1482 if os.name == 'posix':
1483 # Make the resource executable
1484 mode = ((os.stat(tempname).st_mode) | 0o555) & 0o7777
1485 os.chmod(tempname, mode)
1486
1487 def set_extraction_path(self, path: str):
1488 """Set the base path where resources will be extracted to, if needed.
1489
1490 If you do not call this routine before any extractions take place, the
1491 path defaults to the return value of ``get_default_cache()``. (Which
1492 is based on the ``PYTHON_EGG_CACHE`` environment variable, with various
1493 platform-specific fallbacks. See that routine's documentation for more
1494 details.)
1495
1496 Resources are extracted to subdirectories of this path based upon
1497 information given by the ``IResourceProvider``. You may set this to a
1498 temporary directory, but then you must call ``cleanup_resources()`` to
1499 delete the extracted files when done. There is no guarantee that
1500 ``cleanup_resources()`` will be able to remove all extracted files.
1501
1502 (Note: you may not change the extraction path for a given resource
1503 manager once resources have been extracted, unless you first call
1504 ``cleanup_resources()``.)
1505 """
1506 if self.cached_files:
1507 raise ValueError("Can't change extraction path, files already extracted")
1508
1509 self.extraction_path = path
1510
1511 def cleanup_resources(self, force: bool = False) -> list[str]:
1512 """
1513 Delete all extracted resource files and directories, returning a list
1514 of the file and directory names that could not be successfully removed.
1515 This function does not have any concurrency protection, so it should
1516 generally only be called when the extraction path is a temporary
1517 directory exclusive to a single process. This method is not
1518 automatically called; you must call it explicitly or register it as an
1519 ``atexit`` function if you wish to ensure cleanup of a temporary
1520 directory used for extractions.
1521 """
1522 # XXX
1523 return []
1524
1525
1526def get_default_cache() -> str:
1527 """
1528 Return the ``PYTHON_EGG_CACHE`` environment variable
1529 or a platform-relevant user cache dir for an app
1530 named "Python-Eggs".
1531 """
1532 return os.environ.get('PYTHON_EGG_CACHE') or _user_cache_dir(appname='Python-Eggs')
1533
1534
1535def safe_name(name: str):
1536 """Convert an arbitrary string to a standard distribution name
1537
1538 Any runs of non-alphanumeric/. characters are replaced with a single '-'.
1539 """
1540 return re.sub('[^A-Za-z0-9.]+', '-', name)
1541
1542
1543def safe_version(version: str):
1544 """
1545 Convert an arbitrary string to a standard version string
1546 """
1547 try:
1548 # normalize the version
1549 return str(_packaging_version.Version(version))
1550 except _packaging_version.InvalidVersion:
1551 version = version.replace(' ', '.')
1552 return re.sub('[^A-Za-z0-9.]+', '-', version)
1553
1554
1555def _forgiving_version(version):
1556 """Fallback when ``safe_version`` is not safe enough
1557 >>> parse_version(_forgiving_version('0.23ubuntu1'))
1558 <Version('0.23.dev0+sanitized.ubuntu1')>
1559 >>> parse_version(_forgiving_version('0.23-'))
1560 <Version('0.23.dev0+sanitized')>
1561 >>> parse_version(_forgiving_version('0.-_'))
1562 <Version('0.dev0+sanitized')>
1563 >>> parse_version(_forgiving_version('42.+?1'))
1564 <Version('42.dev0+sanitized.1')>
1565 >>> parse_version(_forgiving_version('hello world'))
1566 <Version('0.dev0+sanitized.hello.world')>
1567 """
1568 version = version.replace(' ', '.')
1569 match = _PEP440_FALLBACK.search(version)
1570 if match:
1571 safe = match["safe"]
1572 rest = version[len(safe) :]
1573 else:
1574 safe = "0"
1575 rest = version
1576 local = f"sanitized.{_safe_segment(rest)}".strip(".")
1577 return f"{safe}.dev0+{local}"
1578
1579
1580def _safe_segment(segment):
1581 """Convert an arbitrary string into a safe segment"""
1582 segment = re.sub('[^A-Za-z0-9.]+', '-', segment)
1583 segment = re.sub('-[^A-Za-z0-9]+', '-', segment)
1584 return re.sub(r'\.[^A-Za-z0-9]+', '.', segment).strip(".-")
1585
1586
1587def safe_extra(extra: str):
1588 """Convert an arbitrary string to a standard 'extra' name
1589
1590 Any runs of non-alphanumeric characters are replaced with a single '_',
1591 and the result is always lowercased.
1592 """
1593 return re.sub('[^A-Za-z0-9.-]+', '_', extra).lower()
1594
1595
1596def to_filename(name: str):
1597 """Convert a project or version name to its filename-escaped form
1598
1599 Any '-' characters are currently replaced with '_'.
1600 """
1601 return name.replace('-', '_')
1602
1603
1604def invalid_marker(text: str):
1605 """
1606 Validate text as a PEP 508 environment marker; return an exception
1607 if invalid or False otherwise.
1608 """
1609 try:
1610 evaluate_marker(text)
1611 except SyntaxError as e:
1612 e.filename = None
1613 e.lineno = None
1614 return e
1615 return False
1616
1617
1618def evaluate_marker(text: str, extra: str | None = None) -> bool:
1619 """
1620 Evaluate a PEP 508 environment marker.
1621 Return a boolean indicating the marker result in this environment.
1622 Raise SyntaxError if marker is invalid.
1623
1624 This implementation uses the 'pyparsing' module.
1625 """
1626 try:
1627 marker = _packaging_markers.Marker(text)
1628 return marker.evaluate()
1629 except _packaging_markers.InvalidMarker as e:
1630 raise SyntaxError(e) from e
1631
1632
1633class NullProvider:
1634 """Try to implement resources and metadata for arbitrary PEP 302 loaders"""
1635
1636 egg_name: str | None = None
1637 egg_info: str | None = None
1638 loader: _LoaderProtocol | None = None
1639
1640 def __init__(self, module: _ModuleLike):
1641 self.loader = getattr(module, '__loader__', None)
1642 self.module_path = os.path.dirname(getattr(module, '__file__', ''))
1643
1644 def get_resource_filename(self, manager: ResourceManager, resource_name: str):
1645 return self._fn(self.module_path, resource_name)
1646
1647 def get_resource_stream(self, manager: ResourceManager, resource_name: str):
1648 return io.BytesIO(self.get_resource_string(manager, resource_name))
1649
1650 def get_resource_string(
1651 self, manager: ResourceManager, resource_name: str
1652 ) -> bytes:
1653 return self._get(self._fn(self.module_path, resource_name))
1654
1655 def has_resource(self, resource_name: str):
1656 return self._has(self._fn(self.module_path, resource_name))
1657
1658 def _get_metadata_path(self, name):
1659 return self._fn(self.egg_info, name)
1660
1661 def has_metadata(self, name: str) -> bool:
1662 if not self.egg_info:
1663 return False
1664
1665 path = self._get_metadata_path(name)
1666 return self._has(path)
1667
1668 def get_metadata(self, name: str):
1669 if not self.egg_info:
1670 return ""
1671 path = self._get_metadata_path(name)
1672 value = self._get(path)
1673 try:
1674 return value.decode('utf-8')
1675 except UnicodeDecodeError as exc:
1676 # Include the path in the error message to simplify
1677 # troubleshooting, and without changing the exception type.
1678 exc.reason += ' in {} file at path: {}'.format(name, path)
1679 raise
1680
1681 def get_metadata_lines(self, name: str) -> Iterator[str]:
1682 return yield_lines(self.get_metadata(name))
1683
1684 def resource_isdir(self, resource_name: str):
1685 return self._isdir(self._fn(self.module_path, resource_name))
1686
1687 def metadata_isdir(self, name: str) -> bool:
1688 return bool(self.egg_info and self._isdir(self._fn(self.egg_info, name)))
1689
1690 def resource_listdir(self, resource_name: str):
1691 return self._listdir(self._fn(self.module_path, resource_name))
1692
1693 def metadata_listdir(self, name: str) -> list[str]:
1694 if self.egg_info:
1695 return self._listdir(self._fn(self.egg_info, name))
1696 return []
1697
1698 def run_script(self, script_name: str, namespace: dict[str, Any]):
1699 script = 'scripts/' + script_name
1700 if not self.has_metadata(script):
1701 raise ResolutionError(
1702 "Script {script!r} not found in metadata at {self.egg_info!r}".format(
1703 **locals()
1704 ),
1705 )
1706
1707 script_text = self.get_metadata(script).replace('\r\n', '\n')
1708 script_text = script_text.replace('\r', '\n')
1709 script_filename = self._fn(self.egg_info, script)
1710 namespace['__file__'] = script_filename
1711 if os.path.exists(script_filename):
1712 source = _read_utf8_with_fallback(script_filename)
1713 code = compile(source, script_filename, 'exec')
1714 exec(code, namespace, namespace)
1715 else:
1716 from linecache import cache
1717
1718 cache[script_filename] = (
1719 len(script_text),
1720 0,
1721 script_text.split('\n'),
1722 script_filename,
1723 )
1724 script_code = compile(script_text, script_filename, 'exec')
1725 exec(script_code, namespace, namespace)
1726
1727 def _has(self, path) -> bool:
1728 raise NotImplementedError(
1729 "Can't perform this operation for unregistered loader type"
1730 )
1731
1732 def _isdir(self, path) -> bool:
1733 raise NotImplementedError(
1734 "Can't perform this operation for unregistered loader type"
1735 )
1736
1737 def _listdir(self, path) -> list[str]:
1738 raise NotImplementedError(
1739 "Can't perform this operation for unregistered loader type"
1740 )
1741
1742 def _fn(self, base: str | None, resource_name: str):
1743 if base is None:
1744 raise TypeError(
1745 "`base` parameter in `_fn` is `None`. Either override this method or check the parameter first."
1746 )
1747 self._validate_resource_path(resource_name)
1748 if resource_name:
1749 return os.path.join(base, *resource_name.split('/'))
1750 return base
1751
1752 @staticmethod
1753 def _validate_resource_path(path):
1754 """
1755 Validate the resource paths according to the docs.
1756 https://setuptools.pypa.io/en/latest/pkg_resources.html#basic-resource-access
1757
1758 >>> warned = getfixture('recwarn')
1759 >>> warnings.simplefilter('always')
1760 >>> vrp = NullProvider._validate_resource_path
1761 >>> vrp('foo/bar.txt')
1762 >>> bool(warned)
1763 False
1764 >>> vrp('../foo/bar.txt')
1765 >>> bool(warned)
1766 True
1767 >>> warned.clear()
1768 >>> vrp('/foo/bar.txt')
1769 >>> bool(warned)
1770 True
1771 >>> vrp('foo/../../bar.txt')
1772 >>> bool(warned)
1773 True
1774 >>> warned.clear()
1775 >>> vrp('foo/f../bar.txt')
1776 >>> bool(warned)
1777 False
1778
1779 Windows path separators are straight-up disallowed.
1780 >>> vrp(r'\\foo/bar.txt')
1781 Traceback (most recent call last):
1782 ...
1783 ValueError: Use of .. or absolute path in a resource path \
1784is not allowed.
1785
1786 >>> vrp(r'C:\\foo/bar.txt')
1787 Traceback (most recent call last):
1788 ...
1789 ValueError: Use of .. or absolute path in a resource path \
1790is not allowed.
1791
1792 Blank values are allowed
1793
1794 >>> vrp('')
1795 >>> bool(warned)
1796 False
1797
1798 Non-string values are not.
1799
1800 >>> vrp(None)
1801 Traceback (most recent call last):
1802 ...
1803 AttributeError: ...
1804 """
1805 invalid = (
1806 os.path.pardir in path.split(posixpath.sep)
1807 or posixpath.isabs(path)
1808 or ntpath.isabs(path)
1809 or path.startswith("\\")
1810 )
1811 if not invalid:
1812 return
1813
1814 msg = "Use of .. or absolute path in a resource path is not allowed."
1815
1816 # Aggressively disallow Windows absolute paths
1817 if (path.startswith("\\") or ntpath.isabs(path)) and not posixpath.isabs(path):
1818 raise ValueError(msg)
1819
1820 # for compatibility, warn; in future
1821 # raise ValueError(msg)
1822 issue_warning(
1823 msg[:-1] + " and will raise exceptions in a future release.",
1824 DeprecationWarning,
1825 )
1826
1827 def _get(self, path) -> bytes:
1828 if hasattr(self.loader, 'get_data') and self.loader:
1829 # Already checked get_data exists
1830 return self.loader.get_data(path) # type: ignore[attr-defined]
1831 raise NotImplementedError(
1832 "Can't perform this operation for loaders without 'get_data()'"
1833 )
1834
1835
1836register_loader_type(object, NullProvider)
1837
1838
1839def _parents(path):
1840 """
1841 yield all parents of path including path
1842 """
1843 last = None
1844 while path != last:
1845 yield path
1846 last = path
1847 path, _ = os.path.split(path)
1848
1849
1850class EggProvider(NullProvider):
1851 """Provider based on a virtual filesystem"""
1852
1853 def __init__(self, module: _ModuleLike):
1854 super().__init__(module)
1855 self._setup_prefix()
1856
1857 def _setup_prefix(self):
1858 # Assume that metadata may be nested inside a "basket"
1859 # of multiple eggs and use module_path instead of .archive.
1860 eggs = filter(_is_egg_path, _parents(self.module_path))
1861 egg = next(eggs, None)
1862 egg and self._set_egg(egg)
1863
1864 def _set_egg(self, path: str):
1865 self.egg_name = os.path.basename(path)
1866 self.egg_info = os.path.join(path, 'EGG-INFO')
1867 self.egg_root = path
1868
1869
1870class DefaultProvider(EggProvider):
1871 """Provides access to package resources in the filesystem"""
1872
1873 def _has(self, path) -> bool:
1874 return os.path.exists(path)
1875
1876 def _isdir(self, path) -> bool:
1877 return os.path.isdir(path)
1878
1879 def _listdir(self, path):
1880 return os.listdir(path)
1881
1882 def get_resource_stream(self, manager: object, resource_name: str):
1883 return open(self._fn(self.module_path, resource_name), 'rb')
1884
1885 def _get(self, path) -> bytes:
1886 with open(path, 'rb') as stream:
1887 return stream.read()
1888
1889 @classmethod
1890 def _register(cls):
1891 loader_names = (
1892 'SourceFileLoader',
1893 'SourcelessFileLoader',
1894 )
1895 for name in loader_names:
1896 loader_cls = getattr(importlib.machinery, name, type(None))
1897 register_loader_type(loader_cls, cls)
1898
1899
1900DefaultProvider._register()
1901
1902
1903class EmptyProvider(NullProvider):
1904 """Provider that returns nothing for all requests"""
1905
1906 # A special case, we don't want all Providers inheriting from NullProvider to have a potentially None module_path
1907 module_path: str | None = None # type: ignore[assignment]
1908
1909 _isdir = _has = lambda self, path: False
1910
1911 def _get(self, path) -> bytes:
1912 return b''
1913
1914 def _listdir(self, path):
1915 return []
1916
1917 def __init__(self):
1918 pass
1919
1920
1921empty_provider = EmptyProvider()
1922
1923
1924class ZipManifests(Dict[str, "MemoizedZipManifests.manifest_mod"]):
1925 """
1926 zip manifest builder
1927 """
1928
1929 # `path` could be `StrPath | IO[bytes]` but that violates the LSP for `MemoizedZipManifests.load`
1930 @classmethod
1931 def build(cls, path: str):
1932 """
1933 Build a dictionary similar to the zipimport directory
1934 caches, except instead of tuples, store ZipInfo objects.
1935
1936 Use a platform-specific path separator (os.sep) for the path keys
1937 for compatibility with pypy on Windows.
1938 """
1939 with zipfile.ZipFile(path) as zfile:
1940 items = (
1941 (
1942 name.replace('/', os.sep),
1943 zfile.getinfo(name),
1944 )
1945 for name in zfile.namelist()
1946 )
1947 return dict(items)
1948
1949 load = build
1950
1951
1952class MemoizedZipManifests(ZipManifests):
1953 """
1954 Memoized zipfile manifests.
1955 """
1956
1957 class manifest_mod(NamedTuple):
1958 manifest: dict[str, zipfile.ZipInfo]
1959 mtime: float
1960
1961 def load(self, path: str) -> dict[str, zipfile.ZipInfo]: # type: ignore[override] # ZipManifests.load is a classmethod
1962 """
1963 Load a manifest at path or return a suitable manifest already loaded.
1964 """
1965 path = os.path.normpath(path)
1966 mtime = os.stat(path).st_mtime
1967
1968 if path not in self or self[path].mtime != mtime:
1969 manifest = self.build(path)
1970 self[path] = self.manifest_mod(manifest, mtime)
1971
1972 return self[path].manifest
1973
1974
1975class ZipProvider(EggProvider):
1976 """Resource support for zips and eggs"""
1977
1978 eagers: list[str] | None = None
1979 _zip_manifests = MemoizedZipManifests()
1980 # ZipProvider's loader should always be a zipimporter or equivalent
1981 loader: zipimport.zipimporter
1982
1983 def __init__(self, module: _ZipLoaderModule):
1984 super().__init__(module)
1985 self.zip_pre = self.loader.archive + os.sep
1986
1987 def _zipinfo_name(self, fspath):
1988 # Convert a virtual filename (full path to file) into a zipfile subpath
1989 # usable with the zipimport directory cache for our target archive
1990 fspath = fspath.rstrip(os.sep)
1991 if fspath == self.loader.archive:
1992 return ''
1993 if fspath.startswith(self.zip_pre):
1994 return fspath[len(self.zip_pre) :]
1995 raise AssertionError("%s is not a subpath of %s" % (fspath, self.zip_pre))
1996
1997 def _parts(self, zip_path):
1998 # Convert a zipfile subpath into an egg-relative path part list.
1999 # pseudo-fs path
2000 fspath = self.zip_pre + zip_path
2001 if fspath.startswith(self.egg_root + os.sep):
2002 return fspath[len(self.egg_root) + 1 :].split(os.sep)
2003 raise AssertionError("%s is not a subpath of %s" % (fspath, self.egg_root))
2004
2005 @property
2006 def zipinfo(self):
2007 return self._zip_manifests.load(self.loader.archive)
2008
2009 def get_resource_filename(self, manager: ResourceManager, resource_name: str):
2010 if not self.egg_name:
2011 raise NotImplementedError(
2012 "resource_filename() only supported for .egg, not .zip"
2013 )
2014 # no need to lock for extraction, since we use temp names
2015 zip_path = self._resource_to_zip(resource_name)
2016 eagers = self._get_eager_resources()
2017 if '/'.join(self._parts(zip_path)) in eagers:
2018 for name in eagers:
2019 self._extract_resource(manager, self._eager_to_zip(name))
2020 return self._extract_resource(manager, zip_path)
2021
2022 @staticmethod
2023 def _get_date_and_size(zip_stat):
2024 size = zip_stat.file_size
2025 # ymdhms+wday, yday, dst
2026 date_time = zip_stat.date_time + (0, 0, -1)
2027 # 1980 offset already done
2028 timestamp = time.mktime(date_time)
2029 return timestamp, size
2030
2031 # FIXME: 'ZipProvider._extract_resource' is too complex (12)
2032 def _extract_resource(self, manager: ResourceManager, zip_path) -> str: # noqa: C901
2033 if zip_path in self._index():
2034 for name in self._index()[zip_path]:
2035 last = self._extract_resource(manager, os.path.join(zip_path, name))
2036 # return the extracted directory name
2037 return os.path.dirname(last)
2038
2039 timestamp, size = self._get_date_and_size(self.zipinfo[zip_path])
2040
2041 if not WRITE_SUPPORT:
2042 raise OSError(
2043 '"os.rename" and "os.unlink" are not supported on this platform'
2044 )
2045 try:
2046 if not self.egg_name:
2047 raise OSError(
2048 '"egg_name" is empty. This likely means no egg could be found from the "module_path".'
2049 )
2050 real_path = manager.get_cache_path(self.egg_name, self._parts(zip_path))
2051
2052 if self._is_current(real_path, zip_path):
2053 return real_path
2054
2055 outf, tmpnam = _mkstemp(
2056 ".$extract",
2057 dir=os.path.dirname(real_path),
2058 )
2059 os.write(outf, self.loader.get_data(zip_path))
2060 os.close(outf)
2061 utime(tmpnam, (timestamp, timestamp))
2062 manager.postprocess(tmpnam, real_path)
2063
2064 try:
2065 rename(tmpnam, real_path)
2066
2067 except OSError:
2068 if os.path.isfile(real_path):
2069 if self._is_current(real_path, zip_path):
2070 # the file became current since it was checked above,
2071 # so proceed.
2072 return real_path
2073 # Windows, del old file and retry
2074 elif os.name == 'nt':
2075 unlink(real_path)
2076 rename(tmpnam, real_path)
2077 return real_path
2078 raise
2079
2080 except OSError:
2081 # report a user-friendly error
2082 manager.extraction_error()
2083
2084 return real_path
2085
2086 def _is_current(self, file_path, zip_path):
2087 """
2088 Return True if the file_path is current for this zip_path
2089 """
2090 timestamp, size = self._get_date_and_size(self.zipinfo[zip_path])
2091 if not os.path.isfile(file_path):
2092 return False
2093 stat = os.stat(file_path)
2094 if stat.st_size != size or stat.st_mtime != timestamp:
2095 return False
2096 # check that the contents match
2097 zip_contents = self.loader.get_data(zip_path)
2098 with open(file_path, 'rb') as f:
2099 file_contents = f.read()
2100 return zip_contents == file_contents
2101
2102 def _get_eager_resources(self):
2103 if self.eagers is None:
2104 eagers = []
2105 for name in ('native_libs.txt', 'eager_resources.txt'):
2106 if self.has_metadata(name):
2107 eagers.extend(self.get_metadata_lines(name))
2108 self.eagers = eagers
2109 return self.eagers
2110
2111 def _index(self):
2112 try:
2113 return self._dirindex
2114 except AttributeError:
2115 ind = {}
2116 for path in self.zipinfo:
2117 parts = path.split(os.sep)
2118 while parts:
2119 parent = os.sep.join(parts[:-1])
2120 if parent in ind:
2121 ind[parent].append(parts[-1])
2122 break
2123 else:
2124 ind[parent] = [parts.pop()]
2125 self._dirindex = ind
2126 return ind
2127
2128 def _has(self, fspath) -> bool:
2129 zip_path = self._zipinfo_name(fspath)
2130 return zip_path in self.zipinfo or zip_path in self._index()
2131
2132 def _isdir(self, fspath) -> bool:
2133 return self._zipinfo_name(fspath) in self._index()
2134
2135 def _listdir(self, fspath):
2136 return list(self._index().get(self._zipinfo_name(fspath), ()))
2137
2138 def _eager_to_zip(self, resource_name: str):
2139 return self._zipinfo_name(self._fn(self.egg_root, resource_name))
2140
2141 def _resource_to_zip(self, resource_name: str):
2142 return self._zipinfo_name(self._fn(self.module_path, resource_name))
2143
2144
2145register_loader_type(zipimport.zipimporter, ZipProvider)
2146
2147
2148class FileMetadata(EmptyProvider):
2149 """Metadata handler for standalone PKG-INFO files
2150
2151 Usage::
2152
2153 metadata = FileMetadata("/path/to/PKG-INFO")
2154
2155 This provider rejects all data and metadata requests except for PKG-INFO,
2156 which is treated as existing, and will be the contents of the file at
2157 the provided location.
2158 """
2159
2160 def __init__(self, path: StrPath):
2161 self.path = path
2162
2163 def _get_metadata_path(self, name):
2164 return self.path
2165
2166 def has_metadata(self, name: str) -> bool:
2167 return name == 'PKG-INFO' and os.path.isfile(self.path)
2168
2169 def get_metadata(self, name: str):
2170 if name != 'PKG-INFO':
2171 raise KeyError("No metadata except PKG-INFO is available")
2172
2173 with open(self.path, encoding='utf-8', errors="replace") as f:
2174 metadata = f.read()
2175 self._warn_on_replacement(metadata)
2176 return metadata
2177
2178 def _warn_on_replacement(self, metadata):
2179 replacement_char = '�'
2180 if replacement_char in metadata:
2181 tmpl = "{self.path} could not be properly decoded in UTF-8"
2182 msg = tmpl.format(**locals())
2183 warnings.warn(msg)
2184
2185 def get_metadata_lines(self, name: str) -> Iterator[str]:
2186 return yield_lines(self.get_metadata(name))
2187
2188
2189class PathMetadata(DefaultProvider):
2190 """Metadata provider for egg directories
2191
2192 Usage::
2193
2194 # Development eggs:
2195
2196 egg_info = "/path/to/PackageName.egg-info"
2197 base_dir = os.path.dirname(egg_info)
2198 metadata = PathMetadata(base_dir, egg_info)
2199 dist_name = os.path.splitext(os.path.basename(egg_info))[0]
2200 dist = Distribution(basedir, project_name=dist_name, metadata=metadata)
2201
2202 # Unpacked egg directories:
2203
2204 egg_path = "/path/to/PackageName-ver-pyver-etc.egg"
2205 metadata = PathMetadata(egg_path, os.path.join(egg_path,'EGG-INFO'))
2206 dist = Distribution.from_filename(egg_path, metadata=metadata)
2207 """
2208
2209 def __init__(self, path: str, egg_info: str):
2210 self.module_path = path
2211 self.egg_info = egg_info
2212
2213
2214class EggMetadata(ZipProvider):
2215 """Metadata provider for .egg files"""
2216
2217 def __init__(self, importer: zipimport.zipimporter):
2218 """Create a metadata provider from a zipimporter"""
2219
2220 self.zip_pre = importer.archive + os.sep
2221 self.loader = importer
2222 if importer.prefix:
2223 self.module_path = os.path.join(importer.archive, importer.prefix)
2224 else:
2225 self.module_path = importer.archive
2226 self._setup_prefix()
2227
2228
2229_distribution_finders: dict[type, _DistFinderType[Any]] = _declare_state(
2230 'dict', '_distribution_finders', {}
2231)
2232
2233
2234def register_finder(importer_type: type[_T], distribution_finder: _DistFinderType[_T]):
2235 """Register `distribution_finder` to find distributions in sys.path items
2236
2237 `importer_type` is the type or class of a PEP 302 "Importer" (sys.path item
2238 handler), and `distribution_finder` is a callable that, passed a path
2239 item and the importer instance, yields ``Distribution`` instances found on
2240 that path item. See ``pkg_resources.find_on_path`` for an example."""
2241 _distribution_finders[importer_type] = distribution_finder
2242
2243
2244def find_distributions(path_item: str, only: bool = False):
2245 """Yield distributions accessible via `path_item`"""
2246 importer = get_importer(path_item)
2247 finder = _find_adapter(_distribution_finders, importer)
2248 return finder(importer, path_item, only)
2249
2250
2251def find_eggs_in_zip(
2252 importer: zipimport.zipimporter, path_item: str, only: bool = False
2253) -> Iterator[Distribution]:
2254 """
2255 Find eggs in zip files; possibly multiple nested eggs.
2256 """
2257 if importer.archive.endswith('.whl'):
2258 # wheels are not supported with this finder
2259 # they don't have PKG-INFO metadata, and won't ever contain eggs
2260 return
2261 metadata = EggMetadata(importer)
2262 if metadata.has_metadata('PKG-INFO'):
2263 yield Distribution.from_filename(path_item, metadata=metadata)
2264 if only:
2265 # don't yield nested distros
2266 return
2267 for subitem in metadata.resource_listdir(''):
2268 if _is_egg_path(subitem):
2269 subpath = os.path.join(path_item, subitem)
2270 dists = find_eggs_in_zip(zipimport.zipimporter(subpath), subpath)
2271 yield from dists
2272 elif subitem.lower().endswith(('.dist-info', '.egg-info')):
2273 subpath = os.path.join(path_item, subitem)
2274 submeta = EggMetadata(zipimport.zipimporter(subpath))
2275 submeta.egg_info = subpath
2276 yield Distribution.from_location(path_item, subitem, submeta)
2277
2278
2279register_finder(zipimport.zipimporter, find_eggs_in_zip)
2280
2281
2282def find_nothing(
2283 importer: object | None, path_item: str | None, only: bool | None = False
2284):
2285 return ()
2286
2287
2288register_finder(object, find_nothing)
2289
2290
2291def find_on_path(importer: object | None, path_item, only=False):
2292 """Yield distributions accessible on a sys.path directory"""
2293 path_item = _normalize_cached(path_item)
2294
2295 if _is_unpacked_egg(path_item):
2296 yield Distribution.from_filename(
2297 path_item,
2298 metadata=PathMetadata(path_item, os.path.join(path_item, 'EGG-INFO')),
2299 )
2300 return
2301
2302 entries = (os.path.join(path_item, child) for child in safe_listdir(path_item))
2303
2304 # scan for .egg and .egg-info in directory
2305 for entry in sorted(entries):
2306 fullpath = os.path.join(path_item, entry)
2307 factory = dist_factory(path_item, entry, only)
2308 yield from factory(fullpath)
2309
2310
2311def dist_factory(path_item, entry, only):
2312 """Return a dist_factory for the given entry."""
2313 lower = entry.lower()
2314 is_egg_info = lower.endswith('.egg-info')
2315 is_dist_info = lower.endswith('.dist-info') and os.path.isdir(
2316 os.path.join(path_item, entry)
2317 )
2318 is_meta = is_egg_info or is_dist_info
2319 return (
2320 distributions_from_metadata
2321 if is_meta
2322 else find_distributions
2323 if not only and _is_egg_path(entry)
2324 else resolve_egg_link
2325 if not only and lower.endswith('.egg-link')
2326 else NoDists()
2327 )
2328
2329
2330class NoDists:
2331 """
2332 >>> bool(NoDists())
2333 False
2334
2335 >>> list(NoDists()('anything'))
2336 []
2337 """
2338
2339 def __bool__(self):
2340 return False
2341
2342 def __call__(self, fullpath):
2343 return iter(())
2344
2345
2346def safe_listdir(path: StrOrBytesPath):
2347 """
2348 Attempt to list contents of path, but suppress some exceptions.
2349 """
2350 try:
2351 return os.listdir(path)
2352 except (PermissionError, NotADirectoryError):
2353 pass
2354 except OSError as e:
2355 # Ignore the directory if does not exist, not a directory or
2356 # permission denied
2357 if e.errno not in (errno.ENOTDIR, errno.EACCES, errno.ENOENT):
2358 raise
2359 return ()
2360
2361
2362def distributions_from_metadata(path: str):
2363 root = os.path.dirname(path)
2364 if os.path.isdir(path):
2365 if len(os.listdir(path)) == 0:
2366 # empty metadata dir; skip
2367 return
2368 metadata: _MetadataType = PathMetadata(root, path)
2369 else:
2370 metadata = FileMetadata(path)
2371 entry = os.path.basename(path)
2372 yield Distribution.from_location(
2373 root,
2374 entry,
2375 metadata,
2376 precedence=DEVELOP_DIST,
2377 )
2378
2379
2380def non_empty_lines(path):
2381 """
2382 Yield non-empty lines from file at path
2383 """
2384 for line in _read_utf8_with_fallback(path).splitlines():
2385 line = line.strip()
2386 if line:
2387 yield line
2388
2389
2390def resolve_egg_link(path):
2391 """
2392 Given a path to an .egg-link, resolve distributions
2393 present in the referenced path.
2394 """
2395 referenced_paths = non_empty_lines(path)
2396 resolved_paths = (
2397 os.path.join(os.path.dirname(path), ref) for ref in referenced_paths
2398 )
2399 dist_groups = map(find_distributions, resolved_paths)
2400 return next(dist_groups, ())
2401
2402
2403if hasattr(pkgutil, 'ImpImporter'):
2404 register_finder(pkgutil.ImpImporter, find_on_path)
2405
2406register_finder(importlib.machinery.FileFinder, find_on_path)
2407
2408_namespace_handlers: dict[type, _NSHandlerType[Any]] = _declare_state(
2409 'dict', '_namespace_handlers', {}
2410)
2411_namespace_packages: dict[str | None, list[str]] = _declare_state(
2412 'dict', '_namespace_packages', {}
2413)
2414
2415
2416def register_namespace_handler(
2417 importer_type: type[_T], namespace_handler: _NSHandlerType[_T]
2418):
2419 """Register `namespace_handler` to declare namespace packages
2420
2421 `importer_type` is the type or class of a PEP 302 "Importer" (sys.path item
2422 handler), and `namespace_handler` is a callable like this::
2423
2424 def namespace_handler(importer, path_entry, moduleName, module):
2425 # return a path_entry to use for child packages
2426
2427 Namespace handlers are only called if the importer object has already
2428 agreed that it can handle the relevant path item, and they should only
2429 return a subpath if the module __path__ does not already contain an
2430 equivalent subpath. For an example namespace handler, see
2431 ``pkg_resources.file_ns_handler``.
2432 """
2433 _namespace_handlers[importer_type] = namespace_handler
2434
2435
2436def _handle_ns(packageName, path_item):
2437 """Ensure that named package includes a subpath of path_item (if needed)"""
2438
2439 importer = get_importer(path_item)
2440 if importer is None:
2441 return None
2442
2443 # use find_spec (PEP 451) and fall-back to find_module (PEP 302)
2444 try:
2445 spec = importer.find_spec(packageName)
2446 except AttributeError:
2447 # capture warnings due to #1111
2448 with warnings.catch_warnings():
2449 warnings.simplefilter("ignore")
2450 loader = importer.find_module(packageName)
2451 else:
2452 loader = spec.loader if spec else None
2453
2454 if loader is None:
2455 return None
2456 module = sys.modules.get(packageName)
2457 if module is None:
2458 module = sys.modules[packageName] = types.ModuleType(packageName)
2459 module.__path__ = []
2460 _set_parent_ns(packageName)
2461 elif not hasattr(module, '__path__'):
2462 raise TypeError("Not a package:", packageName)
2463 handler = _find_adapter(_namespace_handlers, importer)
2464 subpath = handler(importer, path_item, packageName, module)
2465 if subpath is not None:
2466 path = module.__path__
2467 path.append(subpath)
2468 importlib.import_module(packageName)
2469 _rebuild_mod_path(path, packageName, module)
2470 return subpath
2471
2472
2473def _rebuild_mod_path(orig_path, package_name, module: types.ModuleType):
2474 """
2475 Rebuild module.__path__ ensuring that all entries are ordered
2476 corresponding to their sys.path order
2477 """
2478 sys_path = [_normalize_cached(p) for p in sys.path]
2479
2480 def safe_sys_path_index(entry):
2481 """
2482 Workaround for #520 and #513.
2483 """
2484 try:
2485 return sys_path.index(entry)
2486 except ValueError:
2487 return float('inf')
2488
2489 def position_in_sys_path(path):
2490 """
2491 Return the ordinal of the path based on its position in sys.path
2492 """
2493 path_parts = path.split(os.sep)
2494 module_parts = package_name.count('.') + 1
2495 parts = path_parts[:-module_parts]
2496 return safe_sys_path_index(_normalize_cached(os.sep.join(parts)))
2497
2498 new_path = sorted(orig_path, key=position_in_sys_path)
2499 new_path = [_normalize_cached(p) for p in new_path]
2500
2501 if isinstance(module.__path__, list):
2502 module.__path__[:] = new_path
2503 else:
2504 module.__path__ = new_path
2505
2506
2507def declare_namespace(packageName: str):
2508 """Declare that package 'packageName' is a namespace package"""
2509
2510 msg = (
2511 f"Deprecated call to `pkg_resources.declare_namespace({packageName!r})`.\n"
2512 "Implementing implicit namespace packages (as specified in PEP 420) "
2513 "is preferred to `pkg_resources.declare_namespace`. "
2514 "See https://setuptools.pypa.io/en/latest/references/"
2515 "keywords.html#keyword-namespace-packages"
2516 )
2517 warnings.warn(msg, DeprecationWarning, stacklevel=2)
2518
2519 _imp.acquire_lock()
2520 try:
2521 if packageName in _namespace_packages:
2522 return
2523
2524 path: MutableSequence[str] = sys.path
2525 parent, _, _ = packageName.rpartition('.')
2526
2527 if parent:
2528 declare_namespace(parent)
2529 if parent not in _namespace_packages:
2530 __import__(parent)
2531 try:
2532 path = sys.modules[parent].__path__
2533 except AttributeError as e:
2534 raise TypeError("Not a package:", parent) from e
2535
2536 # Track what packages are namespaces, so when new path items are added,
2537 # they can be updated
2538 _namespace_packages.setdefault(parent or None, []).append(packageName)
2539 _namespace_packages.setdefault(packageName, [])
2540
2541 for path_item in path:
2542 # Ensure all the parent's path items are reflected in the child,
2543 # if they apply
2544 _handle_ns(packageName, path_item)
2545
2546 finally:
2547 _imp.release_lock()
2548
2549
2550def fixup_namespace_packages(path_item: str, parent: str | None = None):
2551 """Ensure that previously-declared namespace packages include path_item"""
2552 _imp.acquire_lock()
2553 try:
2554 for package in _namespace_packages.get(parent, ()):
2555 subpath = _handle_ns(package, path_item)
2556 if subpath:
2557 fixup_namespace_packages(subpath, package)
2558 finally:
2559 _imp.release_lock()
2560
2561
2562def file_ns_handler(
2563 importer: object,
2564 path_item: StrPath,
2565 packageName: str,
2566 module: types.ModuleType,
2567):
2568 """Compute an ns-package subpath for a filesystem or zipfile importer"""
2569
2570 subpath = os.path.join(path_item, packageName.split('.')[-1])
2571 normalized = _normalize_cached(subpath)
2572 for item in module.__path__:
2573 if _normalize_cached(item) == normalized:
2574 break
2575 else:
2576 # Only return the path if it's not already there
2577 return subpath
2578
2579
2580if hasattr(pkgutil, 'ImpImporter'):
2581 register_namespace_handler(pkgutil.ImpImporter, file_ns_handler)
2582
2583register_namespace_handler(zipimport.zipimporter, file_ns_handler)
2584register_namespace_handler(importlib.machinery.FileFinder, file_ns_handler)
2585
2586
2587def null_ns_handler(
2588 importer: object,
2589 path_item: str | None,
2590 packageName: str | None,
2591 module: _ModuleLike | None,
2592):
2593 return None
2594
2595
2596register_namespace_handler(object, null_ns_handler)
2597
2598
2599@overload
2600def normalize_path(filename: StrPath) -> str: ...
2601@overload
2602def normalize_path(filename: BytesPath) -> bytes: ...
2603def normalize_path(filename: StrOrBytesPath):
2604 """Normalize a file/dir name for comparison purposes"""
2605 return os.path.normcase(os.path.realpath(os.path.normpath(_cygwin_patch(filename))))
2606
2607
2608def _cygwin_patch(filename: StrOrBytesPath): # pragma: nocover
2609 """
2610 Contrary to POSIX 2008, on Cygwin, getcwd (3) contains
2611 symlink components. Using
2612 os.path.abspath() works around this limitation. A fix in os.getcwd()
2613 would probably better, in Cygwin even more so, except
2614 that this seems to be by design...
2615 """
2616 return os.path.abspath(filename) if sys.platform == 'cygwin' else filename
2617
2618
2619if TYPE_CHECKING:
2620 # https://github.com/python/mypy/issues/16261
2621 # https://github.com/python/typeshed/issues/6347
2622 @overload
2623 def _normalize_cached(filename: StrPath) -> str: ...
2624 @overload
2625 def _normalize_cached(filename: BytesPath) -> bytes: ...
2626 def _normalize_cached(filename: StrOrBytesPath) -> str | bytes: ...
2627else:
2628
2629 @functools.lru_cache(maxsize=None)
2630 def _normalize_cached(filename):
2631 return normalize_path(filename)
2632
2633
2634def _is_egg_path(path):
2635 """
2636 Determine if given path appears to be an egg.
2637 """
2638 return _is_zip_egg(path) or _is_unpacked_egg(path)
2639
2640
2641def _is_zip_egg(path):
2642 return (
2643 path.lower().endswith('.egg')
2644 and os.path.isfile(path)
2645 and zipfile.is_zipfile(path)
2646 )
2647
2648
2649def _is_unpacked_egg(path):
2650 """
2651 Determine if given path appears to be an unpacked egg.
2652 """
2653 return path.lower().endswith('.egg') and os.path.isfile(
2654 os.path.join(path, 'EGG-INFO', 'PKG-INFO')
2655 )
2656
2657
2658def _set_parent_ns(packageName):
2659 parts = packageName.split('.')
2660 name = parts.pop()
2661 if parts:
2662 parent = '.'.join(parts)
2663 setattr(sys.modules[parent], name, sys.modules[packageName])
2664
2665
2666MODULE = re.compile(r"\w+(\.\w+)*$").match
2667EGG_NAME = re.compile(
2668 r"""
2669 (?P<name>[^-]+) (
2670 -(?P<ver>[^-]+) (
2671 -py(?P<pyver>[^-]+) (
2672 -(?P<plat>.+)
2673 )?
2674 )?
2675 )?
2676 """,
2677 re.VERBOSE | re.IGNORECASE,
2678).match
2679
2680
2681class EntryPoint:
2682 """Object representing an advertised importable object"""
2683
2684 def __init__(
2685 self,
2686 name: str,
2687 module_name: str,
2688 attrs: Iterable[str] = (),
2689 extras: Iterable[str] = (),
2690 dist: Distribution | None = None,
2691 ):
2692 if not MODULE(module_name):
2693 raise ValueError("Invalid module name", module_name)
2694 self.name = name
2695 self.module_name = module_name
2696 self.attrs = tuple(attrs)
2697 self.extras = tuple(extras)
2698 self.dist = dist
2699
2700 def __str__(self):
2701 s = "%s = %s" % (self.name, self.module_name)
2702 if self.attrs:
2703 s += ':' + '.'.join(self.attrs)
2704 if self.extras:
2705 s += ' [%s]' % ','.join(self.extras)
2706 return s
2707
2708 def __repr__(self):
2709 return "EntryPoint.parse(%r)" % str(self)
2710
2711 @overload
2712 def load(
2713 self,
2714 require: Literal[True] = True,
2715 env: Environment | None = None,
2716 installer: _InstallerType | None = None,
2717 ) -> _ResolvedEntryPoint: ...
2718 @overload
2719 def load(
2720 self,
2721 require: Literal[False],
2722 *args: Any,
2723 **kwargs: Any,
2724 ) -> _ResolvedEntryPoint: ...
2725 def load(
2726 self,
2727 require: bool = True,
2728 *args: Environment | _InstallerType | None,
2729 **kwargs: Environment | _InstallerType | None,
2730 ) -> _ResolvedEntryPoint:
2731 """
2732 Require packages for this EntryPoint, then resolve it.
2733 """
2734 if not require or args or kwargs:
2735 warnings.warn(
2736 "Parameters to load are deprecated. Call .resolve and "
2737 ".require separately.",
2738 PkgResourcesDeprecationWarning,
2739 stacklevel=2,
2740 )
2741 if require:
2742 # We could pass `env` and `installer` directly,
2743 # but keeping `*args` and `**kwargs` for backwards compatibility
2744 self.require(*args, **kwargs) # type: ignore
2745 return self.resolve()
2746
2747 def resolve(self) -> _ResolvedEntryPoint:
2748 """
2749 Resolve the entry point from its module and attrs.
2750 """
2751 module = __import__(self.module_name, fromlist=['__name__'], level=0)
2752 try:
2753 return functools.reduce(getattr, self.attrs, module)
2754 except AttributeError as exc:
2755 raise ImportError(str(exc)) from exc
2756
2757 def require(
2758 self,
2759 env: Environment | None = None,
2760 installer: _InstallerType | None = None,
2761 ):
2762 if not self.dist:
2763 error_cls = UnknownExtra if self.extras else AttributeError
2764 raise error_cls("Can't require() without a distribution", self)
2765
2766 # Get the requirements for this entry point with all its extras and
2767 # then resolve them. We have to pass `extras` along when resolving so
2768 # that the working set knows what extras we want. Otherwise, for
2769 # dist-info distributions, the working set will assume that the
2770 # requirements for that extra are purely optional and skip over them.
2771 reqs = self.dist.requires(self.extras)
2772 items = working_set.resolve(reqs, env, installer, extras=self.extras)
2773 list(map(working_set.add, items))
2774
2775 pattern = re.compile(
2776 r'\s*'
2777 r'(?P<name>.+?)\s*'
2778 r'=\s*'
2779 r'(?P<module>[\w.]+)\s*'
2780 r'(:\s*(?P<attr>[\w.]+))?\s*'
2781 r'(?P<extras>\[.*\])?\s*$'
2782 )
2783
2784 @classmethod
2785 def parse(cls, src: str, dist: Distribution | None = None):
2786 """Parse a single entry point from string `src`
2787
2788 Entry point syntax follows the form::
2789
2790 name = some.module:some.attr [extra1, extra2]
2791
2792 The entry name and module name are required, but the ``:attrs`` and
2793 ``[extras]`` parts are optional
2794 """
2795 m = cls.pattern.match(src)
2796 if not m:
2797 msg = "EntryPoint must be in 'name=module:attrs [extras]' format"
2798 raise ValueError(msg, src)
2799 res = m.groupdict()
2800 extras = cls._parse_extras(res['extras'])
2801 attrs = res['attr'].split('.') if res['attr'] else ()
2802 return cls(res['name'], res['module'], attrs, extras, dist)
2803
2804 @classmethod
2805 def _parse_extras(cls, extras_spec):
2806 if not extras_spec:
2807 return ()
2808 req = Requirement.parse('x' + extras_spec)
2809 if req.specs:
2810 raise ValueError
2811 return req.extras
2812
2813 @classmethod
2814 def parse_group(
2815 cls,
2816 group: str,
2817 lines: _NestedStr,
2818 dist: Distribution | None = None,
2819 ):
2820 """Parse an entry point group"""
2821 if not MODULE(group):
2822 raise ValueError("Invalid group name", group)
2823 this: dict[str, Self] = {}
2824 for line in yield_lines(lines):
2825 ep = cls.parse(line, dist)
2826 if ep.name in this:
2827 raise ValueError("Duplicate entry point", group, ep.name)
2828 this[ep.name] = ep
2829 return this
2830
2831 @classmethod
2832 def parse_map(
2833 cls,
2834 data: str | Iterable[str] | dict[str, str | Iterable[str]],
2835 dist: Distribution | None = None,
2836 ):
2837 """Parse a map of entry point groups"""
2838 _data: Iterable[tuple[str | None, str | Iterable[str]]]
2839 if isinstance(data, dict):
2840 _data = data.items()
2841 else:
2842 _data = split_sections(data)
2843 maps: dict[str, dict[str, Self]] = {}
2844 for group, lines in _data:
2845 if group is None:
2846 if not lines:
2847 continue
2848 raise ValueError("Entry points must be listed in groups")
2849 group = group.strip()
2850 if group in maps:
2851 raise ValueError("Duplicate group name", group)
2852 maps[group] = cls.parse_group(group, lines, dist)
2853 return maps
2854
2855
2856def _version_from_file(lines):
2857 """
2858 Given an iterable of lines from a Metadata file, return
2859 the value of the Version field, if present, or None otherwise.
2860 """
2861
2862 def is_version_line(line):
2863 return line.lower().startswith('version:')
2864
2865 version_lines = filter(is_version_line, lines)
2866 line = next(iter(version_lines), '')
2867 _, _, value = line.partition(':')
2868 return safe_version(value.strip()) or None
2869
2870
2871class Distribution:
2872 """Wrap an actual or potential sys.path entry w/metadata"""
2873
2874 PKG_INFO = 'PKG-INFO'
2875
2876 def __init__(
2877 self,
2878 location: str | None = None,
2879 metadata: _MetadataType = None,
2880 project_name: str | None = None,
2881 version: str | None = None,
2882 py_version: str | None = PY_MAJOR,
2883 platform: str | None = None,
2884 precedence: int = EGG_DIST,
2885 ):
2886 self.project_name = safe_name(project_name or 'Unknown')
2887 if version is not None:
2888 self._version = safe_version(version)
2889 self.py_version = py_version
2890 self.platform = platform
2891 self.location = location
2892 self.precedence = precedence
2893 self._provider = metadata or empty_provider
2894
2895 @classmethod
2896 def from_location(
2897 cls,
2898 location: str,
2899 basename: StrPath,
2900 metadata: _MetadataType = None,
2901 **kw: int, # We could set `precedence` explicitly, but keeping this as `**kw` for full backwards and subclassing compatibility
2902 ) -> Distribution:
2903 project_name, version, py_version, platform = [None] * 4
2904 basename, ext = os.path.splitext(basename)
2905 if ext.lower() in _distributionImpl:
2906 cls = _distributionImpl[ext.lower()]
2907
2908 match = EGG_NAME(basename)
2909 if match:
2910 project_name, version, py_version, platform = match.group(
2911 'name', 'ver', 'pyver', 'plat'
2912 )
2913 return cls(
2914 location,
2915 metadata,
2916 project_name=project_name,
2917 version=version,
2918 py_version=py_version,
2919 platform=platform,
2920 **kw,
2921 )._reload_version()
2922
2923 def _reload_version(self):
2924 return self
2925
2926 @property
2927 def hashcmp(self):
2928 return (
2929 self._forgiving_parsed_version,
2930 self.precedence,
2931 self.key,
2932 self.location,
2933 self.py_version or '',
2934 self.platform or '',
2935 )
2936
2937 def __hash__(self):
2938 return hash(self.hashcmp)
2939
2940 def __lt__(self, other: Distribution):
2941 return self.hashcmp < other.hashcmp
2942
2943 def __le__(self, other: Distribution):
2944 return self.hashcmp <= other.hashcmp
2945
2946 def __gt__(self, other: Distribution):
2947 return self.hashcmp > other.hashcmp
2948
2949 def __ge__(self, other: Distribution):
2950 return self.hashcmp >= other.hashcmp
2951
2952 def __eq__(self, other: object):
2953 if not isinstance(other, self.__class__):
2954 # It's not a Distribution, so they are not equal
2955 return False
2956 return self.hashcmp == other.hashcmp
2957
2958 def __ne__(self, other: object):
2959 return not self == other
2960
2961 # These properties have to be lazy so that we don't have to load any
2962 # metadata until/unless it's actually needed. (i.e., some distributions
2963 # may not know their name or version without loading PKG-INFO)
2964
2965 @property
2966 def key(self):
2967 try:
2968 return self._key
2969 except AttributeError:
2970 self._key = key = self.project_name.lower()
2971 return key
2972
2973 @property
2974 def parsed_version(self):
2975 if not hasattr(self, "_parsed_version"):
2976 try:
2977 self._parsed_version = parse_version(self.version)
2978 except _packaging_version.InvalidVersion as ex:
2979 info = f"(package: {self.project_name})"
2980 if hasattr(ex, "add_note"):
2981 ex.add_note(info) # PEP 678
2982 raise
2983 raise _packaging_version.InvalidVersion(f"{str(ex)} {info}") from None
2984
2985 return self._parsed_version
2986
2987 @property
2988 def _forgiving_parsed_version(self):
2989 try:
2990 return self.parsed_version
2991 except _packaging_version.InvalidVersion as ex:
2992 self._parsed_version = parse_version(_forgiving_version(self.version))
2993
2994 notes = "\n".join(getattr(ex, "__notes__", [])) # PEP 678
2995 msg = f"""!!\n\n
2996 *************************************************************************
2997 {str(ex)}\n{notes}
2998
2999 This is a long overdue deprecation.
3000 For the time being, `pkg_resources` will use `{self._parsed_version}`
3001 as a replacement to avoid breaking existing environments,
3002 but no future compatibility is guaranteed.
3003
3004 If you maintain package {self.project_name} you should implement
3005 the relevant changes to adequate the project to PEP 440 immediately.
3006 *************************************************************************
3007 \n\n!!
3008 """
3009 warnings.warn(msg, DeprecationWarning)
3010
3011 return self._parsed_version
3012
3013 @property
3014 def version(self):
3015 try:
3016 return self._version
3017 except AttributeError as e:
3018 version = self._get_version()
3019 if version is None:
3020 path = self._get_metadata_path_for_display(self.PKG_INFO)
3021 msg = ("Missing 'Version:' header and/or {} file at path: {}").format(
3022 self.PKG_INFO, path
3023 )
3024 raise ValueError(msg, self) from e
3025
3026 return version
3027
3028 @property
3029 def _dep_map(self):
3030 """
3031 A map of extra to its list of (direct) requirements
3032 for this distribution, including the null extra.
3033 """
3034 try:
3035 return self.__dep_map
3036 except AttributeError:
3037 self.__dep_map = self._filter_extras(self._build_dep_map())
3038 return self.__dep_map
3039
3040 @staticmethod
3041 def _filter_extras(dm: dict[str | None, list[Requirement]]):
3042 """
3043 Given a mapping of extras to dependencies, strip off
3044 environment markers and filter out any dependencies
3045 not matching the markers.
3046 """
3047 for extra in list(filter(None, dm)):
3048 new_extra: str | None = extra
3049 reqs = dm.pop(extra)
3050 new_extra, _, marker = extra.partition(':')
3051 fails_marker = marker and (
3052 invalid_marker(marker) or not evaluate_marker(marker)
3053 )
3054 if fails_marker:
3055 reqs = []
3056 new_extra = safe_extra(new_extra) or None
3057
3058 dm.setdefault(new_extra, []).extend(reqs)
3059 return dm
3060
3061 def _build_dep_map(self):
3062 dm = {}
3063 for name in 'requires.txt', 'depends.txt':
3064 for extra, reqs in split_sections(self._get_metadata(name)):
3065 dm.setdefault(extra, []).extend(parse_requirements(reqs))
3066 return dm
3067
3068 def requires(self, extras: Iterable[str] = ()):
3069 """List of Requirements needed for this distro if `extras` are used"""
3070 dm = self._dep_map
3071 deps: list[Requirement] = []
3072 deps.extend(dm.get(None, ()))
3073 for ext in extras:
3074 try:
3075 deps.extend(dm[safe_extra(ext)])
3076 except KeyError as e:
3077 raise UnknownExtra(
3078 "%s has no such extra feature %r" % (self, ext)
3079 ) from e
3080 return deps
3081
3082 def _get_metadata_path_for_display(self, name):
3083 """
3084 Return the path to the given metadata file, if available.
3085 """
3086 try:
3087 # We need to access _get_metadata_path() on the provider object
3088 # directly rather than through this class's __getattr__()
3089 # since _get_metadata_path() is marked private.
3090 path = self._provider._get_metadata_path(name)
3091
3092 # Handle exceptions e.g. in case the distribution's metadata
3093 # provider doesn't support _get_metadata_path().
3094 except Exception:
3095 return '[could not detect]'
3096
3097 return path
3098
3099 def _get_metadata(self, name):
3100 if self.has_metadata(name):
3101 yield from self.get_metadata_lines(name)
3102
3103 def _get_version(self):
3104 lines = self._get_metadata(self.PKG_INFO)
3105 return _version_from_file(lines)
3106
3107 def activate(self, path: list[str] | None = None, replace: bool = False):
3108 """Ensure distribution is importable on `path` (default=sys.path)"""
3109 if path is None:
3110 path = sys.path
3111 self.insert_on(path, replace=replace)
3112 if path is sys.path and self.location is not None:
3113 fixup_namespace_packages(self.location)
3114 for pkg in self._get_metadata('namespace_packages.txt'):
3115 if pkg in sys.modules:
3116 declare_namespace(pkg)
3117
3118 def egg_name(self):
3119 """Return what this distribution's standard .egg filename should be"""
3120 filename = "%s-%s-py%s" % (
3121 to_filename(self.project_name),
3122 to_filename(self.version),
3123 self.py_version or PY_MAJOR,
3124 )
3125
3126 if self.platform:
3127 filename += '-' + self.platform
3128 return filename
3129
3130 def __repr__(self):
3131 if self.location:
3132 return "%s (%s)" % (self, self.location)
3133 else:
3134 return str(self)
3135
3136 def __str__(self):
3137 try:
3138 version = getattr(self, 'version', None)
3139 except ValueError:
3140 version = None
3141 version = version or "[unknown version]"
3142 return "%s %s" % (self.project_name, version)
3143
3144 def __getattr__(self, attr):
3145 """Delegate all unrecognized public attributes to .metadata provider"""
3146 if attr.startswith('_'):
3147 raise AttributeError(attr)
3148 return getattr(self._provider, attr)
3149
3150 def __dir__(self):
3151 return list(
3152 set(super().__dir__())
3153 | set(attr for attr in self._provider.__dir__() if not attr.startswith('_'))
3154 )
3155
3156 @classmethod
3157 def from_filename(
3158 cls,
3159 filename: StrPath,
3160 metadata: _MetadataType = None,
3161 **kw: int, # We could set `precedence` explicitly, but keeping this as `**kw` for full backwards and subclassing compatibility
3162 ):
3163 return cls.from_location(
3164 _normalize_cached(filename), os.path.basename(filename), metadata, **kw
3165 )
3166
3167 def as_requirement(self):
3168 """Return a ``Requirement`` that matches this distribution exactly"""
3169 if isinstance(self.parsed_version, _packaging_version.Version):
3170 spec = "%s==%s" % (self.project_name, self.parsed_version)
3171 else:
3172 spec = "%s===%s" % (self.project_name, self.parsed_version)
3173
3174 return Requirement.parse(spec)
3175
3176 def load_entry_point(self, group: str, name: str) -> _ResolvedEntryPoint:
3177 """Return the `name` entry point of `group` or raise ImportError"""
3178 ep = self.get_entry_info(group, name)
3179 if ep is None:
3180 raise ImportError("Entry point %r not found" % ((group, name),))
3181 return ep.load()
3182
3183 @overload
3184 def get_entry_map(self, group: None = None) -> dict[str, dict[str, EntryPoint]]: ...
3185 @overload
3186 def get_entry_map(self, group: str) -> dict[str, EntryPoint]: ...
3187 def get_entry_map(self, group: str | None = None):
3188 """Return the entry point map for `group`, or the full entry map"""
3189 if not hasattr(self, "_ep_map"):
3190 self._ep_map = EntryPoint.parse_map(
3191 self._get_metadata('entry_points.txt'), self
3192 )
3193 if group is not None:
3194 return self._ep_map.get(group, {})
3195 return self._ep_map
3196
3197 def get_entry_info(self, group: str, name: str):
3198 """Return the EntryPoint object for `group`+`name`, or ``None``"""
3199 return self.get_entry_map(group).get(name)
3200
3201 # FIXME: 'Distribution.insert_on' is too complex (13)
3202 def insert_on( # noqa: C901
3203 self,
3204 path: list[str],
3205 loc=None,
3206 replace: bool = False,
3207 ):
3208 """Ensure self.location is on path
3209
3210 If replace=False (default):
3211 - If location is already in path anywhere, do nothing.
3212 - Else:
3213 - If it's an egg and its parent directory is on path,
3214 insert just ahead of the parent.
3215 - Else: add to the end of path.
3216 If replace=True:
3217 - If location is already on path anywhere (not eggs)
3218 or higher priority than its parent (eggs)
3219 do nothing.
3220 - Else:
3221 - If it's an egg and its parent directory is on path,
3222 insert just ahead of the parent,
3223 removing any lower-priority entries.
3224 - Else: add it to the front of path.
3225 """
3226
3227 loc = loc or self.location
3228 if not loc:
3229 return
3230
3231 nloc = _normalize_cached(loc)
3232 bdir = os.path.dirname(nloc)
3233 npath = [(p and _normalize_cached(p) or p) for p in path]
3234
3235 for p, item in enumerate(npath):
3236 if item == nloc:
3237 if replace:
3238 break
3239 else:
3240 # don't modify path (even removing duplicates) if
3241 # found and not replace
3242 return
3243 elif item == bdir and self.precedence == EGG_DIST:
3244 # if it's an .egg, give it precedence over its directory
3245 # UNLESS it's already been added to sys.path and replace=False
3246 if (not replace) and nloc in npath[p:]:
3247 return
3248 if path is sys.path:
3249 self.check_version_conflict()
3250 path.insert(p, loc)
3251 npath.insert(p, nloc)
3252 break
3253 else:
3254 if path is sys.path:
3255 self.check_version_conflict()
3256 if replace:
3257 path.insert(0, loc)
3258 else:
3259 path.append(loc)
3260 return
3261
3262 # p is the spot where we found or inserted loc; now remove duplicates
3263 while True:
3264 try:
3265 np = npath.index(nloc, p + 1)
3266 except ValueError:
3267 break
3268 else:
3269 del npath[np], path[np]
3270 # ha!
3271 p = np
3272
3273 return
3274
3275 def check_version_conflict(self):
3276 if self.key == 'setuptools':
3277 # ignore the inevitable setuptools self-conflicts :(
3278 return
3279
3280 nsp = dict.fromkeys(self._get_metadata('namespace_packages.txt'))
3281 loc = normalize_path(self.location)
3282 for modname in self._get_metadata('top_level.txt'):
3283 if (
3284 modname not in sys.modules
3285 or modname in nsp
3286 or modname in _namespace_packages
3287 ):
3288 continue
3289 if modname in ('pkg_resources', 'setuptools', 'site'):
3290 continue
3291 fn = getattr(sys.modules[modname], '__file__', None)
3292 if fn and (
3293 normalize_path(fn).startswith(loc) or fn.startswith(self.location)
3294 ):
3295 continue
3296 issue_warning(
3297 "Module %s was already imported from %s, but %s is being added"
3298 " to sys.path" % (modname, fn, self.location),
3299 )
3300
3301 def has_version(self):
3302 try:
3303 self.version
3304 except ValueError:
3305 issue_warning("Unbuilt egg for " + repr(self))
3306 return False
3307 except SystemError:
3308 # TODO: remove this except clause when python/cpython#103632 is fixed.
3309 return False
3310 return True
3311
3312 def clone(self, **kw: str | int | IResourceProvider | None):
3313 """Copy this distribution, substituting in any changed keyword args"""
3314 names = 'project_name version py_version platform location precedence'
3315 for attr in names.split():
3316 kw.setdefault(attr, getattr(self, attr, None))
3317 kw.setdefault('metadata', self._provider)
3318 # Unsafely unpacking. But keeping **kw for backwards and subclassing compatibility
3319 return self.__class__(**kw) # type:ignore[arg-type]
3320
3321 @property
3322 def extras(self):
3323 return [dep for dep in self._dep_map if dep]
3324
3325
3326class EggInfoDistribution(Distribution):
3327 def _reload_version(self):
3328 """
3329 Packages installed by distutils (e.g. numpy or scipy),
3330 which uses an old safe_version, and so
3331 their version numbers can get mangled when
3332 converted to filenames (e.g., 1.11.0.dev0+2329eae to
3333 1.11.0.dev0_2329eae). These distributions will not be
3334 parsed properly
3335 downstream by Distribution and safe_version, so
3336 take an extra step and try to get the version number from
3337 the metadata file itself instead of the filename.
3338 """
3339 md_version = self._get_version()
3340 if md_version:
3341 self._version = md_version
3342 return self
3343
3344
3345class DistInfoDistribution(Distribution):
3346 """
3347 Wrap an actual or potential sys.path entry
3348 w/metadata, .dist-info style.
3349 """
3350
3351 PKG_INFO = 'METADATA'
3352 EQEQ = re.compile(r"([\(,])\s*(\d.*?)\s*([,\)])")
3353
3354 @property
3355 def _parsed_pkg_info(self):
3356 """Parse and cache metadata"""
3357 try:
3358 return self._pkg_info
3359 except AttributeError:
3360 metadata = self.get_metadata(self.PKG_INFO)
3361 self._pkg_info = email.parser.Parser().parsestr(metadata)
3362 return self._pkg_info
3363
3364 @property
3365 def _dep_map(self):
3366 try:
3367 return self.__dep_map
3368 except AttributeError:
3369 self.__dep_map = self._compute_dependencies()
3370 return self.__dep_map
3371
3372 def _compute_dependencies(self) -> dict[str | None, list[Requirement]]:
3373 """Recompute this distribution's dependencies."""
3374 self.__dep_map: dict[str | None, list[Requirement]] = {None: []}
3375
3376 reqs: list[Requirement] = []
3377 # Including any condition expressions
3378 for req in self._parsed_pkg_info.get_all('Requires-Dist') or []:
3379 reqs.extend(parse_requirements(req))
3380
3381 def reqs_for_extra(extra):
3382 for req in reqs:
3383 if not req.marker or req.marker.evaluate({'extra': extra}):
3384 yield req
3385
3386 common = types.MappingProxyType(dict.fromkeys(reqs_for_extra(None)))
3387 self.__dep_map[None].extend(common)
3388
3389 for extra in self._parsed_pkg_info.get_all('Provides-Extra') or []:
3390 s_extra = safe_extra(extra.strip())
3391 self.__dep_map[s_extra] = [
3392 r for r in reqs_for_extra(extra) if r not in common
3393 ]
3394
3395 return self.__dep_map
3396
3397
3398_distributionImpl = {
3399 '.egg': Distribution,
3400 '.egg-info': EggInfoDistribution,
3401 '.dist-info': DistInfoDistribution,
3402}
3403
3404
3405def issue_warning(*args, **kw):
3406 level = 1
3407 g = globals()
3408 try:
3409 # find the first stack frame that is *not* code in
3410 # the pkg_resources module, to use for the warning
3411 while sys._getframe(level).f_globals is g:
3412 level += 1
3413 except ValueError:
3414 pass
3415 warnings.warn(stacklevel=level + 1, *args, **kw)
3416
3417
3418def parse_requirements(strs: _NestedStr):
3419 """
3420 Yield ``Requirement`` objects for each specification in `strs`.
3421
3422 `strs` must be a string, or a (possibly-nested) iterable thereof.
3423 """
3424 return map(Requirement, join_continuation(map(drop_comment, yield_lines(strs))))
3425
3426
3427class RequirementParseError(_packaging_requirements.InvalidRequirement):
3428 "Compatibility wrapper for InvalidRequirement"
3429
3430
3431class Requirement(_packaging_requirements.Requirement):
3432 def __init__(self, requirement_string: str):
3433 """DO NOT CALL THIS UNDOCUMENTED METHOD; use Requirement.parse()!"""
3434 super().__init__(requirement_string)
3435 self.unsafe_name = self.name
3436 project_name = safe_name(self.name)
3437 self.project_name, self.key = project_name, project_name.lower()
3438 self.specs = [(spec.operator, spec.version) for spec in self.specifier]
3439 # packaging.requirements.Requirement uses a set for its extras. We use a variable-length tuple
3440 self.extras: tuple[str] = tuple(map(safe_extra, self.extras))
3441 self.hashCmp = (
3442 self.key,
3443 self.url,
3444 self.specifier,
3445 frozenset(self.extras),
3446 str(self.marker) if self.marker else None,
3447 )
3448 self.__hash = hash(self.hashCmp)
3449
3450 def __eq__(self, other: object):
3451 return isinstance(other, Requirement) and self.hashCmp == other.hashCmp
3452
3453 def __ne__(self, other):
3454 return not self == other
3455
3456 def __contains__(self, item: Distribution | str | tuple[str, ...]) -> bool:
3457 if isinstance(item, Distribution):
3458 if item.key != self.key:
3459 return False
3460
3461 item = item.version
3462
3463 # Allow prereleases always in order to match the previous behavior of
3464 # this method. In the future this should be smarter and follow PEP 440
3465 # more accurately.
3466 return self.specifier.contains(item, prereleases=True)
3467
3468 def __hash__(self):
3469 return self.__hash
3470
3471 def __repr__(self):
3472 return "Requirement.parse(%r)" % str(self)
3473
3474 @staticmethod
3475 def parse(s: str | Iterable[str]):
3476 (req,) = parse_requirements(s)
3477 return req
3478
3479
3480def _always_object(classes):
3481 """
3482 Ensure object appears in the mro even
3483 for old-style classes.
3484 """
3485 if object not in classes:
3486 return classes + (object,)
3487 return classes
3488
3489
3490def _find_adapter(registry: Mapping[type, _AdapterT], ob: object) -> _AdapterT:
3491 """Return an adapter factory for `ob` from `registry`"""
3492 types = _always_object(inspect.getmro(getattr(ob, '__class__', type(ob))))
3493 for t in types:
3494 if t in registry:
3495 return registry[t]
3496 # _find_adapter would previously return None, and immediately be called.
3497 # So we're raising a TypeError to keep backward compatibility if anyone depended on that behaviour.
3498 raise TypeError(f"Could not find adapter for {registry} and {ob}")
3499
3500
3501def ensure_directory(path: StrOrBytesPath):
3502 """Ensure that the parent directory of `path` exists"""
3503 dirname = os.path.dirname(path)
3504 os.makedirs(dirname, exist_ok=True)
3505
3506
3507def _bypass_ensure_directory(path):
3508 """Sandbox-bypassing version of ensure_directory()"""
3509 if not WRITE_SUPPORT:
3510 raise OSError('"os.mkdir" not supported on this platform.')
3511 dirname, filename = split(path)
3512 if dirname and filename and not isdir(dirname):
3513 _bypass_ensure_directory(dirname)
3514 try:
3515 mkdir(dirname, 0o755)
3516 except FileExistsError:
3517 pass
3518
3519
3520def split_sections(s: _NestedStr) -> Iterator[tuple[str | None, list[str]]]:
3521 """Split a string or iterable thereof into (section, content) pairs
3522
3523 Each ``section`` is a stripped version of the section header ("[section]")
3524 and each ``content`` is a list of stripped lines excluding blank lines and
3525 comment-only lines. If there are any such lines before the first section
3526 header, they're returned in a first ``section`` of ``None``.
3527 """
3528 section = None
3529 content = []
3530 for line in yield_lines(s):
3531 if line.startswith("["):
3532 if line.endswith("]"):
3533 if section or content:
3534 yield section, content
3535 section = line[1:-1].strip()
3536 content = []
3537 else:
3538 raise ValueError("Invalid section heading", line)
3539 else:
3540 content.append(line)
3541
3542 # wrap up last segment
3543 yield section, content
3544
3545
3546def _mkstemp(*args, **kw):
3547 old_open = os.open
3548 try:
3549 # temporarily bypass sandboxing
3550 os.open = os_open
3551 return tempfile.mkstemp(*args, **kw)
3552 finally:
3553 # and then put it back
3554 os.open = old_open
3555
3556
3557# Silence the PEP440Warning by default, so that end users don't get hit by it
3558# randomly just because they use pkg_resources. We want to append the rule
3559# because we want earlier uses of filterwarnings to take precedence over this
3560# one.
3561warnings.filterwarnings("ignore", category=PEP440Warning, append=True)
3562
3563
3564class PkgResourcesDeprecationWarning(Warning):
3565 """
3566 Base class for warning about deprecations in ``pkg_resources``
3567
3568 This class is not derived from ``DeprecationWarning``, and as such is
3569 visible by default.
3570 """
3571
3572
3573# Ported from ``setuptools`` to avoid introducing an import inter-dependency:
3574_LOCALE_ENCODING = "locale" if sys.version_info >= (3, 10) else None
3575
3576
3577def _read_utf8_with_fallback(file: str, fallback_encoding=_LOCALE_ENCODING) -> str:
3578 """See setuptools.unicode_utils._read_utf8_with_fallback"""
3579 try:
3580 with open(file, "r", encoding="utf-8") as f:
3581 return f.read()
3582 except UnicodeDecodeError: # pragma: no cover
3583 msg = f"""\
3584 ********************************************************************************
3585 `encoding="utf-8"` fails with {file!r}, trying `encoding={fallback_encoding!r}`.
3586
3587 This fallback behaviour is considered **deprecated** and future versions of
3588 `setuptools/pkg_resources` may not implement it.
3589
3590 Please encode {file!r} with "utf-8" to ensure future builds will succeed.
3591
3592 If this file was produced by `setuptools` itself, cleaning up the cached files
3593 and re-building/re-installing the package with a newer version of `setuptools`
3594 (e.g. by updating `build-system.requires` in its `pyproject.toml`)
3595 might solve the problem.
3596 ********************************************************************************
3597 """
3598 # TODO: Add a deadline?
3599 # See comment in setuptools.unicode_utils._Utf8EncodingNeeded
3600 warnings.warn(msg, PkgResourcesDeprecationWarning, stacklevel=2)
3601 with open(file, "r", encoding=fallback_encoding) as f:
3602 return f.read()
3603
3604
3605# from jaraco.functools 1.3
3606def _call_aside(f, *args, **kwargs):
3607 f(*args, **kwargs)
3608 return f
3609
3610
3611@_call_aside
3612def _initialize(g=globals()):
3613 "Set up global resource manager (deliberately not state-saved)"
3614 manager = ResourceManager()
3615 g['_manager'] = manager
3616 g.update(
3617 (name, getattr(manager, name))
3618 for name in dir(manager)
3619 if not name.startswith('_')
3620 )
3621
3622
3623@_call_aside
3624def _initialize_master_working_set():
3625 """
3626 Prepare the master working set and make the ``require()``
3627 API available.
3628
3629 This function has explicit effects on the global state
3630 of pkg_resources. It is intended to be invoked once at
3631 the initialization of this module.
3632
3633 Invocation by other packages is unsupported and done
3634 at their own risk.
3635 """
3636 working_set = _declare_state('object', 'working_set', WorkingSet._build_master())
3637
3638 require = working_set.require
3639 iter_entry_points = working_set.iter_entry_points
3640 add_activation_listener = working_set.subscribe
3641 run_script = working_set.run_script
3642 # backward compatibility
3643 run_main = run_script
3644 # Activate all distributions already on sys.path with replace=False and
3645 # ensure that all distributions added to the working set in the future
3646 # (e.g. by calling ``require()``) will get activated as well,
3647 # with higher priority (replace=True).
3648 tuple(dist.activate(replace=False) for dist in working_set)
3649 add_activation_listener(
3650 lambda dist: dist.activate(replace=True),
3651 existing=False,
3652 )
3653 working_set.entries = []
3654 # match order
3655 list(map(working_set.add_entry, sys.path))
3656 globals().update(locals())
3657
3658
3659if TYPE_CHECKING:
3660 # All of these are set by the @_call_aside methods above
3661 __resource_manager = ResourceManager() # Won't exist at runtime
3662 resource_exists = __resource_manager.resource_exists
3663 resource_isdir = __resource_manager.resource_isdir
3664 resource_filename = __resource_manager.resource_filename
3665 resource_stream = __resource_manager.resource_stream
3666 resource_string = __resource_manager.resource_string
3667 resource_listdir = __resource_manager.resource_listdir
3668 set_extraction_path = __resource_manager.set_extraction_path
3669 cleanup_resources = __resource_manager.cleanup_resources
3670
3671 working_set = WorkingSet()
3672 require = working_set.require
3673 iter_entry_points = working_set.iter_entry_points
3674 add_activation_listener = working_set.subscribe
3675 run_script = working_set.run_script
3676 run_main = run_script