Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_internal/metadata/pkg_resources.py: 49%
169 statements
« prev ^ index » next coverage.py v7.4.3, created at 2024-02-26 06:33 +0000
« prev ^ index » next coverage.py v7.4.3, created at 2024-02-26 06:33 +0000
1import email.message
2import email.parser
3import logging
4import os
5import zipfile
6from typing import Collection, Iterable, Iterator, List, Mapping, NamedTuple, Optional
8from pip._vendor import pkg_resources
9from pip._vendor.packaging.requirements import Requirement
10from pip._vendor.packaging.utils import NormalizedName, canonicalize_name
11from pip._vendor.packaging.version import parse as parse_version
13from pip._internal.exceptions import InvalidWheel, NoneMetadataError, UnsupportedWheel
14from pip._internal.utils.egg_link import egg_link_path_from_location
15from pip._internal.utils.misc import display_path, normalize_path
16from pip._internal.utils.wheel import parse_wheel, read_wheel_metadata_file
18from .base import (
19 BaseDistribution,
20 BaseEntryPoint,
21 BaseEnvironment,
22 DistributionVersion,
23 InfoPath,
24 Wheel,
25)
27__all__ = ["NAME", "Distribution", "Environment"]
29logger = logging.getLogger(__name__)
31NAME = "pkg_resources"
34class EntryPoint(NamedTuple):
35 name: str
36 value: str
37 group: str
40class InMemoryMetadata:
41 """IMetadataProvider that reads metadata files from a dictionary.
43 This also maps metadata decoding exceptions to our internal exception type.
44 """
46 def __init__(self, metadata: Mapping[str, bytes], wheel_name: str) -> None:
47 self._metadata = metadata
48 self._wheel_name = wheel_name
50 def has_metadata(self, name: str) -> bool:
51 return name in self._metadata
53 def get_metadata(self, name: str) -> str:
54 try:
55 return self._metadata[name].decode()
56 except UnicodeDecodeError as e:
57 # Augment the default error with the origin of the file.
58 raise UnsupportedWheel(
59 f"Error decoding metadata for {self._wheel_name}: {e} in {name} file"
60 )
62 def get_metadata_lines(self, name: str) -> Iterable[str]:
63 return pkg_resources.yield_lines(self.get_metadata(name))
65 def metadata_isdir(self, name: str) -> bool:
66 return False
68 def metadata_listdir(self, name: str) -> List[str]:
69 return []
71 def run_script(self, script_name: str, namespace: str) -> None:
72 pass
75class Distribution(BaseDistribution):
76 def __init__(self, dist: pkg_resources.Distribution) -> None:
77 self._dist = dist
79 @classmethod
80 def from_directory(cls, directory: str) -> BaseDistribution:
81 dist_dir = directory.rstrip(os.sep)
83 # Build a PathMetadata object, from path to metadata. :wink:
84 base_dir, dist_dir_name = os.path.split(dist_dir)
85 metadata = pkg_resources.PathMetadata(base_dir, dist_dir)
87 # Determine the correct Distribution object type.
88 if dist_dir.endswith(".egg-info"):
89 dist_cls = pkg_resources.Distribution
90 dist_name = os.path.splitext(dist_dir_name)[0]
91 else:
92 assert dist_dir.endswith(".dist-info")
93 dist_cls = pkg_resources.DistInfoDistribution
94 dist_name = os.path.splitext(dist_dir_name)[0].split("-")[0]
96 dist = dist_cls(base_dir, project_name=dist_name, metadata=metadata)
97 return cls(dist)
99 @classmethod
100 def from_metadata_file_contents(
101 cls,
102 metadata_contents: bytes,
103 filename: str,
104 project_name: str,
105 ) -> BaseDistribution:
106 metadata_dict = {
107 "METADATA": metadata_contents,
108 }
109 dist = pkg_resources.DistInfoDistribution(
110 location=filename,
111 metadata=InMemoryMetadata(metadata_dict, filename),
112 project_name=project_name,
113 )
114 return cls(dist)
116 @classmethod
117 def from_wheel(cls, wheel: Wheel, name: str) -> BaseDistribution:
118 try:
119 with wheel.as_zipfile() as zf:
120 info_dir, _ = parse_wheel(zf, name)
121 metadata_dict = {
122 path.split("/", 1)[-1]: read_wheel_metadata_file(zf, path)
123 for path in zf.namelist()
124 if path.startswith(f"{info_dir}/")
125 }
126 except zipfile.BadZipFile as e:
127 raise InvalidWheel(wheel.location, name) from e
128 except UnsupportedWheel as e:
129 raise UnsupportedWheel(f"{name} has an invalid wheel, {e}")
130 dist = pkg_resources.DistInfoDistribution(
131 location=wheel.location,
132 metadata=InMemoryMetadata(metadata_dict, wheel.location),
133 project_name=name,
134 )
135 return cls(dist)
137 @property
138 def location(self) -> Optional[str]:
139 return self._dist.location
141 @property
142 def installed_location(self) -> Optional[str]:
143 egg_link = egg_link_path_from_location(self.raw_name)
144 if egg_link:
145 location = egg_link
146 elif self.location:
147 location = self.location
148 else:
149 return None
150 return normalize_path(location)
152 @property
153 def info_location(self) -> Optional[str]:
154 return self._dist.egg_info
156 @property
157 def installed_by_distutils(self) -> bool:
158 # A distutils-installed distribution is provided by FileMetadata. This
159 # provider has a "path" attribute not present anywhere else. Not the
160 # best introspection logic, but pip has been doing this for a long time.
161 try:
162 return bool(self._dist._provider.path)
163 except AttributeError:
164 return False
166 @property
167 def canonical_name(self) -> NormalizedName:
168 return canonicalize_name(self._dist.project_name)
170 @property
171 def version(self) -> DistributionVersion:
172 return parse_version(self._dist.version)
174 def is_file(self, path: InfoPath) -> bool:
175 return self._dist.has_metadata(str(path))
177 def iter_distutils_script_names(self) -> Iterator[str]:
178 yield from self._dist.metadata_listdir("scripts")
180 def read_text(self, path: InfoPath) -> str:
181 name = str(path)
182 if not self._dist.has_metadata(name):
183 raise FileNotFoundError(name)
184 content = self._dist.get_metadata(name)
185 if content is None:
186 raise NoneMetadataError(self, name)
187 return content
189 def iter_entry_points(self) -> Iterable[BaseEntryPoint]:
190 for group, entries in self._dist.get_entry_map().items():
191 for name, entry_point in entries.items():
192 name, _, value = str(entry_point).partition("=")
193 yield EntryPoint(name=name.strip(), value=value.strip(), group=group)
195 def _metadata_impl(self) -> email.message.Message:
196 """
197 :raises NoneMetadataError: if the distribution reports `has_metadata()`
198 True but `get_metadata()` returns None.
199 """
200 if isinstance(self._dist, pkg_resources.DistInfoDistribution):
201 metadata_name = "METADATA"
202 else:
203 metadata_name = "PKG-INFO"
204 try:
205 metadata = self.read_text(metadata_name)
206 except FileNotFoundError:
207 if self.location:
208 displaying_path = display_path(self.location)
209 else:
210 displaying_path = repr(self.location)
211 logger.warning("No metadata found in %s", displaying_path)
212 metadata = ""
213 feed_parser = email.parser.FeedParser()
214 feed_parser.feed(metadata)
215 return feed_parser.close()
217 def iter_dependencies(self, extras: Collection[str] = ()) -> Iterable[Requirement]:
218 if extras: # pkg_resources raises on invalid extras, so we sanitize.
219 extras = frozenset(pkg_resources.safe_extra(e) for e in extras)
220 extras = extras.intersection(self._dist.extras)
221 return self._dist.requires(extras)
223 def iter_provided_extras(self) -> Iterable[str]:
224 return self._dist.extras
226 def is_extra_provided(self, extra: str) -> bool:
227 return pkg_resources.safe_extra(extra) in self._dist.extras
230class Environment(BaseEnvironment):
231 def __init__(self, ws: pkg_resources.WorkingSet) -> None:
232 self._ws = ws
234 @classmethod
235 def default(cls) -> BaseEnvironment:
236 return cls(pkg_resources.working_set)
238 @classmethod
239 def from_paths(cls, paths: Optional[List[str]]) -> BaseEnvironment:
240 return cls(pkg_resources.WorkingSet(paths))
242 def _iter_distributions(self) -> Iterator[BaseDistribution]:
243 for dist in self._ws:
244 yield Distribution(dist)
246 def _search_distribution(self, name: str) -> Optional[BaseDistribution]:
247 """Find a distribution matching the ``name`` in the environment.
249 This searches from *all* distributions available in the environment, to
250 match the behavior of ``pkg_resources.get_distribution()``.
251 """
252 canonical_name = canonicalize_name(name)
253 for dist in self.iter_all_distributions():
254 if dist.canonical_name == canonical_name:
255 return dist
256 return None
258 def get_distribution(self, name: str) -> Optional[BaseDistribution]:
259 # Search the distribution by looking through the working set.
260 dist = self._search_distribution(name)
261 if dist:
262 return dist
264 # If distribution could not be found, call working_set.require to
265 # update the working set, and try to find the distribution again.
266 # This might happen for e.g. when you install a package twice, once
267 # using setup.py develop and again using setup.py install. Now when
268 # running pip uninstall twice, the package gets removed from the
269 # working set in the first uninstall, so we have to populate the
270 # working set again so that pip knows about it and the packages gets
271 # picked up and is successfully uninstalled the second time too.
272 try:
273 # We didn't pass in any version specifiers, so this can never
274 # raise pkg_resources.VersionConflict.
275 self._ws.require(name)
276 except pkg_resources.DistributionNotFound:
277 return None
278 return self._search_distribution(name)