Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pip/_internal/models/link.py: 41%

239 statements  

coverage.py v7.2.7, created at 2023-06-07 06:48 +0000

import functools
import itertools
import logging
import os
import posixpath
import re
import urllib.parse
import urllib.request  # used by _clean_file_url_path below
from dataclasses import dataclass
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    List,
    Mapping,
    NamedTuple,
    Optional,
    Tuple,
    Union,
)

from pip._internal.utils.deprecation import deprecated
from pip._internal.utils.filetypes import WHEEL_EXTENSION
from pip._internal.utils.hashes import Hashes
from pip._internal.utils.misc import (
    pairwise,
    redact_auth_from_url,
    split_auth_from_netloc,
    splitext,
)
from pip._internal.utils.models import KeyBasedCompareMixin
from pip._internal.utils.urls import path_to_url, url_to_path

if TYPE_CHECKING:
    from pip._internal.index.collector import IndexContent

logger = logging.getLogger(__name__)


# Order matters: earlier hashes take precedence over later hashes for what
# we will pick to use.
_SUPPORTED_HASHES = ("sha512", "sha384", "sha256", "sha224", "sha1", "md5")


@dataclass(frozen=True)
class LinkHash:
    """Links to content may have embedded hash values. This class parses those.

    `name` must be any member of `_SUPPORTED_HASHES`.

    This class can be converted to and from `ArchiveInfo`. While ArchiveInfo intends to
    be JSON-serializable to conform to PEP 610, this class contains the logic for
    parsing a hash name and value for correctness, and then checking whether that hash
    conforms to a schema with `.is_hash_allowed()`."""

    name: str
    value: str

    _hash_url_fragment_re = re.compile(
        # NB: we do not validate that the second group (.*) is a valid hex
        # digest. Instead, we simply keep that string in this class, and then check it
        # against Hashes when hash-checking is needed. This is easier to debug than
        # proactively discarding an invalid hex digest, as we handle incorrect hashes
        # and malformed hashes in the same place.
        r"[#&]({choices})=([^&]*)".format(
            choices="|".join(re.escape(hash_name) for hash_name in _SUPPORTED_HASHES)
        ),
    )

    def __post_init__(self) -> None:
        assert self.name in _SUPPORTED_HASHES

    @classmethod
    def parse_pep658_hash(cls, dist_info_metadata: str) -> Optional["LinkHash"]:
        """Parse a PEP 658 data-dist-info-metadata hash."""
        if dist_info_metadata == "true":
            return None
        name, sep, value = dist_info_metadata.partition("=")
        if not sep:
            return None
        if name not in _SUPPORTED_HASHES:
            return None
        return cls(name=name, value=value)

    @classmethod
    @functools.lru_cache(maxsize=None)
    def find_hash_url_fragment(cls, url: str) -> Optional["LinkHash"]:
        """Search a string for a checksum algorithm name and encoded output value."""
        match = cls._hash_url_fragment_re.search(url)
        if match is None:
            return None
        name, value = match.groups()
        return cls(name=name, value=value)

    def as_dict(self) -> Dict[str, str]:
        return {self.name: self.value}

    def as_hashes(self) -> Hashes:
        """Return a Hashes instance which checks only for the current hash."""
        return Hashes({self.name: [self.value]})

    def is_hash_allowed(self, hashes: Optional[Hashes]) -> bool:
        """
        Return True if the current hash is allowed by `hashes`.
        """
        if hashes is None:
            return False
        return hashes.is_hash_allowed(self.name, hex_digest=self.value)
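
# Illustrative sketch (not part of pip's source): given a link whose URL
# fragment embeds a checksum, LinkHash parses the algorithm name and digest.
# The URL and digest below are made-up values.
#
#   lh = LinkHash.find_hash_url_fragment(
#       "https://example.com/pkg-1.0.tar.gz#sha256=0a1b2c"
#   )
#   assert lh == LinkHash(name="sha256", value="0a1b2c")
#   assert lh.as_dict() == {"sha256": "0a1b2c"}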


def _clean_url_path_part(part: str) -> str:
    """
    Clean a "part" of a URL path (i.e. after splitting on "@" characters).
    """
    # We unquote prior to quoting to make sure nothing is double quoted.
    return urllib.parse.quote(urllib.parse.unquote(part))
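
# Illustrative sketch (not part of pip's source): unquoting before quoting
# makes the cleaning idempotent, so already-quoted input is not double-quoted.
#
#   assert _clean_url_path_part("a b") == "a%20b"
#   assert _clean_url_path_part("a%20b") == "a%20b"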


def _clean_file_url_path(part: str) -> str:
    """
    Clean the first part of a URL path that corresponds to a local
    filesystem path (i.e. the first part after splitting on "@" characters).
    """
    # We unquote prior to quoting to make sure nothing is double quoted.
    # Also, on Windows the path part might contain a drive letter which
    # should not be quoted. On Linux where drive letters do not
    # exist, the colon should be quoted. We rely on urllib.request
    # to do the right thing here.
    return urllib.request.pathname2url(urllib.request.url2pathname(part))


# percent-encoded: /
_reserved_chars_re = re.compile("(@|%2F)", re.IGNORECASE)


def _clean_url_path(path: str, is_local_path: bool) -> str:
    """
    Clean the path portion of a URL.
    """
    if is_local_path:
        clean_func = _clean_file_url_path
    else:
        clean_func = _clean_url_path_part

    # Split on the reserved characters prior to cleaning so that
    # revision strings in VCS URLs are properly preserved.
    parts = _reserved_chars_re.split(path)

    cleaned_parts = []
    for to_clean, reserved in pairwise(itertools.chain(parts, [""])):
        cleaned_parts.append(clean_func(to_clean))
        # Normalize %xx escapes (e.g. %2f -> %2F)
        cleaned_parts.append(reserved.upper())

    return "".join(cleaned_parts)
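
# Illustrative sketch (not part of pip's source): splitting on "@" (and "%2F")
# before cleaning keeps a VCS revision separator intact while the surrounding
# path text is still re-quoted. The path below is a made-up example.
#
#   assert _clean_url_path("/repo name@v1.0", is_local_path=False) == (
#       "/repo%20name@v1.0"
#   )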


def _ensure_quoted_url(url: str) -> str:
    """
    Make sure a link is fully quoted.
    For example, if ' ' occurs in the URL, it will be replaced with "%20",
    and without double-quoting other characters.
    """
    # Split the URL into parts according to the general structure
    # `scheme://netloc/path;parameters?query#fragment`.
    result = urllib.parse.urlparse(url)
    # If the netloc is empty, then the URL refers to a local filesystem path.
    is_local_path = not result.netloc
    path = _clean_url_path(result.path, is_local_path=is_local_path)
    return urllib.parse.urlunparse(result._replace(path=path))
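
# Illustrative sketch (not part of pip's source), using a made-up URL:
#
#   assert _ensure_quoted_url("https://example.com/some pkg/a b.tar.gz") == (
#       "https://example.com/some%20pkg/a%20b.tar.gz"
#   )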


class Link(KeyBasedCompareMixin):
    """Represents a parsed link from a Package Index's simple URL"""

    __slots__ = [
        "_parsed_url",
        "_url",
        "_hashes",
        "comes_from",
        "requires_python",
        "yanked_reason",
        "dist_info_metadata",
        "cache_link_parsing",
        "egg_fragment",
    ]

    def __init__(
        self,
        url: str,
        comes_from: Optional[Union[str, "IndexContent"]] = None,
        requires_python: Optional[str] = None,
        yanked_reason: Optional[str] = None,
        dist_info_metadata: Optional[str] = None,
        cache_link_parsing: bool = True,
        hashes: Optional[Mapping[str, str]] = None,
    ) -> None:
        """
        :param url: url of the resource pointed to (href of the link)
        :param comes_from: instance of IndexContent where the link was found,
            or string.
        :param requires_python: String containing the `Requires-Python`
            metadata field, specified in PEP 345. This may be specified by
            a data-requires-python attribute in the HTML link tag, as
            described in PEP 503.
        :param yanked_reason: the reason the file has been yanked, if the
            file has been yanked, or None if the file hasn't been yanked.
            This is the value of the "data-yanked" attribute, if present, in
            a simple repository HTML link. If the file has been yanked but
            no reason was provided, this should be the empty string. See
            PEP 592 for more information and the specification.
        :param dist_info_metadata: the metadata attached to the file, or None if no
            such metadata is provided. This is the value of the
            "data-dist-info-metadata" attribute, if present, in a simple
            repository HTML link. This may be parsed into its own `Link` by
            `self.metadata_link()`. See PEP 658 for more information and the
            specification.
        :param cache_link_parsing: A flag that is used elsewhere to determine
            whether resources retrieved from this link should be cached. PyPI
            URLs should generally have this set to False, for example.
        :param hashes: A mapping of hash names to digests to allow us to
            determine the validity of a download.
        """

        # url can be a UNC windows share
        if url.startswith("\\\\"):
            url = path_to_url(url)

        self._parsed_url = urllib.parse.urlsplit(url)
        # Store the url as a private attribute to prevent accidentally
        # trying to set a new value.
        self._url = url

        link_hash = LinkHash.find_hash_url_fragment(url)
        hashes_from_link = {} if link_hash is None else link_hash.as_dict()
        if hashes is None:
            self._hashes = hashes_from_link
        else:
            self._hashes = {**hashes, **hashes_from_link}

        self.comes_from = comes_from
        self.requires_python = requires_python if requires_python else None
        self.yanked_reason = yanked_reason
        self.dist_info_metadata = dist_info_metadata

        super().__init__(key=url, defining_class=Link)

        self.cache_link_parsing = cache_link_parsing
        self.egg_fragment = self._egg_fragment()

    @classmethod
    def from_json(
        cls,
        file_data: Dict[str, Any],
        page_url: str,
    ) -> Optional["Link"]:
        """
        Convert a PyPI JSON document from a simple repository page into a Link.
        """
        file_url = file_data.get("url")
        if file_url is None:
            return None

        url = _ensure_quoted_url(urllib.parse.urljoin(page_url, file_url))
        pyrequire = file_data.get("requires-python")
        yanked_reason = file_data.get("yanked")
        dist_info_metadata = file_data.get("dist-info-metadata")
        hashes = file_data.get("hashes", {})

        # Link.yanked_reason expects an empty string instead of a boolean.
        if yanked_reason and not isinstance(yanked_reason, str):
            yanked_reason = ""
        # Link.yanked_reason expects None instead of False.
        elif not yanked_reason:
            yanked_reason = None

        return cls(
            url,
            comes_from=page_url,
            requires_python=pyrequire,
            yanked_reason=yanked_reason,
            hashes=hashes,
            dist_info_metadata=dist_info_metadata,
        )
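
    # Illustrative sketch (not part of pip's source): a made-up PEP 691 style
    # file entry, as from_json() would receive it.
    #
    #   Link.from_json(
    #       {
    #           "url": "pkg-1.0-py3-none-any.whl",
    #           "hashes": {"sha256": "0a1b2c"},
    #           "requires-python": ">=3.7",
    #       },
    #       page_url="https://example.com/simple/pkg/",
    #   )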

    @classmethod
    def from_element(
        cls,
        anchor_attribs: Dict[str, Optional[str]],
        page_url: str,
        base_url: str,
    ) -> Optional["Link"]:
        """
        Convert an anchor element's attributes in a simple repository page to a Link.
        """
        href = anchor_attribs.get("href")
        if not href:
            return None

        url = _ensure_quoted_url(urllib.parse.urljoin(base_url, href))
        pyrequire = anchor_attribs.get("data-requires-python")
        yanked_reason = anchor_attribs.get("data-yanked")
        dist_info_metadata = anchor_attribs.get("data-dist-info-metadata")

        return cls(
            url,
            comes_from=page_url,
            requires_python=pyrequire,
            yanked_reason=yanked_reason,
            dist_info_metadata=dist_info_metadata,
        )
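
    # Illustrative sketch (not part of pip's source): a made-up PEP 503 anchor,
    # as from_element() would receive it after HTML parsing.
    #
    #   Link.from_element(
    #       {"href": "pkg-1.0.tar.gz#sha256=0a1b2c", "data-yanked": "broken"},
    #       page_url="https://example.com/simple/pkg/",
    #       base_url="https://example.com/simple/pkg/",
    #   )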

    def __str__(self) -> str:
        if self.requires_python:
            rp = f" (requires-python:{self.requires_python})"
        else:
            rp = ""
        if self.comes_from:
            return "{} (from {}){}".format(
                redact_auth_from_url(self._url), self.comes_from, rp
            )
        else:
            return redact_auth_from_url(str(self._url))

    def __repr__(self) -> str:
        return f"<Link {self}>"

    @property
    def url(self) -> str:
        return self._url

    @property
    def filename(self) -> str:
        path = self.path.rstrip("/")
        name = posixpath.basename(path)
        if not name:
            # Make sure we don't leak auth information if the netloc
            # includes a username and password.
            netloc, user_pass = split_auth_from_netloc(self.netloc)
            return netloc

        name = urllib.parse.unquote(name)
        assert name, f"URL {self._url!r} produced no filename"
        return name

    @property
    def file_path(self) -> str:
        return url_to_path(self.url)

    @property
    def scheme(self) -> str:
        return self._parsed_url.scheme

    @property
    def netloc(self) -> str:
        """
        This can contain auth information.
        """
        return self._parsed_url.netloc

    @property
    def path(self) -> str:
        return urllib.parse.unquote(self._parsed_url.path)

    def splitext(self) -> Tuple[str, str]:
        return splitext(posixpath.basename(self.path.rstrip("/")))

    @property
    def ext(self) -> str:
        return self.splitext()[1]

    @property
    def url_without_fragment(self) -> str:
        scheme, netloc, path, query, fragment = self._parsed_url
        return urllib.parse.urlunsplit((scheme, netloc, path, query, ""))

    _egg_fragment_re = re.compile(r"[#&]egg=([^&]*)")

    # Per PEP 508.
    _project_name_re = re.compile(
        r"^([A-Z0-9]|[A-Z0-9][A-Z0-9._-]*[A-Z0-9])$", re.IGNORECASE
    )

    def _egg_fragment(self) -> Optional[str]:
        match = self._egg_fragment_re.search(self._url)
        if not match:
            return None

        # An egg fragment looks like a PEP 508 project name, along with
        # an optional extras specifier. Anything else is invalid.
        project_name = match.group(1)
        if not self._project_name_re.match(project_name):
            deprecated(
                reason=f"{self} contains an egg fragment with a non-PEP 508 name",
                replacement="to use the req @ url syntax, and remove the egg fragment",
                gone_in="25.0",
                issue=11617,
            )

        return project_name
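
    # Illustrative sketch (not part of pip's source), using a made-up VCS URL:
    #
    #   link = Link("git+https://example.com/repo.git#egg=pkg")
    #   assert link.egg_fragment == "pkg"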

    _subdirectory_fragment_re = re.compile(r"[#&]subdirectory=([^&]*)")

    @property
    def subdirectory_fragment(self) -> Optional[str]:
        match = self._subdirectory_fragment_re.search(self._url)
        if not match:
            return None
        return match.group(1)

    def metadata_link(self) -> Optional["Link"]:
        """Implementation of PEP 658 parsing."""
        # Note that Link.from_element() parsing the "data-dist-info-metadata"
        # attribute from an HTML anchor tag is typically how the
        # Link.dist_info_metadata attribute gets set.
        if self.dist_info_metadata is None:
            return None
        metadata_url = f"{self.url_without_fragment}.metadata"
        metadata_link_hash = LinkHash.parse_pep658_hash(self.dist_info_metadata)
        if metadata_link_hash is None:
            return Link(metadata_url)
        return Link(metadata_url, hashes=metadata_link_hash.as_dict())
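
    # Illustrative sketch (not part of pip's source), with made-up values:
    #
    #   link = Link(
    #       "https://example.com/pkg-1.0-py3-none-any.whl",
    #       dist_info_metadata="sha256=0a1b2c",
    #   )
    #   meta = link.metadata_link()
    #   assert meta.url == "https://example.com/pkg-1.0-py3-none-any.whl.metadata"
    #   assert meta.hash_name == "sha256" and meta.hash == "0a1b2c"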

    def as_hashes(self) -> Hashes:
        return Hashes({k: [v] for k, v in self._hashes.items()})

    @property
    def hash(self) -> Optional[str]:
        return next(iter(self._hashes.values()), None)

    @property
    def hash_name(self) -> Optional[str]:
        return next(iter(self._hashes), None)

    @property
    def show_url(self) -> str:
        return posixpath.basename(self._url.split("#", 1)[0].split("?", 1)[0])

    @property
    def is_file(self) -> bool:
        return self.scheme == "file"

    def is_existing_dir(self) -> bool:
        return self.is_file and os.path.isdir(self.file_path)

    @property
    def is_wheel(self) -> bool:
        return self.ext == WHEEL_EXTENSION

    @property
    def is_vcs(self) -> bool:
        from pip._internal.vcs import vcs

        return self.scheme in vcs.all_schemes

    @property
    def is_yanked(self) -> bool:
        return self.yanked_reason is not None

    @property
    def has_hash(self) -> bool:
        return bool(self._hashes)

    def is_hash_allowed(self, hashes: Optional[Hashes]) -> bool:
        """
        Return True if the link has a hash and it is allowed by `hashes`.
        """
        if hashes is None:
            return False
        return any(hashes.is_hash_allowed(k, v) for k, v in self._hashes.items())


class _CleanResult(NamedTuple):
    """Convert link for equivalency check.

    This is used in the resolver to check whether two URL-specified requirements
    likely point to the same distribution and can be considered equivalent. This
    equivalency logic avoids comparing URLs literally, which can be too strict
    (e.g. "a=1&b=2" vs "b=2&a=1") and produce conflicts unexpected by users.

    Currently this does three things:

    1. Drop the basic auth part. This is technically wrong since a server can
       serve different content based on auth, but if it does that, it is even
       impossible to guarantee two URLs without auth are equivalent, since
       the user can input different auth information when prompted. So the
       practical solution is to assume the auth doesn't affect the response.
    2. Parse the query to avoid the ordering issue. Note that ordering under the
       same key in the query is NOT cleaned; i.e. "a=1&a=2" and "a=2&a=1" are
       still considered different.
    3. Explicitly drop most of the fragment part, except ``subdirectory=`` and
       hash values, since it should have no impact on the downloaded content.
       Note that this drops the "egg=" part historically used to denote the
       requested project (and extras), which is wrong in the strictest sense,
       but too many people supply it inconsistently, causing superfluous
       resolution conflicts, so we choose to ignore it.
    """

    parsed: urllib.parse.SplitResult
    query: Dict[str, List[str]]
    subdirectory: str
    hashes: Dict[str, str]


def _clean_link(link: Link) -> _CleanResult:
    parsed = link._parsed_url
    netloc = parsed.netloc.rsplit("@", 1)[-1]
    # According to RFC 8089, an empty host in file: means localhost.
    if parsed.scheme == "file" and not netloc:
        netloc = "localhost"
    fragment = urllib.parse.parse_qs(parsed.fragment)
    if "egg" in fragment:
        logger.debug("Ignoring egg= fragment in %s", link)
    try:
        # If there are multiple subdirectory values, use the first one.
        # This matches the behavior of Link.subdirectory_fragment.
        subdirectory = fragment["subdirectory"][0]
    except (IndexError, KeyError):
        subdirectory = ""
    # If there are multiple hash values under the same algorithm, use the
    # first one. This matches the behavior of Link.hash.
    hashes = {k: fragment[k][0] for k in _SUPPORTED_HASHES if k in fragment}
    return _CleanResult(
        parsed=parsed._replace(netloc=netloc, query="", fragment=""),
        query=urllib.parse.parse_qs(parsed.query),
        subdirectory=subdirectory,
        hashes=hashes,
    )


@functools.lru_cache(maxsize=None)
def links_equivalent(link1: Link, link2: Link) -> bool:
    return _clean_link(link1) == _clean_link(link2)
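
# Illustrative sketch (not part of pip's source), using made-up URLs: query
# ordering and an "egg=" fragment do not affect equivalence.
#
#   assert links_equivalent(
#       Link("https://example.com/pkg.tar.gz?a=1&b=2#egg=pkg"),
#       Link("https://example.com/pkg.tar.gz?b=2&a=1"),
#   )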