Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_internal/models/link.py: 41%
239 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:48 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:48 +0000
1import functools
2import itertools
3import logging
4import os
5import posixpath
6import re
7import urllib.parse
8from dataclasses import dataclass
9from typing import (
10 TYPE_CHECKING,
11 Any,
12 Dict,
13 List,
14 Mapping,
15 NamedTuple,
16 Optional,
17 Tuple,
18 Union,
19)
21from pip._internal.utils.deprecation import deprecated
22from pip._internal.utils.filetypes import WHEEL_EXTENSION
23from pip._internal.utils.hashes import Hashes
24from pip._internal.utils.misc import (
25 pairwise,
26 redact_auth_from_url,
27 split_auth_from_netloc,
28 splitext,
29)
30from pip._internal.utils.models import KeyBasedCompareMixin
31from pip._internal.utils.urls import path_to_url, url_to_path
33if TYPE_CHECKING:
34 from pip._internal.index.collector import IndexContent
36logger = logging.getLogger(__name__)
39# Order matters, earlier hashes have a precedence over later hashes for what
40# we will pick to use.
41_SUPPORTED_HASHES = ("sha512", "sha384", "sha256", "sha224", "sha1", "md5")
@dataclass(frozen=True)
class LinkHash:
    """A single ``name=value`` hash attached to a link.

    Hashes may be embedded in a URL (``#sha256=...``) or supplied through a
    PEP 658 ``data-dist-info-metadata`` value; this class extracts them.

    ``name`` must be a member of ``_SUPPORTED_HASHES``.

    This class can be converted to and from ``ArchiveInfo``. ``ArchiveInfo`` is
    meant to be the JSON-serializable PEP 610 form, whereas this class holds the
    logic for parsing a hash name and value and for checking the hash against a
    ``Hashes`` object via ``is_hash_allowed()``.
    """

    name: str
    value: str

    # Intentionally left without an annotation so that ``@dataclass`` treats it
    # as an ordinary class attribute and not as a field.
    #
    # The value group (``[^&]*``) is NOT checked for being a valid hex digest.
    # The raw string is kept as-is and only compared against ``Hashes`` when
    # hash-checking is requested, so that incorrect and malformed hashes are
    # handled (and debugged) in one place.
    _hash_url_fragment_re = re.compile(
        r"[#&]({choices})=([^&]*)".format(
            choices="|".join(map(re.escape, _SUPPORTED_HASHES))
        ),
    )

    def __post_init__(self) -> None:
        # Enforce the invariant documented on the class.
        assert self.name in _SUPPORTED_HASHES

    @classmethod
    def parse_pep658_hash(cls, dist_info_metadata: str) -> Optional["LinkHash"]:
        """Parse a PEP 658 ``data-dist-info-metadata`` value.

        :param dist_info_metadata: the raw attribute value.
        :return: a ``LinkHash`` for a ``<name>=<value>`` string whose name is a
            supported algorithm; ``None`` for the literal ``"true"``, for a
            value without ``=``, or for an unsupported algorithm name.
        """
        if dist_info_metadata == "true":
            return None
        algorithm, equals, digest = dist_info_metadata.partition("=")
        if equals and algorithm in _SUPPORTED_HASHES:
            return cls(name=algorithm, value=digest)
        return None

    @classmethod
    @functools.lru_cache(maxsize=None)
    def find_hash_url_fragment(cls, url: str) -> Optional["LinkHash"]:
        """Find the first ``<algorithm>=<value>`` pair following ``#`` or ``&``.

        :param url: the string to search.
        :return: a ``LinkHash`` built from the first match, or ``None`` when
            the string contains no supported hash.
        """
        found = cls._hash_url_fragment_re.search(url)
        if found is None:
            return None
        return cls(name=found.group(1), value=found.group(2))

    def as_dict(self) -> Dict[str, str]:
        """Return the hash as a one-entry ``{name: value}`` mapping."""
        return {self.name: self.value}

    def as_hashes(self) -> Hashes:
        """Return a ``Hashes`` instance that checks only for this hash."""
        return Hashes({self.name: [self.value]})

    def is_hash_allowed(self, hashes: Optional[Hashes]) -> bool:
        """Return True if this hash is allowed by ``hashes``.

        ``None`` allows nothing, so the result is False in that case.
        """
        return hashes is not None and hashes.is_hash_allowed(
            self.name, hex_digest=self.value
        )
