import functools
import itertools
import logging
import os
import posixpath
import re
import urllib.parse
import urllib.request
from dataclasses import dataclass
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    List,
    Mapping,
    NamedTuple,
    Optional,
    Tuple,
    Union,
)

from pip._internal.utils.deprecation import deprecated
from pip._internal.utils.filetypes import WHEEL_EXTENSION
from pip._internal.utils.hashes import Hashes
from pip._internal.utils.misc import (
    pairwise,
    redact_auth_from_url,
    split_auth_from_netloc,
    splitext,
)
from pip._internal.utils.models import KeyBasedCompareMixin
from pip._internal.utils.urls import path_to_url, url_to_path

if TYPE_CHECKING:
    from pip._internal.index.collector import IndexContent

logger = logging.getLogger(__name__)


# Order matters: earlier hashes take precedence over later ones when
# choosing which hash to use.
_SUPPORTED_HASHES = ("sha512", "sha384", "sha256", "sha224", "sha1", "md5")

@dataclass(frozen=True)
class LinkHash:
    """Links to content may have embedded hash values. This class parses those.

    `name` must be a member of `_SUPPORTED_HASHES`.

    This class can be converted to and from `ArchiveInfo`. While ArchiveInfo intends to
    be JSON-serializable to conform to PEP 610, this class contains the logic for
    parsing a hash name and value for correctness, and then checking whether that hash
    conforms to a schema with `.is_hash_allowed()`."""

    name: str
    value: str

    _hash_url_fragment_re = re.compile(
        # NB: we do not validate that the second group (.*) is a valid hex
        # digest. Instead, we simply keep that string in this class, and then check it
        # against Hashes when hash-checking is needed. This is easier to debug than
        # proactively discarding an invalid hex digest, as we handle incorrect hashes
        # and malformed hashes in the same place.
        r"[#&]({choices})=([^&]*)".format(
            choices="|".join(re.escape(hash_name) for hash_name in _SUPPORTED_HASHES)
        ),
    )

    def __post_init__(self) -> None:
        assert self.name in _SUPPORTED_HASHES

    @classmethod
    @functools.lru_cache(maxsize=None)
    def find_hash_url_fragment(cls, url: str) -> Optional["LinkHash"]:
        """Search a string for a checksum algorithm name and encoded output value."""
        match = cls._hash_url_fragment_re.search(url)
        if match is None:
            return None
        name, value = match.groups()
        return cls(name=name, value=value)

    def as_dict(self) -> Dict[str, str]:
        return {self.name: self.value}

    def as_hashes(self) -> Hashes:
        """Return a Hashes instance which checks only for the current hash."""
        return Hashes({self.name: [self.value]})

    def is_hash_allowed(self, hashes: Optional[Hashes]) -> bool:
        """
        Return True if the current hash is allowed by `hashes`.
        """
        if hashes is None:
            return False
        return hashes.is_hash_allowed(self.name, hex_digest=self.value)
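
# Illustrative example (not part of pip): how the fragment parsing above
# behaves on a hypothetical URL. The expected values follow from the regex
# and dataclass definitions in this module.
#
#   >>> LinkHash.find_hash_url_fragment(
#   ...     "https://example.com/pkg-1.0.tar.gz#sha256=0123abcd"
#   ... )
#   LinkHash(name='sha256', value='0123abcd')
#   >>> LinkHash.find_hash_url_fragment("https://example.com/pkg-1.0.tar.gz") is None
#   True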

@dataclass(frozen=True)
class MetadataFile:
    """Information about a core metadata file associated with a distribution."""

    hashes: Optional[Dict[str, str]]

    def __post_init__(self) -> None:
        if self.hashes is not None:
            assert all(name in _SUPPORTED_HASHES for name in self.hashes)


def supported_hashes(hashes: Optional[Dict[str, str]]) -> Optional[Dict[str, str]]:
    # Remove any unsupported hash types from the mapping. If this leaves no
    # supported hashes, return None.
    if hashes is None:
        return None
    hashes = {n: v for n, v in hashes.items() if n in _SUPPORTED_HASHES}
    if not hashes:
        return None
    return hashes
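
# Illustrative example (not part of pip): filtering a hash mapping that
# mixes supported and unsupported algorithm names.
#
#   >>> supported_hashes({"sha256": "0123abcd", "whirlpool": "feedbeef"})
#   {'sha256': '0123abcd'}
#   >>> supported_hashes({"whirlpool": "feedbeef"}) is None
#   True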

def _clean_url_path_part(part: str) -> str:
    """
    Clean a "part" of a URL path (i.e. after splitting on "@" characters).
    """
    # We unquote prior to quoting to make sure nothing is double quoted.
    return urllib.parse.quote(urllib.parse.unquote(part))


def _clean_file_url_path(part: str) -> str:
    """
    Clean the first part of a URL path that corresponds to a local
    filesystem path (i.e. the first part after splitting on "@" characters).
    """
    # We unquote prior to quoting to make sure nothing is double quoted.
    # Also, on Windows the path part might contain a drive letter which
    # should not be quoted. On Linux where drive letters do not
    # exist, the colon should be quoted. We rely on urllib.request
    # to do the right thing here.
    return urllib.request.pathname2url(urllib.request.url2pathname(part))


# Characters that are kept verbatim when cleaning a URL path: a literal "@"
# and "%2F" (the percent-encoded form of "/").
_reserved_chars_re = re.compile("(@|%2F)", re.IGNORECASE)

def _clean_url_path(path: str, is_local_path: bool) -> str:
    """
    Clean the path portion of a URL.
    """
    if is_local_path:
        clean_func = _clean_file_url_path
    else:
        clean_func = _clean_url_path_part

    # Split on the reserved characters prior to cleaning so that
    # revision strings in VCS URLs are properly preserved.
    parts = _reserved_chars_re.split(path)

    cleaned_parts = []
    for to_clean, reserved in pairwise(itertools.chain(parts, [""])):
        cleaned_parts.append(clean_func(to_clean))
        # Normalize %xx escapes (e.g. %2f -> %2F)
        cleaned_parts.append(reserved.upper())

    return "".join(cleaned_parts)
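
# Illustrative example (not part of pip): the "@" separating a VCS revision
# is preserved verbatim, while the surrounding path segments are re-quoted.
#
#   >>> _clean_url_path("/user/repo.git@v1.0 beta", is_local_path=False)
#   '/user/repo.git@v1.0%20beta'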

def _ensure_quoted_url(url: str) -> str:
    """
    Make sure a link is fully quoted.
    For example, if ' ' occurs in the URL, it will be replaced with "%20",
    without double-quoting characters that are already percent-encoded.
    """
    # Split the URL into parts according to the general structure
    # `scheme://netloc/path;parameters?query#fragment`.
    result = urllib.parse.urlparse(url)
    # If the netloc is empty, then the URL refers to a local filesystem path.
    is_local_path = not result.netloc
    path = _clean_url_path(result.path, is_local_path=is_local_path)
    return urllib.parse.urlunparse(result._replace(path=path))
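
# Illustrative example (not part of pip), using a hypothetical URL: a space
# is quoted to "%20", and an already-quoted "%20" is not double-quoted.
#
#   >>> _ensure_quoted_url("https://example.com/some dir/pkg.tar.gz")
#   'https://example.com/some%20dir/pkg.tar.gz'
#   >>> _ensure_quoted_url("https://example.com/some%20dir/pkg.tar.gz")
#   'https://example.com/some%20dir/pkg.tar.gz'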

class Link(KeyBasedCompareMixin):
    """Represents a parsed link from a Package Index's simple URL"""

    __slots__ = [
        "_parsed_url",
        "_url",
        "_hashes",
        "comes_from",
        "requires_python",
        "yanked_reason",
        "metadata_file_data",
        "cache_link_parsing",
        "egg_fragment",
    ]

    def __init__(
        self,
        url: str,
        comes_from: Optional[Union[str, "IndexContent"]] = None,
        requires_python: Optional[str] = None,
        yanked_reason: Optional[str] = None,
        metadata_file_data: Optional[MetadataFile] = None,
        cache_link_parsing: bool = True,
        hashes: Optional[Mapping[str, str]] = None,
    ) -> None:
        """
        :param url: url of the resource pointed to (href of the link)
        :param comes_from: instance of IndexContent where the link was found,
            or string.
        :param requires_python: String containing the `Requires-Python`
            metadata field, specified in PEP 345. This may be specified by
            a data-requires-python attribute in the HTML link tag, as
            described in PEP 503.
        :param yanked_reason: the reason the file has been yanked, if the
            file has been yanked, or None if the file hasn't been yanked.
            This is the value of the "data-yanked" attribute, if present, in
            a simple repository HTML link. If the file has been yanked but
            no reason was provided, this should be the empty string. See
            PEP 592 for more information and the specification.
        :param metadata_file_data: the metadata attached to the file, or None if
            no such metadata is provided. This argument, if not None, indicates
            that a separate metadata file exists, and also optionally supplies
            hashes for that file.
        :param cache_link_parsing: A flag that is used elsewhere to determine
            whether resources retrieved from this link should be cached. PyPI
            URLs should generally have this set to False, for example.
        :param hashes: A mapping of hash names to digests to allow us to
            determine the validity of a download.
        """

        # The comes_from, requires_python, and metadata_file_data arguments are
        # only used by classmethods of this class, and are not used in client
        # code directly.

        # url can be a UNC windows share
        if url.startswith("\\\\"):
            url = path_to_url(url)

        self._parsed_url = urllib.parse.urlsplit(url)
        # Store the url as a private attribute to prevent accidentally
        # trying to set a new value.
        self._url = url

        link_hash = LinkHash.find_hash_url_fragment(url)
        hashes_from_link = {} if link_hash is None else link_hash.as_dict()
        if hashes is None:
            self._hashes = hashes_from_link
        else:
            self._hashes = {**hashes, **hashes_from_link}

        self.comes_from = comes_from
        self.requires_python = requires_python if requires_python else None
        self.yanked_reason = yanked_reason
        self.metadata_file_data = metadata_file_data

        super().__init__(key=url, defining_class=Link)

        self.cache_link_parsing = cache_link_parsing
        self.egg_fragment = self._egg_fragment()

    @classmethod
    def from_json(
        cls,
        file_data: Dict[str, Any],
        page_url: str,
    ) -> Optional["Link"]:
        """
        Convert a PyPI JSON document from a simple repository page into a Link.
        """
        file_url = file_data.get("url")
        if file_url is None:
            return None

        url = _ensure_quoted_url(urllib.parse.urljoin(page_url, file_url))
        pyrequire = file_data.get("requires-python")
        yanked_reason = file_data.get("yanked")
        hashes = file_data.get("hashes", {})

        # PEP 714: Indexes must use the name core-metadata, but
        # clients should support the old name as a fallback for compatibility.
        metadata_info = file_data.get("core-metadata")
        if metadata_info is None:
            metadata_info = file_data.get("dist-info-metadata")

        # The metadata info value may be a boolean, or a dict of hashes.
        if isinstance(metadata_info, dict):
            # The file exists, and hashes have been supplied
            metadata_file_data = MetadataFile(supported_hashes(metadata_info))
        elif metadata_info:
            # The file exists, but there are no hashes
            metadata_file_data = MetadataFile(None)
        else:
            # False or not present: the file does not exist
            metadata_file_data = None

        # Link.yanked_reason expects an empty string instead of a boolean.
        if yanked_reason and not isinstance(yanked_reason, str):
            yanked_reason = ""
        # Link.yanked_reason expects None instead of False.
        elif not yanked_reason:
            yanked_reason = None

        return cls(
            url,
            comes_from=page_url,
            requires_python=pyrequire,
            yanked_reason=yanked_reason,
            hashes=hashes,
            metadata_file_data=metadata_file_data,
        )
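
    # Illustrative example (not part of pip): a hypothetical PEP 691 file
    # entry converted into a Link.
    #
    #   >>> link = Link.from_json(
    #   ...     {
    #   ...         "url": "pkg-1.0-py3-none-any.whl",
    #   ...         "requires-python": ">=3.8",
    #   ...         "hashes": {"sha256": "0123abcd"},
    #   ...         "core-metadata": {"sha256": "4567cdef"},
    #   ...     },
    #   ...     page_url="https://example.com/simple/pkg/",
    #   ... )
    #   >>> link.url
    #   'https://example.com/simple/pkg/pkg-1.0-py3-none-any.whl'
    #   >>> link.metadata_file_data
    #   MetadataFile(hashes={'sha256': '4567cdef'})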

    @classmethod
    def from_element(
        cls,
        anchor_attribs: Dict[str, Optional[str]],
        page_url: str,
        base_url: str,
    ) -> Optional["Link"]:
        """
        Convert an anchor element's attributes in a simple repository page to a Link.
        """
        href = anchor_attribs.get("href")
        if not href:
            return None

        url = _ensure_quoted_url(urllib.parse.urljoin(base_url, href))
        pyrequire = anchor_attribs.get("data-requires-python")
        yanked_reason = anchor_attribs.get("data-yanked")

        # PEP 714: Indexes must use the name data-core-metadata, but
        # clients should support the old name as a fallback for compatibility.
        metadata_info = anchor_attribs.get("data-core-metadata")
        if metadata_info is None:
            metadata_info = anchor_attribs.get("data-dist-info-metadata")
        # The metadata info value may be the string "true", or a string of
        # the form "hashname=hashval".
        if metadata_info == "true":
            # The file exists, but there are no hashes
            metadata_file_data = MetadataFile(None)
        elif metadata_info is None:
            # The file does not exist
            metadata_file_data = None
        else:
            # The file exists, and hashes have been supplied
            hashname, sep, hashval = metadata_info.partition("=")
            if sep == "=":
                metadata_file_data = MetadataFile(supported_hashes({hashname: hashval}))
            else:
                # Error - data is wrong. Treat as no hashes supplied.
                logger.debug(
                    "Index returned invalid data-dist-info-metadata value: %s",
                    metadata_info,
                )
                metadata_file_data = MetadataFile(None)

        return cls(
            url,
            comes_from=page_url,
            requires_python=pyrequire,
            yanked_reason=yanked_reason,
            metadata_file_data=metadata_file_data,
        )
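
    # Illustrative example (not part of pip): a hypothetical PEP 503 anchor
    # tag, with the hash carried in the href fragment.
    #
    #   >>> link = Link.from_element(
    #   ...     {
    #   ...         "href": "pkg-1.0-py3-none-any.whl#sha256=0123abcd",
    #   ...         "data-core-metadata": "sha256=4567cdef",
    #   ...     },
    #   ...     page_url="https://example.com/simple/pkg/",
    #   ...     base_url="https://example.com/simple/pkg/",
    #   ... )
    #   >>> link.filename
    #   'pkg-1.0-py3-none-any.whl'
    #   >>> link.hash_name, link.hash
    #   ('sha256', '0123abcd')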

    def __str__(self) -> str:
        if self.requires_python:
            rp = f" (requires-python:{self.requires_python})"
        else:
            rp = ""
        if self.comes_from:
            return f"{redact_auth_from_url(self._url)} (from {self.comes_from}){rp}"
        else:
            return redact_auth_from_url(str(self._url))

    def __repr__(self) -> str:
        return f"<Link {self}>"

    @property
    def url(self) -> str:
        return self._url

    @property
    def filename(self) -> str:
        path = self.path.rstrip("/")
        name = posixpath.basename(path)
        if not name:
            # Make sure we don't leak auth information if the netloc
            # includes a username and password.
            netloc, user_pass = split_auth_from_netloc(self.netloc)
            return netloc

        name = urllib.parse.unquote(name)
        assert name, f"URL {self._url!r} produced no filename"
        return name

    @property
    def file_path(self) -> str:
        return url_to_path(self.url)

    @property
    def scheme(self) -> str:
        return self._parsed_url.scheme

    @property
    def netloc(self) -> str:
        """
        This can contain auth information.
        """
        return self._parsed_url.netloc

    @property
    def path(self) -> str:
        return urllib.parse.unquote(self._parsed_url.path)

    def splitext(self) -> Tuple[str, str]:
        return splitext(posixpath.basename(self.path.rstrip("/")))

    @property
    def ext(self) -> str:
        return self.splitext()[1]

    @property
    def url_without_fragment(self) -> str:
        scheme, netloc, path, query, fragment = self._parsed_url
        return urllib.parse.urlunsplit((scheme, netloc, path, query, ""))

    _egg_fragment_re = re.compile(r"[#&]egg=([^&]*)")

    # Per PEP 508.
    _project_name_re = re.compile(
        r"^([A-Z0-9]|[A-Z0-9][A-Z0-9._-]*[A-Z0-9])$", re.IGNORECASE
    )

    def _egg_fragment(self) -> Optional[str]:
        match = self._egg_fragment_re.search(self._url)
        if not match:
            return None

        # An egg fragment looks like a PEP 508 project name, along with
        # an optional extras specifier. Anything else is invalid.
        project_name = match.group(1)
        if not self._project_name_re.match(project_name):
            deprecated(
                reason=f"{self} contains an egg fragment with a non-PEP 508 name",
                replacement="to use the req @ url syntax, and remove the egg fragment",
                gone_in="25.0",
                issue=11617,
            )

        return project_name

    _subdirectory_fragment_re = re.compile(r"[#&]subdirectory=([^&]*)")

    @property
    def subdirectory_fragment(self) -> Optional[str]:
        match = self._subdirectory_fragment_re.search(self._url)
        if not match:
            return None
        return match.group(1)

    def metadata_link(self) -> Optional["Link"]:
        """Return a link to the associated core metadata file (if any)."""
        if self.metadata_file_data is None:
            return None
        metadata_url = f"{self.url_without_fragment}.metadata"
        if self.metadata_file_data.hashes is None:
            return Link(metadata_url)
        return Link(metadata_url, hashes=self.metadata_file_data.hashes)
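
    # Illustrative example (not part of pip), continuing the hypothetical
    # from_element example above: the metadata link is the file URL (minus
    # any fragment) with ".metadata" appended.
    #
    #   >>> link.metadata_link().url
    #   'https://example.com/simple/pkg/pkg-1.0-py3-none-any.whl.metadata'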

    def as_hashes(self) -> Hashes:
        return Hashes({k: [v] for k, v in self._hashes.items()})

    @property
    def hash(self) -> Optional[str]:
        return next(iter(self._hashes.values()), None)

    @property
    def hash_name(self) -> Optional[str]:
        return next(iter(self._hashes), None)

    @property
    def show_url(self) -> str:
        return posixpath.basename(self._url.split("#", 1)[0].split("?", 1)[0])

    @property
    def is_file(self) -> bool:
        return self.scheme == "file"

    def is_existing_dir(self) -> bool:
        return self.is_file and os.path.isdir(self.file_path)

    @property
    def is_wheel(self) -> bool:
        return self.ext == WHEEL_EXTENSION

    @property
    def is_vcs(self) -> bool:
        from pip._internal.vcs import vcs

        return self.scheme in vcs.all_schemes

    @property
    def is_yanked(self) -> bool:
        return self.yanked_reason is not None

    @property
    def has_hash(self) -> bool:
        return bool(self._hashes)

    def is_hash_allowed(self, hashes: Optional[Hashes]) -> bool:
        """
        Return True if the link has a hash and it is allowed by `hashes`.
        """
        if hashes is None:
            return False
        return any(hashes.is_hash_allowed(k, v) for k, v in self._hashes.items())
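
# Illustrative example (not part of pip): hashes parsed from a hypothetical
# URL fragment feed into the hash-allowance check.
#
#   >>> link = Link("https://example.com/pkg-1.0.tar.gz#sha256=0123abcd")
#   >>> link.has_hash
#   True
#   >>> link.is_hash_allowed(Hashes({"sha256": ["0123abcd"]}))
#   True
#   >>> link.is_hash_allowed(None)
#   False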

class _CleanResult(NamedTuple):
    """Convert link for equivalency check.

    This is used in the resolver to check whether two URL-specified requirements
    likely point to the same distribution and can be considered equivalent. This
    equivalency logic avoids comparing URLs literally, which can be too strict
    (e.g. "a=1&b=2" vs "b=2&a=1") and produce conflicts unexpected to users.

    Currently this does three things:

    1. Drop the basic auth part. This is technically wrong since a server can
       serve different content based on auth, but if it does that, it is even
       impossible to guarantee two URLs without auth are equivalent, since
       the user can input different auth information when prompted. So the
       practical solution is to assume the auth doesn't affect the response.
    2. Parse the query to avoid the ordering issue. Note that the ordering of
       values under the same key in the query is NOT cleaned; i.e. "a=1&a=2"
       and "a=2&a=1" are still considered different.
    3. Explicitly drop most of the fragment part, except ``subdirectory=`` and
       hash values, since it should have no impact on the downloaded content.
       Note that this drops the "egg=" part historically used to denote the
       requested project (and extras), which is wrong in the strictest sense,
       but too many people supply it inconsistently, causing superfluous
       resolution conflicts, so we choose to ignore it as well.
    """

    parsed: urllib.parse.SplitResult
    query: Dict[str, List[str]]
    subdirectory: str
    hashes: Dict[str, str]

def _clean_link(link: Link) -> _CleanResult:
    parsed = link._parsed_url
    netloc = parsed.netloc.rsplit("@", 1)[-1]
    # According to RFC 8089, an empty host in file: means localhost.
    if parsed.scheme == "file" and not netloc:
        netloc = "localhost"
    fragment = urllib.parse.parse_qs(parsed.fragment)
    if "egg" in fragment:
        logger.debug("Ignoring egg= fragment in %s", link)
    try:
        # If there are multiple subdirectory values, use the first one.
        # This matches the behavior of Link.subdirectory_fragment.
        subdirectory = fragment["subdirectory"][0]
    except (IndexError, KeyError):
        subdirectory = ""
    # If there are multiple hash values under the same algorithm, use the
    # first one. This matches the behavior of Link.hash.
    hashes = {k: fragment[k][0] for k in _SUPPORTED_HASHES if k in fragment}
    return _CleanResult(
        parsed=parsed._replace(netloc=netloc, query="", fragment=""),
        query=urllib.parse.parse_qs(parsed.query),
        subdirectory=subdirectory,
        hashes=hashes,
    )

@functools.lru_cache(maxsize=None)
def links_equivalent(link1: Link, link2: Link) -> bool:
    return _clean_link(link1) == _clean_link(link2)
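
# Illustrative example (not part of pip): equivalence ignores query ordering
# and the historical egg= fragment (hypothetical URLs).
#
#   >>> links_equivalent(
#   ...     Link("https://example.com/pkg.tar.gz?a=1&b=2"),
#   ...     Link("https://example.com/pkg.tar.gz?b=2&a=1"),
#   ... )
#   True
#   >>> links_equivalent(
#   ...     Link("https://example.com/pkg.tar.gz#egg=pkg"),
#   ...     Link("https://example.com/pkg.tar.gz"),
#   ... )
#   True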