# pip/_internal/models/link.py

import functools
import itertools
import logging
import os
import posixpath
import re
import urllib.parse
import urllib.request  # used by _clean_file_url_path below
from dataclasses import dataclass
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    List,
    Mapping,
    NamedTuple,
    Optional,
    Tuple,
    Union,
)

from pip._internal.utils.deprecation import deprecated
from pip._internal.utils.filetypes import WHEEL_EXTENSION
from pip._internal.utils.hashes import Hashes
from pip._internal.utils.misc import (
    pairwise,
    redact_auth_from_url,
    split_auth_from_netloc,
    splitext,
)
from pip._internal.utils.models import KeyBasedCompareMixin
from pip._internal.utils.urls import path_to_url, url_to_path

if TYPE_CHECKING:
    from pip._internal.index.collector import IndexContent

logger = logging.getLogger(__name__)


# Order matters: earlier hashes have precedence over later hashes for what
# we will pick to use.
_SUPPORTED_HASHES = ("sha512", "sha384", "sha256", "sha224", "sha1", "md5")


@dataclass(frozen=True)
class LinkHash:
    """Links to content may have embedded hash values. This class parses those.

    `name` must be any member of `_SUPPORTED_HASHES`.

    This class can be converted to and from `ArchiveInfo`. While ArchiveInfo intends to
    be JSON-serializable to conform to PEP 610, this class contains the logic for
    parsing a hash name and value for correctness, and then checking whether that hash
    conforms to a schema with `.is_hash_allowed()`."""

    name: str
    value: str

    _hash_url_fragment_re = re.compile(
        # NB: we do not validate that the second group (.*) is a valid hex
        # digest. Instead, we simply keep that string in this class, and then check it
        # against Hashes when hash-checking is needed. This is easier to debug than
        # proactively discarding an invalid hex digest, as we handle incorrect hashes
        # and malformed hashes in the same place.
        r"[#&]({choices})=([^&]*)".format(
            choices="|".join(re.escape(hash_name) for hash_name in _SUPPORTED_HASHES)
        ),
    )

    def __post_init__(self) -> None:
        assert self.name in _SUPPORTED_HASHES

    @classmethod
    @functools.lru_cache(maxsize=None)
    def find_hash_url_fragment(cls, url: str) -> Optional["LinkHash"]:
        """Search a string for a checksum algorithm name and encoded output value."""
        match = cls._hash_url_fragment_re.search(url)
        if match is None:
            return None
        name, value = match.groups()
        return cls(name=name, value=value)

    def as_dict(self) -> Dict[str, str]:
        return {self.name: self.value}

    def as_hashes(self) -> Hashes:
        """Return a Hashes instance which checks only for the current hash."""
        return Hashes({self.name: [self.value]})

    def is_hash_allowed(self, hashes: Optional[Hashes]) -> bool:
        """
        Return True if the current hash is allowed by `hashes`.
        """
        if hashes is None:
            return False
        return hashes.is_hash_allowed(self.name, hex_digest=self.value)
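
# A minimal usage sketch (the digest below is an illustrative placeholder, not
# a real checksum): given a URL with a recognized hash fragment,
# find_hash_url_fragment extracts the algorithm name and the raw digest string
# without validating it.
#
#   >>> LinkHash.find_hash_url_fragment(
#   ...     "https://example.com/pkg-1.0.tar.gz#sha256=aabbcc"
#   ... )
#   LinkHash(name='sha256', value='aabbcc')
#   >>> LinkHash.find_hash_url_fragment("https://example.com/pkg-1.0.tar.gz") is None
#   True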


@dataclass(frozen=True)
class MetadataFile:
    """Information about a core metadata file associated with a distribution."""

    hashes: Optional[Dict[str, str]]

    def __post_init__(self) -> None:
        if self.hashes is not None:
            assert all(name in _SUPPORTED_HASHES for name in self.hashes)


def supported_hashes(hashes: Optional[Dict[str, str]]) -> Optional[Dict[str, str]]:
    # Remove any unsupported hash types from the mapping. If this leaves no
    # supported hashes, return None.
    if hashes is None:
        return None
    hashes = {n: v for n, v in hashes.items() if n in _SUPPORTED_HASHES}
    if not hashes:
        return None
    return hashes
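
# A hedged illustration of the filtering above ("crc32" stands in for any
# unsupported algorithm; the digests are placeholders):
#
#   >>> supported_hashes({"sha256": "aabbcc", "crc32": "112233"})
#   {'sha256': 'aabbcc'}
#   >>> supported_hashes({"crc32": "112233"}) is None
#   True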


def _clean_url_path_part(part: str) -> str:
    """
    Clean a "part" of a URL path (i.e. after splitting on "@" characters).
    """
    # We unquote prior to quoting to make sure nothing is double quoted.
    return urllib.parse.quote(urllib.parse.unquote(part))


def _clean_file_url_path(part: str) -> str:
    """
    Clean the first part of a URL path that corresponds to a local
    filesystem path (i.e. the first part after splitting on "@" characters).
    """
    # We unquote prior to quoting to make sure nothing is double quoted.
    # Also, on Windows the path part might contain a drive letter which
    # should not be quoted. On Linux where drive letters do not
    # exist, the colon should be quoted. We rely on urllib.request
    # to do the right thing here.
    return urllib.request.pathname2url(urllib.request.url2pathname(part))


# percent-encoded: /
_reserved_chars_re = re.compile("(@|%2F)", re.IGNORECASE)


def _clean_url_path(path: str, is_local_path: bool) -> str:
    """
    Clean the path portion of a URL.
    """
    if is_local_path:
        clean_func = _clean_file_url_path
    else:
        clean_func = _clean_url_path_part

    # Split on the reserved characters prior to cleaning so that
    # revision strings in VCS URLs are properly preserved.
    parts = _reserved_chars_re.split(path)

    cleaned_parts = []
    for to_clean, reserved in pairwise(itertools.chain(parts, [""])):
        cleaned_parts.append(clean_func(to_clean))
        # Normalize %xx escapes (e.g. %2f -> %2F)
        cleaned_parts.append(reserved.upper())

    return "".join(cleaned_parts)


def _ensure_quoted_url(url: str) -> str:
    """
    Make sure a link is fully quoted.
    For example, if ' ' occurs in the URL, it will be replaced with "%20",
    without double-quoting other characters.
    """
    # Split the URL into parts according to the general structure
    # `scheme://netloc/path;parameters?query#fragment`.
    result = urllib.parse.urlparse(url)
    # If the netloc is empty, then the URL refers to a local filesystem path.
    is_local_path = not result.netloc
    path = _clean_url_path(result.path, is_local_path=is_local_path)
    return urllib.parse.urlunparse(result._replace(path=path))
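
# A hedged example (placeholder URL): the space in the path is quoted exactly
# once, and an already-encoded "%20" is not double-quoted.
#
#   >>> _ensure_quoted_url("https://example.com/simple/my proj/")
#   'https://example.com/simple/my%20proj/'
#   >>> _ensure_quoted_url("https://example.com/simple/my%20proj/")
#   'https://example.com/simple/my%20proj/'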


class Link(KeyBasedCompareMixin):
    """Represents a parsed link from a Package Index's simple URL"""

    __slots__ = [
        "_parsed_url",
        "_url",
        "_hashes",
        "comes_from",
        "requires_python",
        "yanked_reason",
        "metadata_file_data",
        "cache_link_parsing",
        "egg_fragment",
    ]

    def __init__(
        self,
        url: str,
        comes_from: Optional[Union[str, "IndexContent"]] = None,
        requires_python: Optional[str] = None,
        yanked_reason: Optional[str] = None,
        metadata_file_data: Optional[MetadataFile] = None,
        cache_link_parsing: bool = True,
        hashes: Optional[Mapping[str, str]] = None,
    ) -> None:
        """
        :param url: url of the resource pointed to (href of the link)
        :param comes_from: instance of IndexContent where the link was found,
            or string.
        :param requires_python: String containing the `Requires-Python`
            metadata field, specified in PEP 345. This may be specified by
            a data-requires-python attribute in the HTML link tag, as
            described in PEP 503.
        :param yanked_reason: the reason the file has been yanked, if the
            file has been yanked, or None if the file hasn't been yanked.
            This is the value of the "data-yanked" attribute, if present, in
            a simple repository HTML link. If the file has been yanked but
            no reason was provided, this should be the empty string. See
            PEP 592 for more information and the specification.
        :param metadata_file_data: the metadata attached to the file, or None if
            no such metadata is provided. This argument, if not None, indicates
            that a separate metadata file exists, and also optionally supplies
            hashes for that file.
        :param cache_link_parsing: A flag that is used elsewhere to determine
            whether resources retrieved from this link should be cached. PyPI
            URLs should generally have this set to False, for example.
        :param hashes: A mapping of hash names to digests to allow us to
            determine the validity of a download.
        """

        # The comes_from, requires_python, and metadata_file_data arguments are
        # only used by classmethods of this class, and are not used in client
        # code directly.

        # url can be a UNC windows share
        if url.startswith("\\\\"):
            url = path_to_url(url)

        self._parsed_url = urllib.parse.urlsplit(url)
        # Store the url as a private attribute to prevent accidentally
        # trying to set a new value.
        self._url = url

        link_hash = LinkHash.find_hash_url_fragment(url)
        hashes_from_link = {} if link_hash is None else link_hash.as_dict()
        if hashes is None:
            self._hashes = hashes_from_link
        else:
            self._hashes = {**hashes, **hashes_from_link}

        self.comes_from = comes_from
        self.requires_python = requires_python if requires_python else None
        self.yanked_reason = yanked_reason
        self.metadata_file_data = metadata_file_data

        super().__init__(key=url, defining_class=Link)

        self.cache_link_parsing = cache_link_parsing
        self.egg_fragment = self._egg_fragment()
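
    # A hedged construction sketch (placeholder digest): a hash embedded in
    # the URL fragment is parsed into `_hashes` automatically.
    #
    #   >>> link = Link("https://example.com/pkg-1.0-py3-none-any.whl#sha256=aabbcc")
    #   >>> link.is_wheel, link.hash_name, link.hash
    #   (True, 'sha256', 'aabbcc')
    #   >>> link.filename
    #   'pkg-1.0-py3-none-any.whl'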

    @classmethod
    def from_json(
        cls,
        file_data: Dict[str, Any],
        page_url: str,
    ) -> Optional["Link"]:
        """
        Convert a PyPI JSON document from a simple repository page into a Link.
        """
        file_url = file_data.get("url")
        if file_url is None:
            return None

        url = _ensure_quoted_url(urllib.parse.urljoin(page_url, file_url))
        pyrequire = file_data.get("requires-python")
        yanked_reason = file_data.get("yanked")
        hashes = file_data.get("hashes", {})

        # PEP 714: Indexes must use the name core-metadata, but
        # clients should support the old name as a fallback for compatibility.
        metadata_info = file_data.get("core-metadata")
        if metadata_info is None:
            metadata_info = file_data.get("dist-info-metadata")

        # The metadata info value may be a boolean, or a dict of hashes.
        if isinstance(metadata_info, dict):
            # The file exists, and hashes have been supplied
            metadata_file_data = MetadataFile(supported_hashes(metadata_info))
        elif metadata_info:
            # The file exists, but there are no hashes
            metadata_file_data = MetadataFile(None)
        else:
            # False or not present: the file does not exist
            metadata_file_data = None

        # The Link.yanked_reason expects an empty string instead of a boolean.
        if yanked_reason and not isinstance(yanked_reason, str):
            yanked_reason = ""
        # The Link.yanked_reason expects None instead of False.
        elif not yanked_reason:
            yanked_reason = None

        return cls(
            url,
            comes_from=page_url,
            requires_python=pyrequire,
            yanked_reason=yanked_reason,
            hashes=hashes,
            metadata_file_data=metadata_file_data,
        )
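
    # A hedged sketch of the JSON path (names and digests are placeholders),
    # following the PEP 691 file-dictionary shape:
    #
    #   >>> link = Link.from_json(
    #   ...     {
    #   ...         "url": "pkg-1.0-py3-none-any.whl",
    #   ...         "hashes": {"sha256": "aabbcc"},
    #   ...         "requires-python": ">=3.8",
    #   ...         "yanked": False,
    #   ...         "core-metadata": {"sha256": "ddeeff"},
    #   ...     },
    #   ...     page_url="https://index.example.com/simple/pkg/",
    #   ... )
    #   >>> link.url
    #   'https://index.example.com/simple/pkg/pkg-1.0-py3-none-any.whl'
    #   >>> link.metadata_file_data
    #   MetadataFile(hashes={'sha256': 'ddeeff'})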

    @classmethod
    def from_element(
        cls,
        anchor_attribs: Dict[str, Optional[str]],
        page_url: str,
        base_url: str,
    ) -> Optional["Link"]:
        """
        Convert an anchor element's attributes in a simple repository page to a Link.
        """
        href = anchor_attribs.get("href")
        if not href:
            return None

        url = _ensure_quoted_url(urllib.parse.urljoin(base_url, href))
        pyrequire = anchor_attribs.get("data-requires-python")
        yanked_reason = anchor_attribs.get("data-yanked")

        # PEP 714: Indexes must use the name data-core-metadata, but
        # clients should support the old name as a fallback for compatibility.
        metadata_info = anchor_attribs.get("data-core-metadata")
        if metadata_info is None:
            metadata_info = anchor_attribs.get("data-dist-info-metadata")
        # The metadata info value may be the string "true", or a string of
        # the form "hashname=hashval"
        if metadata_info == "true":
            # The file exists, but there are no hashes
            metadata_file_data = MetadataFile(None)
        elif metadata_info is None:
            # The file does not exist
            metadata_file_data = None
        else:
            # The file exists, and hashes have been supplied
            hashname, sep, hashval = metadata_info.partition("=")
            if sep == "=":
                metadata_file_data = MetadataFile(supported_hashes({hashname: hashval}))
            else:
                # Error - data is wrong. Treat as no hashes supplied.
                logger.debug(
                    "Index returned invalid data-dist-info-metadata value: %s",
                    metadata_info,
                )
                metadata_file_data = MetadataFile(None)

        return cls(
            url,
            comes_from=page_url,
            requires_python=pyrequire,
            yanked_reason=yanked_reason,
            metadata_file_data=metadata_file_data,
        )
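
    # A hedged sketch of the HTML anchor path (placeholder values); here the
    # metadata hashes arrive as a "hashname=hashval" attribute string rather
    # than a dict:
    #
    #   >>> link = Link.from_element(
    #   ...     {
    #   ...         "href": "pkg-1.0.tar.gz#sha256=aabbcc",
    #   ...         "data-requires-python": ">=3.8",
    #   ...         "data-core-metadata": "sha256=ddeeff",
    #   ...     },
    #   ...     page_url="https://index.example.com/simple/pkg/",
    #   ...     base_url="https://index.example.com/simple/pkg/",
    #   ... )
    #   >>> link.requires_python, link.metadata_file_data
    #   ('>=3.8', MetadataFile(hashes={'sha256': 'ddeeff'}))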

    def __str__(self) -> str:
        if self.requires_python:
            rp = f" (requires-python:{self.requires_python})"
        else:
            rp = ""
        if self.comes_from:
            return f"{redact_auth_from_url(self._url)} (from {self.comes_from}){rp}"
        else:
            return redact_auth_from_url(str(self._url))

    def __repr__(self) -> str:
        return f"<Link {self}>"

    @property
    def url(self) -> str:
        return self._url

    @property
    def filename(self) -> str:
        path = self.path.rstrip("/")
        name = posixpath.basename(path)
        if not name:
            # Make sure we don't leak auth information if the netloc
            # includes a username and password.
            netloc, user_pass = split_auth_from_netloc(self.netloc)
            return netloc

        name = urllib.parse.unquote(name)
        assert name, f"URL {self._url!r} produced no filename"
        return name

    @property
    def file_path(self) -> str:
        return url_to_path(self.url)

    @property
    def scheme(self) -> str:
        return self._parsed_url.scheme

    @property
    def netloc(self) -> str:
        """
        This can contain auth information.
        """
        return self._parsed_url.netloc

    @property
    def path(self) -> str:
        return urllib.parse.unquote(self._parsed_url.path)

    def splitext(self) -> Tuple[str, str]:
        return splitext(posixpath.basename(self.path.rstrip("/")))

    @property
    def ext(self) -> str:
        return self.splitext()[1]

    @property
    def url_without_fragment(self) -> str:
        scheme, netloc, path, query, fragment = self._parsed_url
        return urllib.parse.urlunsplit((scheme, netloc, path, query, ""))

    _egg_fragment_re = re.compile(r"[#&]egg=([^&]*)")

    # Per PEP 508.
    _project_name_re = re.compile(
        r"^([A-Z0-9]|[A-Z0-9][A-Z0-9._-]*[A-Z0-9])$", re.IGNORECASE
    )

    def _egg_fragment(self) -> Optional[str]:
        match = self._egg_fragment_re.search(self._url)
        if not match:
            return None

        # An egg fragment looks like a PEP 508 project name, along with
        # an optional extras specifier. Anything else is invalid.
        project_name = match.group(1)
        if not self._project_name_re.match(project_name):
            deprecated(
                reason=f"{self} contains an egg fragment with a non-PEP 508 name",
                replacement="to use the req @ url syntax, and remove the egg fragment",
                gone_in="25.0",
                issue=11617,
            )

        return project_name
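
    # A hedged illustration (placeholder repository URL): a PEP 508-style name
    # in the egg fragment is returned as-is; anything else still returns the
    # name but triggers the deprecation warning above.
    #
    #   >>> Link("git+https://example.com/repo.git#egg=myproj").egg_fragment
    #   'myproj'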

    _subdirectory_fragment_re = re.compile(r"[#&]subdirectory=([^&]*)")

    @property
    def subdirectory_fragment(self) -> Optional[str]:
        match = self._subdirectory_fragment_re.search(self._url)
        if not match:
            return None
        return match.group(1)

    def metadata_link(self) -> Optional["Link"]:
        """Return a link to the associated core metadata file (if any)."""
        if self.metadata_file_data is None:
            return None
        metadata_url = f"{self.url_without_fragment}.metadata"
        if self.metadata_file_data.hashes is None:
            return Link(metadata_url)
        return Link(metadata_url, hashes=self.metadata_file_data.hashes)
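
    # A hedged sketch (placeholder values): the metadata link appends
    # ".metadata" to the fragment-less file URL and carries the metadata
    # hashes, if any were supplied.
    #
    #   >>> link = Link(
    #   ...     "https://example.com/pkg-1.0.tar.gz",
    #   ...     metadata_file_data=MetadataFile({"sha256": "ddeeff"}),
    #   ... )
    #   >>> link.metadata_link().url
    #   'https://example.com/pkg-1.0.tar.gz.metadata'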

    def as_hashes(self) -> Hashes:
        return Hashes({k: [v] for k, v in self._hashes.items()})

    @property
    def hash(self) -> Optional[str]:
        return next(iter(self._hashes.values()), None)

    @property
    def hash_name(self) -> Optional[str]:
        return next(iter(self._hashes), None)

    @property
    def show_url(self) -> str:
        return posixpath.basename(self._url.split("#", 1)[0].split("?", 1)[0])

    @property
    def is_file(self) -> bool:
        return self.scheme == "file"

    def is_existing_dir(self) -> bool:
        return self.is_file and os.path.isdir(self.file_path)

    @property
    def is_wheel(self) -> bool:
        return self.ext == WHEEL_EXTENSION

    @property
    def is_vcs(self) -> bool:
        from pip._internal.vcs import vcs

        return self.scheme in vcs.all_schemes

    @property
    def is_yanked(self) -> bool:
        return self.yanked_reason is not None

    @property
    def has_hash(self) -> bool:
        return bool(self._hashes)

    def is_hash_allowed(self, hashes: Optional[Hashes]) -> bool:
        """
        Return True if the link has a hash and it is allowed by `hashes`.
        """
        if hashes is None:
            return False
        return any(hashes.is_hash_allowed(k, v) for k, v in self._hashes.items())


class _CleanResult(NamedTuple):
    """Convert link for equivalency check.

    This is used in the resolver to check whether two URL-specified requirements
    likely point to the same distribution and can be considered equivalent. This
    equivalency logic avoids comparing URLs literally, which can be too strict
    (e.g. "a=1&b=2" vs "b=2&a=1") and produce conflicts unexpected by users.

    Currently this does three things:

    1. Drop the basic auth part. This is technically wrong since a server can
       serve different content based on auth, but if it does that, it is even
       impossible to guarantee two URLs without auth are equivalent, since
       the user can input different auth information when prompted. So the
       practical solution is to assume the auth doesn't affect the response.
    2. Parse the query to avoid the ordering issue. Note that ordering under the
       same key in the query is NOT cleaned; i.e. "a=1&a=2" and "a=2&a=1" are
       still considered different.
    3. Explicitly drop most of the fragment part, except ``subdirectory=`` and
       hash values, since it should have no impact on the downloaded content. Note
       that this drops the "egg=" part historically used to denote the requested
       project (and extras), which is wrong in the strictest sense, but too many
       people supply it inconsistently, causing superfluous resolution
       conflicts, so we choose to also ignore it.
    """

    parsed: urllib.parse.SplitResult
    query: Dict[str, List[str]]
    subdirectory: str
    hashes: Dict[str, str]


def _clean_link(link: Link) -> _CleanResult:
    parsed = link._parsed_url
    netloc = parsed.netloc.rsplit("@", 1)[-1]
    # According to RFC 8089, an empty host in file: means localhost.
    if parsed.scheme == "file" and not netloc:
        netloc = "localhost"
    fragment = urllib.parse.parse_qs(parsed.fragment)
    if "egg" in fragment:
        logger.debug("Ignoring egg= fragment in %s", link)
    try:
        # If there are multiple subdirectory values, use the first one.
        # This matches the behavior of Link.subdirectory_fragment.
        subdirectory = fragment["subdirectory"][0]
    except (IndexError, KeyError):
        subdirectory = ""
    # If there are multiple hash values under the same algorithm, use the
    # first one. This matches the behavior of Link.hash.
    hashes = {k: fragment[k][0] for k in _SUPPORTED_HASHES if k in fragment}
    return _CleanResult(
        parsed=parsed._replace(netloc=netloc, query="", fragment=""),
        query=urllib.parse.parse_qs(parsed.query),
        subdirectory=subdirectory,
        hashes=hashes,
    )


@functools.lru_cache(maxsize=None)
def links_equivalent(link1: Link, link2: Link) -> bool:
    return _clean_link(link1) == _clean_link(link2)
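
# A hedged example of the equivalency rules (placeholder URLs): query ordering
# and the egg= fragment are ignored, so these two links compare equal.
#
#   >>> a = Link("https://example.com/pkg.tar.gz?a=1&b=2#egg=pkg")
#   >>> b = Link("https://example.com/pkg.tar.gz?b=2&a=1")
#   >>> links_equivalent(a, b)
#   True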