110def _clean_url_path_part(part: str) -> str:
111 """
112 Clean a "part" of a URL path (i.e. after splitting on "@" characters).
113 """
114 # We unquote prior to quoting to make sure nothing is double quoted.
115 return urllib.parse.quote(urllib.parse.unquote(part))
118def _clean_file_url_path(part: str) -> str:
119 """
120 Clean the first part of a URL path that corresponds to a local
121 filesystem path (i.e. the first part after splitting on "@" characters).
122 """
123 # We unquote prior to quoting to make sure nothing is double quoted.
124 # Also, on Windows the path part might contain a drive letter which
125 # should not be quoted. On Linux where drive letters do not
126 # exist, the colon should be quoted. We rely on urllib.request
127 # to do the right thing here.
128 return urllib.request.pathname2url(urllib.request.url2pathname(part))
131# percent-encoded: /
132_reserved_chars_re = re.compile("(@|%2F)", re.IGNORECASE)
def _clean_url_path(path: str, is_local_path: bool) -> str:
    """
    Clean the path portion of a URL.

    :param path: the path component to clean.
    :param is_local_path: whether the path refers to the local filesystem,
        which selects the file-path cleaner instead of the generic one.
    :return: the cleaned path, with reserved separators kept in place.
    """
    clean = _clean_file_url_path if is_local_path else _clean_url_path_part

    # Splitting on the reserved characters *before* cleaning keeps revision
    # strings in VCS URLs intact. Because the pattern has a capturing group,
    # the result alternates: text, separator, text, ..., text.
    pieces = _reserved_chars_re.split(path)

    # A trailing "" gives the final text piece a (blank) separator partner.
    # NOTE(review): this relies on the project's ``pairwise`` helper yielding
    # non-overlapping (text, separator) pairs -- confirm before swapping it for
    # ``itertools.pairwise``, which yields overlapping pairs.
    output = []
    for segment, separator in pairwise(itertools.chain(pieces, [""])):
        # Upper-casing normalizes %xx escapes (e.g. %2f -> %2F).
        output.extend((clean(segment), separator.upper()))

    return "".join(output)
def _ensure_quoted_url(url: str) -> str:
    """
    Return ``url`` with its path component fully percent-encoded.

    For example, a ' ' in the path becomes "%20", while characters that are
    already encoded are not encoded a second time. Only the path is touched;
    every other component is passed through unchanged.
    """
    # Break the URL up following the general structure
    # `scheme://netloc/path;parameters?query#fragment`.
    pieces = urllib.parse.urlparse(url)
    # An empty netloc means the URL refers to a local filesystem path.
    cleaned_path = _clean_url_path(pieces.path, is_local_path=not pieces.netloc)
    return urllib.parse.urlunparse(pieces._replace(path=cleaned_path))
class Link(KeyBasedCompareMixin):
    """A parsed link taken from a package index's simple page (or given directly)."""

    __slots__ = [
        "_parsed_url",
        "_url",
        "_hashes",
        "comes_from",
        "requires_python",
        "yanked_reason",
        "dist_info_metadata",
        "cache_link_parsing",
        "egg_fragment",
    ]

    def __init__(
        self,
        url: str,
        comes_from: Optional[Union[str, "IndexContent"]] = None,
        requires_python: Optional[str] = None,
        yanked_reason: Optional[str] = None,
        dist_info_metadata: Optional[str] = None,
        cache_link_parsing: bool = True,
        hashes: Optional[Mapping[str, str]] = None,
    ) -> None:
        """
        :param url: url of the resource pointed to (href of the link)
        :param comes_from: the IndexContent instance in which the link was
            found, or a string.
        :param requires_python: the `Requires-Python` metadata field from
            PEP 345. It may come from a data-requires-python attribute of the
            HTML link tag, as described in PEP 503. An empty value is stored
            as None.
        :param yanked_reason: why the file was yanked, or None if it has not
            been yanked. This is the value of the "data-yanked" attribute of a
            simple repository HTML link, when present. A file yanked without
            a stated reason should use the empty string. See PEP 592 for more
            information and the specification.
        :param dist_info_metadata: the metadata attached to the file, or None
            if there is none. This is the value of the
            "data-dist-info-metadata" attribute of a simple repository HTML
            link, when present. `self.metadata_link()` can turn it into a
            `Link` of its own. See PEP 658 for more information and the
            specification.
        :param cache_link_parsing: a flag read elsewhere to decide whether
            resources retrieved from this link should be cached. PyPI URLs
            should generally have this set to False, for example.
        :param hashes: a mapping of hash names to digests used to determine
            the validity of a download. A hash embedded in ``url`` overrides
            an entry with the same name.
        """

        # A UNC Windows share is accepted in place of a real URL.
        if url.startswith("\\\\"):
            url = path_to_url(url)

        self._parsed_url = urllib.parse.urlsplit(url)
        # Kept private so that nobody assigns a new URL by accident.
        self._url = url

        # Start from the caller-supplied hashes, then let a hash found in the
        # URL itself take precedence.
        merged_hashes: Dict[str, str] = {} if hashes is None else {**hashes}
        url_hash = LinkHash.find_hash_url_fragment(url)
        if url_hash is not None:
            merged_hashes.update(url_hash.as_dict())
        self._hashes = merged_hashes

        self.comes_from = comes_from
        self.requires_python = requires_python or None
        self.yanked_reason = yanked_reason
        self.dist_info_metadata = dist_info_metadata

        super().__init__(key=url, defining_class=Link)

        self.cache_link_parsing = cache_link_parsing
        self.egg_fragment = self._egg_fragment()

    @classmethod
    def from_json(
        cls,
        file_data: Dict[str, Any],
        page_url: str,
    ) -> Optional["Link"]:
        """
        Build a Link from one file entry of a simple repository JSON page.

        Returns None when the entry has no "url".
        """
        file_url = file_data.get("url")
        if file_url is None:
            return None

        url = _ensure_quoted_url(urllib.parse.urljoin(page_url, file_url))

        # Link.yanked_reason expects None for "not yanked", the reason string
        # when one is given, and "" when the value is truthy but not a string
        # (e.g. a bare boolean).
        yanked = file_data.get("yanked")
        if not yanked:
            yanked_reason = None
        elif isinstance(yanked, str):
            yanked_reason = yanked
        else:
            yanked_reason = ""

        return cls(
            url,
            comes_from=page_url,
            requires_python=file_data.get("requires-python"),
            yanked_reason=yanked_reason,
            hashes=file_data.get("hashes", {}),
            dist_info_metadata=file_data.get("dist-info-metadata"),
        )

    @classmethod
    def from_element(
        cls,
        anchor_attribs: Dict[str, Optional[str]],
        page_url: str,
        base_url: str,
    ) -> Optional["Link"]:
        """
        Build a Link from the attributes of an anchor element of a simple
        repository page.

        Returns None when the anchor has no (or an empty) href.
        """
        href = anchor_attribs.get("href")
        if not href:
            return None

        url = _ensure_quoted_url(urllib.parse.urljoin(base_url, href))

        return cls(
            url,
            comes_from=page_url,
            requires_python=anchor_attribs.get("data-requires-python"),
            yanked_reason=anchor_attribs.get("data-yanked"),
            dist_info_metadata=anchor_attribs.get("data-dist-info-metadata"),
        )

    def __str__(self) -> str:
        # Without an origin, only the (auth-redacted) URL is shown.
        if not self.comes_from:
            return redact_auth_from_url(str(self._url))

        if self.requires_python:
            rp = f" (requires-python:{self.requires_python})"
        else:
            rp = ""
        return "{} (from {}){}".format(
            redact_auth_from_url(self._url), self.comes_from, rp
        )

    def __repr__(self) -> str:
        return f"<Link {self}>"

    @property
    def url(self) -> str:
        """The URL this link was created with (read-only)."""
        return self._url

    @property
    def filename(self) -> str:
        """The unquoted last path segment, or the auth-free netloc if the path is empty."""
        name = posixpath.basename(self.path.rstrip("/"))
        if not name:
            # Make sure we don't leak auth information if the netloc
            # includes a username and password.
            netloc, _ = split_auth_from_netloc(self.netloc)
            return netloc

        name = urllib.parse.unquote(name)
        assert name, f"URL {self._url!r} produced no filename"
        return name

    @property
    def file_path(self) -> str:
        """The URL converted to a filesystem path by ``url_to_path``."""
        return url_to_path(self.url)

    @property
    def scheme(self) -> str:
        """The scheme component of the URL."""
        return self._parsed_url.scheme

    @property
    def netloc(self) -> str:
        """
        The network-location component of the URL.

        This can contain auth information.
        """
        return self._parsed_url.netloc

    @property
    def path(self) -> str:
        """The percent-decoded path component of the URL."""
        return urllib.parse.unquote(self._parsed_url.path)

    def splitext(self) -> Tuple[str, str]:
        """Split the last path segment into a (base, extension) pair."""
        last_segment = posixpath.basename(self.path.rstrip("/"))
        return splitext(last_segment)

    @property
    def ext(self) -> str:
        """The extension part of ``splitext()``."""
        return self.splitext()[1]

    @property
    def url_without_fragment(self) -> str:
        """The URL reassembled with its fragment removed."""
        return urllib.parse.urlunsplit(self._parsed_url._replace(fragment=""))

    _egg_fragment_re = re.compile(r"[#&]egg=([^&]*)")

    # Per PEP 508.
    _project_name_re = re.compile(
        r"^([A-Z0-9]|[A-Z0-9][A-Z0-9._-]*[A-Z0-9])$", re.IGNORECASE
    )

    def _egg_fragment(self) -> Optional[str]:
        """
        Return the value of the first ``egg=`` entry in the URL, or None.

        A value that does not match the PEP 508 project-name pattern is still
        returned, but triggers a deprecation warning first.
        """
        found = self._egg_fragment_re.search(self._url)
        if found is None:
            return None

        # An egg fragment looks like a PEP 508 project name, along with
        # an optional extras specifier. Anything else is invalid.
        project_name = found.group(1)
        if self._project_name_re.match(project_name) is None:
            deprecated(
                reason=f"{self} contains an egg fragment with a non-PEP 508 name",
                replacement="to use the req @ url syntax, and remove the egg fragment",
                gone_in="25.0",
                issue=11617,
            )

        return project_name

    _subdirectory_fragment_re = re.compile(r"[#&]subdirectory=([^&]*)")

    @property
    def subdirectory_fragment(self) -> Optional[str]:
        """The value of the first ``subdirectory=`` entry in the URL, or None."""
        found = self._subdirectory_fragment_re.search(self._url)
        return None if found is None else found.group(1)

    def metadata_link(self) -> Optional["Link"]:
        """Implementation of PEP 658 parsing.

        Returns a Link to ``<url without fragment>.metadata``, carrying the
        hash parsed from ``dist_info_metadata`` when there is one, or None
        when ``dist_info_metadata`` is None.
        """
        # Link.dist_info_metadata is typically populated by Link.from_element()
        # from the "data-dist-info-metadata" attribute of an HTML anchor tag.
        if self.dist_info_metadata is None:
            return None
        metadata_url = f"{self.url_without_fragment}.metadata"
        parsed_hash = LinkHash.parse_pep658_hash(self.dist_info_metadata)
        metadata_hashes = None if parsed_hash is None else parsed_hash.as_dict()
        return Link(metadata_url, hashes=metadata_hashes)

    def as_hashes(self) -> Hashes:
        """Return a ``Hashes`` object built from every hash known for this link."""
        return Hashes({name: [digest] for name, digest in self._hashes.items()})

    @property
    def hash(self) -> Optional[str]:
        """The digest of the first known hash, or None if there are none."""
        return next(iter(self._hashes.values()), None)

    @property
    def hash_name(self) -> Optional[str]:
        """The algorithm name of the first known hash, or None if there are none."""
        return next(iter(self._hashes), None)

    @property
    def show_url(self) -> str:
        """The last path segment of the raw URL, without fragment or query."""
        without_fragment = self._url.partition("#")[0]
        without_query = without_fragment.partition("?")[0]
        return posixpath.basename(without_query)

    @property
    def is_file(self) -> bool:
        """Whether the URL uses the ``file`` scheme."""
        return self.scheme == "file"

    def is_existing_dir(self) -> bool:
        """Whether this is a ``file`` link pointing at an existing directory."""
        return self.is_file and os.path.isdir(self.file_path)

    @property
    def is_wheel(self) -> bool:
        """Whether the link's extension is the wheel extension."""
        return self.ext == WHEEL_EXTENSION

    @property
    def is_vcs(self) -> bool:
        """Whether the URL scheme is one of the registered VCS schemes."""
        # Imported at call time -- presumably to avoid an import cycle.
        from pip._internal.vcs import vcs

        return self.scheme in vcs.all_schemes

    @property
    def is_yanked(self) -> bool:
        """Whether a yanked reason (possibly an empty string) is set."""
        return self.yanked_reason is not None

    @property
    def has_hash(self) -> bool:
        """Whether at least one hash is known for this link."""
        return bool(self._hashes)

    def is_hash_allowed(self, hashes: Optional[Hashes]) -> bool:
        """
        Return True if the link has a hash and it is allowed by `hashes`.
        """
        if hashes is None:
            return False
        for name, digest in self._hashes.items():
            if hashes.is_hash_allowed(name, digest):
                return True
        return False
class _CleanResult(NamedTuple):
    """A link reduced to the parts that matter for an equivalency check.

    This is used in the resolver to check whether two URL-specified requirements
    likely point to the same distribution and can be considered equivalent. This
    equivalency logic avoids comparing URLs literally, which can be too strict
    (e.g. "a=1&b=2" vs "b=2&a=1") and produce conflicts that users do not expect.

    Currently this does three things:

    1. Drop the basic auth part. This is technically wrong since a server can
       serve different content based on auth, but if it does that, it is
       impossible to guarantee that even two URLs without auth are equivalent,
       since the user can input different auth information when prompted. So
       the practical solution is to assume the auth doesn't affect the response.
    2. Parse the query to avoid the ordering issue. Note that the ordering of
       values under the same key in the query is NOT cleaned; i.e. "a=1&a=2"
       and "a=2&a=1" are still considered different.
    3. Explicitly drop most of the fragment part, except ``subdirectory=`` and
       hash values, since it should have no impact on the downloaded content.
       Note that this drops the "egg=" part historically used to denote the
       requested project (and extras), which is wrong in the strictest sense,
       but too many people supply it inconsistently, which would cause
       superfluous resolution conflicts, so we choose to ignore it as well.
    """

    # The split URL with auth removed from the netloc and with the query and
    # fragment blanked out.
    parsed: urllib.parse.SplitResult
    # The parsed query string: each key maps to its list of values.
    query: Dict[str, List[str]]
    # The first ``subdirectory=`` fragment value, or "" when there is none.
    subdirectory: str
    # The first fragment value for each supported hash algorithm present.
    hashes: Dict[str, str]
def _clean_link(link: Link) -> _CleanResult:
    """Reduce ``link`` to the ``_CleanResult`` used for equivalency checks.

    Auth information is stripped from the netloc, the query is parsed into a
    mapping, and only the ``subdirectory=`` and hash entries of the fragment
    are kept.
    """
    split_url = link._parsed_url

    # Keep only what follows the last "@", i.e. drop any auth information.
    host = split_url.netloc.rsplit("@", 1)[-1]
    # According to RFC 8089, an empty host in file: means localhost.
    if split_url.scheme == "file" and not host:
        host = "localhost"

    fragment_items = urllib.parse.parse_qs(split_url.fragment)
    if "egg" in fragment_items:
        logger.debug("Ignoring egg= fragment in %s", link)

    # If there are multiple subdirectory values, use the first one.
    # This matches the behavior of Link.subdirectory_fragment.
    subdirectories = fragment_items.get("subdirectory")
    subdirectory = subdirectories[0] if subdirectories else ""

    # If an algorithm appears more than once, only its first value is used.
    hashes = {}
    for algorithm in _SUPPORTED_HASHES:
        if algorithm in fragment_items:
            hashes[algorithm] = fragment_items[algorithm][0]

    return _CleanResult(
        parsed=split_url._replace(netloc=host, query="", fragment=""),
        query=urllib.parse.parse_qs(split_url.query),
        subdirectory=subdirectory,
        hashes=hashes,
    )
@functools.lru_cache(maxsize=None)
def links_equivalent(link1: Link, link2: Link) -> bool:
    """Return True if both links reduce to the same cleaned form.

    The links are compared after ``_clean_link()`` rather than literally.
    Results are memoized per pair of arguments.
    """
    cleaned_first = _clean_link(link1)
    cleaned_second = _clean_link(link2)
    return cleaned_first == cleaned_second