Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/yarl/_url.py: 45%

604 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:52 +0000

1import functools 

2import math 

3import warnings 

4from collections.abc import Mapping, Sequence 

5from contextlib import suppress 

6from ipaddress import ip_address 

7from urllib.parse import SplitResult, parse_qsl, quote, urljoin, urlsplit, urlunsplit 

8 

9import idna 

10from multidict import MultiDict, MultiDictProxy 

11 

12from ._quoting import _Quoter, _Unquoter 

13 

# Port number assumed for a scheme when the URL does not spell one out.
DEFAULT_PORTS = {"http": 80, "https": 443, "ws": 80, "wss": 443}

# Unique marker object; used as the "nothing cached yet" default in
# cached_property.__get__.
sentinel = object()

17 

18 

def rewrite_module(obj: object) -> object:
    """Set ``obj.__module__`` to ``"yarl"`` and return the same object.

    Written so it can be applied as a decorator.
    """
    setattr(obj, "__module__", "yarl")
    return obj

22 

23 

class cached_property:
    """Read-only property whose value is computed once per instance.

    The first access calls the decorated method and stores the result in
    the instance's ``_cache`` dict under the method's name; later accesses
    return the stored value. Because ``__set__`` is defined, this is a
    data descriptor and assignment raises AttributeError.

    """

    def __init__(self, wrapped):
        self.wrapped = wrapped
        # Fall back to an empty docstring for callables without __doc__.
        self.__doc__ = getattr(wrapped, "__doc__", "")
        self.name = wrapped.__name__

    def __get__(self, inst, owner, _sentinel=sentinel):
        # Accessed on the class itself: hand back the descriptor.
        if inst is None:
            return self
        cached = inst._cache.get(self.name, _sentinel)
        if cached is _sentinel:
            cached = self.wrapped(inst)
            inst._cache[self.name] = cached
        return cached

    def __set__(self, inst, value):
        raise AttributeError("cached property is read-only")

53 

54 

def _normalize_path_segments(segments):
    """Resolve '.' and '..' entries in a sequence of str path segments.

    '.' entries are dropped; '..' removes the previously kept segment, and
    is ignored when there is nothing left to remove. If the input ends
    with '.' or '..', an empty segment is appended so that joining the
    result with '/' keeps a trailing slash.
    """
    kept = []
    for part in segments:
        if part == ".":
            continue
        if part != "..":
            kept.append(part)
        elif kept:
            # '..' above the root is silently ignored.
            kept.pop()

    if segments and segments[-1] in (".", ".."):
        kept.append("")

    return kept

77 

78 

79@rewrite_module 

80class URL: 

81 # Don't derive from str 

82 # follow pathlib.Path design 

83 # probably URL will not suffer from pathlib problems: 

84 # it's intended for libraries like aiohttp, 

85 # not to be passed into standard library functions like os.open etc. 

86 

87 # URL grammar (RFC 3986) 

88 # pct-encoded = "%" HEXDIG HEXDIG 

89 # reserved = gen-delims / sub-delims 

90 # gen-delims = ":" / "/" / "?" / "#" / "[" / "]" / "@" 

91 # sub-delims = "!" / "$" / "&" / "'" / "(" / ")" 

92 # / "*" / "+" / "," / ";" / "=" 

93 # unreserved = ALPHA / DIGIT / "-" / "." / "_" / "~" 

94 # URI = scheme ":" hier-part [ "?" query ] [ "#" fragment ] 

95 # hier-part = "//" authority path-abempty 

96 # / path-absolute 

97 # / path-rootless 

98 # / path-empty 

99 # scheme = ALPHA *( ALPHA / DIGIT / "+" / "-" / "." ) 

100 # authority = [ userinfo "@" ] host [ ":" port ] 

101 # userinfo = *( unreserved / pct-encoded / sub-delims / ":" ) 

102 # host = IP-literal / IPv4address / reg-name 

103 # IP-literal = "[" ( IPv6address / IPvFuture ) "]" 

104 # IPvFuture = "v" 1*HEXDIG "." 1*( unreserved / sub-delims / ":" ) 

105 # IPv6address = 6( h16 ":" ) ls32 

106 # / "::" 5( h16 ":" ) ls32 

107 # / [ h16 ] "::" 4( h16 ":" ) ls32 

108 # / [ *1( h16 ":" ) h16 ] "::" 3( h16 ":" ) ls32 

109 # / [ *2( h16 ":" ) h16 ] "::" 2( h16 ":" ) ls32 

110 # / [ *3( h16 ":" ) h16 ] "::" h16 ":" ls32 

111 # / [ *4( h16 ":" ) h16 ] "::" ls32 

112 # / [ *5( h16 ":" ) h16 ] "::" h16 

113 # / [ *6( h16 ":" ) h16 ] "::" 

114 # ls32 = ( h16 ":" h16 ) / IPv4address 

115 # ; least-significant 32 bits of address 

116 # h16 = 1*4HEXDIG 

117 # ; 16 bits of address represented in hexadecimal 

118 # IPv4address = dec-octet "." dec-octet "." dec-octet "." dec-octet 

119 # dec-octet = DIGIT ; 0-9 

120 # / %x31-39 DIGIT ; 10-99 

121 # / "1" 2DIGIT ; 100-199 

122 # / "2" %x30-34 DIGIT ; 200-249 

123 # / "25" %x30-35 ; 250-255 

124 # reg-name = *( unreserved / pct-encoded / sub-delims ) 

125 # port = *DIGIT 

126 # path = path-abempty ; begins with "/" or is empty 

127 # / path-absolute ; begins with "/" but not "//" 

128 # / path-noscheme ; begins with a non-colon segment 

129 # / path-rootless ; begins with a segment 

130 # / path-empty ; zero characters 

131 # path-abempty = *( "/" segment ) 

132 # path-absolute = "/" [ segment-nz *( "/" segment ) ] 

133 # path-noscheme = segment-nz-nc *( "/" segment ) 

134 # path-rootless = segment-nz *( "/" segment ) 

135 # path-empty = 0<pchar> 

136 # segment = *pchar 

137 # segment-nz = 1*pchar 

138 # segment-nz-nc = 1*( unreserved / pct-encoded / sub-delims / "@" ) 

139 # ; non-zero-length segment without any colon ":" 

140 # pchar = unreserved / pct-encoded / sub-delims / ":" / "@" 

141 # query = *( pchar / "/" / "?" ) 

142 # fragment = *( pchar / "/" / "?" ) 

143 # URI-reference = URI / relative-ref 

144 # relative-ref = relative-part [ "?" query ] [ "#" fragment ] 

145 # relative-part = "//" authority path-abempty 

146 # / path-absolute 

147 # / path-noscheme 

148 # / path-empty 

149 # absolute-URI = scheme ":" hier-part [ "?" query ] 

150 __slots__ = ("_cache", "_val") 

151 

152 _QUOTER = _Quoter(requote=False) 

153 _REQUOTER = _Quoter() 

154 _PATH_QUOTER = _Quoter(safe="@:", protected="/+", requote=False) 

155 _PATH_REQUOTER = _Quoter(safe="@:", protected="/+") 

156 _QUERY_QUOTER = _Quoter(safe="?/:@", protected="=+&;", qs=True, requote=False) 

157 _QUERY_REQUOTER = _Quoter(safe="?/:@", protected="=+&;", qs=True) 

158 _QUERY_PART_QUOTER = _Quoter(safe="?/:@", qs=True, requote=False) 

159 _FRAGMENT_QUOTER = _Quoter(safe="?/:@", requote=False) 

160 _FRAGMENT_REQUOTER = _Quoter(safe="?/:@") 

161 

162 _UNQUOTER = _Unquoter() 

163 _PATH_UNQUOTER = _Unquoter(unsafe="+") 

164 _QS_UNQUOTER = _Unquoter(qs=True) 

165 

def __new__(cls, val="", *, encoded=False, strict=None):
    """Create a URL from a str or an already-split ``SplitResult``.

    An existing instance of this class is returned unchanged. A
    ``SplitResult`` is accepted only with ``encoded=True``. Unless
    ``encoded`` is true, every component is (re)quoted and, when the URL
    has a netloc, its path is normalized.

    Raises TypeError for unsupported input types and ValueError for a
    netloc without host, a non-integer port, or a path that fails the
    authority/absolute-path check.
    """
    if strict is not None:  # pragma: no cover
        warnings.warn("strict parameter is ignored")

    val_type = type(val)
    if val_type is cls:
        return val
    if val_type is str:
        val = urlsplit(val)
    elif val_type is SplitResult:
        if not encoded:
            raise ValueError("Cannot apply decoding to SplitResult")
    elif isinstance(val, str):
        # str subclass: convert to a plain str before splitting.
        val = urlsplit(str(val))
    else:
        raise TypeError("Constructor parameter should be str")

    if not encoded:
        scheme, raw_netloc, raw_path, raw_query, raw_fragment = val
        if raw_netloc:
            host = val.hostname
            if host is None:
                raise ValueError("Invalid URL: host is required for absolute urls")

            try:
                port = val.port
            except ValueError as e:
                raise ValueError(
                    "Invalid URL: port can't be converted to integer"
                ) from e

            netloc = cls._make_netloc(
                val.username, val.password, host, port, encode=True, requote=True
            )
        else:
            netloc = host = ""

        path = cls._PATH_REQUOTER(raw_path)
        if netloc:
            path = cls._normalize_path(path)

        cls._validate_authority_uri_abs_path(host=host, path=path)
        query = cls._QUERY_REQUOTER(raw_query)
        fragment = cls._FRAGMENT_REQUOTER(raw_fragment)
        val = SplitResult(scheme, netloc, path, query, fragment)

    self = object.__new__(cls)
    self._val = val
    self._cache = {}
    return self

213 

@classmethod
def build(
    cls,
    *,
    scheme="",
    authority="",
    user=None,
    password=None,
    host="",
    port=None,
    path="",
    query=None,
    query_string="",
    fragment="",
    encoded=False,
):
    """Creates and returns a new URL from individual components.

    ``authority`` is mutually exclusive with ``user``/``password``/
    ``host``/``port``; ``query`` is mutually exclusive with
    ``query_string``. Unless ``encoded`` is true the components are
    quoted, and the path is normalized when a netloc is present.

    Raises ValueError for conflicting arguments and TypeError when a
    string-valued argument is None.
    """

    if authority and (user or password or host or port):
        raise ValueError(
            'Can\'t mix "authority" with "user", "password", "host" or "port".'
        )
    if port and not host:
        raise ValueError('Can\'t build URL with "port" but without "host".')
    if query and query_string:
        raise ValueError('Only one of "query" or "query_string" should be passed')
    text_args = (scheme, authority, host, path, query_string, fragment)
    if any(arg is None for arg in text_args):
        raise TypeError(
            'NoneType is illegal for "scheme", "authority", "host", "path", '
            '"query_string", and "fragment" args, use empty string instead.'
        )

    if authority:
        if encoded:
            netloc = authority
        else:
            # Let SplitResult pick the authority apart into its pieces.
            tmp = SplitResult("", authority, "", "", "")
            netloc = cls._make_netloc(
                tmp.username, tmp.password, tmp.hostname, tmp.port, encode=True
            )
    elif user or password or host or port:
        netloc = cls._make_netloc(
            user, password, host, port, encode=not encoded, encode_host=not encoded
        )
    else:
        netloc = ""

    if not encoded:
        path = cls._PATH_QUOTER(path)
        if netloc:
            path = cls._normalize_path(path)

        cls._validate_authority_uri_abs_path(host=host, path=path)
        query_string = cls._QUERY_QUOTER(query_string)
        fragment = cls._FRAGMENT_QUOTER(fragment)

    url = cls(
        SplitResult(scheme, netloc, path, query_string, fragment), encoded=True
    )

    if not query:
        return url
    return url.with_query(query)

284 

def __init_subclass__(cls):
    """Forbid subclassing: always raises TypeError."""
    message = f"Inheriting a class {cls!r} from URL is forbidden"
    raise TypeError(message)

287 

def __str__(self):
    """Return the encoded URL as a string.

    An absolute URL with an empty path but a query or fragment is
    rendered with "/" as its path.
    """
    parts = self._val
    needs_root = (
        not parts.path and self.is_absolute() and (parts.query or parts.fragment)
    )
    if needs_root:
        parts = parts._replace(path="/")
    return urlunsplit(parts)

293 

def __repr__(self):
    """Return ``ClassName('<encoded url>')``."""
    return "{}('{}')".format(self.__class__.__name__, str(self))

296 

def __bytes__(self):
    """Return the encoded URL as ASCII bytes."""
    text = str(self)
    return text.encode("ascii")

299 

def __eq__(self, other):
    """Compare with another URL; any other type yields NotImplemented.

    For absolute URLs an empty path is treated as "/".
    """
    if type(other) is not URL:
        return NotImplemented

    def rooted(url):
        # Substitute "/" for an empty path on absolute URLs.
        val = url._val
        if not val.path and url.is_absolute():
            val = val._replace(path="/")
        return val

    return rooted(self) == rooted(other)

313 

def __hash__(self):
    """Return the hash of the split URL, cached under the "hash" key.

    For absolute URLs an empty path is hashed as "/", matching __eq__.
    """
    cached = self._cache.get("hash")
    if cached is not None:
        return cached
    val = self._val
    if not val.path and self.is_absolute():
        val = val._replace(path="/")
    cached = self._cache["hash"] = hash(val)
    return cached

322 

def __le__(self, other):
    """Order by the underlying split tuples; non-URL yields NotImplemented."""
    if type(other) is URL:
        return self._val <= other._val
    return NotImplemented

327 

def __lt__(self, other):
    """Order by the underlying split tuples; non-URL yields NotImplemented."""
    if type(other) is URL:
        return self._val < other._val
    return NotImplemented

332 

def __ge__(self, other):
    """Order by the underlying split tuples; non-URL yields NotImplemented."""
    if type(other) is URL:
        return self._val >= other._val
    return NotImplemented

337 

def __gt__(self, other):
    """Order by the underlying split tuples; non-URL yields NotImplemented."""
    if type(other) is URL:
        return self._val > other._val
    return NotImplemented

342 

def __truediv__(self, name):
    """Implement ``url / "segment"``; only an exact str is accepted."""
    if type(name) is str:
        return self._make_child((name,))
    return NotImplemented

347 

def __mod__(self, query):
    """Implement ``url % query`` as a shortcut for update_query(query)."""
    updated = self.update_query(query)
    return updated

350 

def __bool__(self) -> bool:
    """True if netloc, path, query or fragment is non-empty."""
    val = self._val
    return any((val.netloc, val.path, val.query, val.fragment))

355 

def __getstate__(self):
    """Return the pickle state: a 1-tuple holding the split URL."""
    state = (self._val,)
    return state

358 

def __setstate__(self, state):
    """Restore the split URL from pickle state and reset the cache.

    Accepts both the tuple produced by __getstate__ and the default
    ``(None, {"_val": ...})`` pickle layout.
    """
    default_style = state[0] is None and isinstance(state[1], dict)
    if default_style:
        # default style pickle
        self._val = state[1]["_val"]
    else:
        self._val, *_rest = state
    self._cache = {}

366 

def is_absolute(self):
    """A check for absolute URLs.

    Return True when the URL has a host part (``raw_host`` is not
    None), False otherwise.

    """
    host = self.raw_host
    return host is not None

375 

def is_default_port(self):
    """A check for default port.

    Return True if the URL's port equals the default port of its
    scheme (per DEFAULT_PORTS). Return False when the URL has no port
    or the scheme has no known default.

    """
    port = self.port
    if port is None:
        return False
    default = DEFAULT_PORTS.get(self.scheme)
    return default is not None and port == default

390 

def origin(self):
    """Return an URL with scheme, host and port parts only.

    user, password, path, query and fragment are removed.

    Raises ValueError if the URL is not absolute or has no scheme.

    """
    # TODO: add a keyword-only option for keeping user/pass maybe?
    if not self.is_absolute():
        raise ValueError("URL should be absolute")
    val = self._val
    if not val.scheme:
        raise ValueError("URL should have scheme")
    bare_netloc = self._make_netloc(None, None, val.hostname, val.port)
    stripped = val._replace(netloc=bare_netloc, path="", query="", fragment="")
    return URL(stripped, encoded=True)

406 

def relative(self):
    """Return a relative part of the URL.

    scheme, user, password, host and port are removed.

    Raises ValueError if the URL is not absolute.

    """
    if not self.is_absolute():
        raise ValueError("URL should be absolute")
    stripped = self._val._replace(scheme="", netloc="")
    return URL(stripped, encoded=True)

417 

@property
def scheme(self):
    """Scheme of the URL.

    Empty string when the URL has no scheme.

    """
    val = self._val
    return val.scheme

426 

@property
def raw_authority(self):
    """Encoded authority part of URL.

    Empty string for relative URLs.

    """
    val = self._val
    return val.netloc

435 

@cached_property
def authority(self):
    """Decoded authority part of URL.

    Empty string for relative URLs.

    """
    host = self.host
    if host is None:
        # No host means no authority. Without this guard the netloc
        # builder would return None, or fail with TypeError when the
        # scheme supplies a default port (e.g. "http:/path").
        return ""
    return self._make_netloc(
        self.user, self.password, host, self.port, encode_host=False
    )

446 

@property
def raw_user(self):
    """Encoded user part of URL.

    None if user is missing or empty.

    """
    # An empty username is reported as None as well.
    return self._val.username or None

459 

@cached_property
def user(self):
    """Decoded user part of URL.

    None if user is missing.

    """
    encoded_user = self.raw_user
    return self._UNQUOTER(encoded_user)

468 

@property
def raw_password(self):
    """Encoded password part of URL.

    None if password is missing.

    """
    val = self._val
    return val.password

477 

@cached_property
def password(self):
    """Decoded password part of URL.

    None if password is missing.

    """
    encoded_password = self.raw_password
    return self._UNQUOTER(encoded_password)

486 

@property
def raw_host(self):
    """Encoded host part of URL.

    None for relative URLs.

    """
    # Named "host" rather than "hostname" for brevity; a .hostname
    # property may be added later.
    val = self._val
    return val.hostname

497 

@cached_property
def host(self):
    """Decoded host part of URL.

    None for relative URLs.

    """
    raw = self.raw_host
    # A '%' sign only occurs in scoped IPv6 addresses (e.g. fe80::2%eth0),
    # so IDNA decoding is pointless there; a missing host passes through too.
    if raw is None or "%" in raw:
        return raw
    return _idna_decode(raw)

514 

@property
def port(self):
    """Port part of URL, with scheme-based fallback.

    Returns the explicit port when the URL has one (including 0);
    otherwise the default port for the scheme from DEFAULT_PORTS, or
    None when the scheme has no known default.

    """
    explicit = self._val.port
    # Compare against None rather than truthiness so that an explicit
    # port 0 is not replaced by the scheme default.
    if explicit is not None:
        return explicit
    return DEFAULT_PORTS.get(self._val.scheme)

524 

@property
def explicit_port(self):
    """Port part of URL, without scheme-based fallback.

    None for relative URLs or URLs without explicit port.

    """
    val = self._val
    return val.port

533 

@property
def raw_path(self):
    """Encoded path of URL.

    / for absolute URLs without path part.

    """
    path = self._val.path
    if path or not self.is_absolute():
        return path
    return "/"

545 

@cached_property
def path(self):
    """Decoded path of URL.

    / for absolute URLs without path part.

    """
    encoded_path = self.raw_path
    return self._PATH_UNQUOTER(encoded_path)

554 

@cached_property
def query(self):
    """A MultiDictProxy representing parsed query parameters in decoded
    representation.

    Empty value if URL has no query part.

    """
    pairs = parse_qsl(self.raw_query_string, keep_blank_values=True)
    return MultiDictProxy(MultiDict(pairs))

565 

@property
def raw_query_string(self):
    """Encoded query part of URL.

    Empty string if query is missing.

    """
    val = self._val
    return val.query

574 

@cached_property
def query_string(self):
    """Decoded query part of URL.

    Empty string if query is missing.

    """
    encoded_query = self.raw_query_string
    return self._QS_UNQUOTER(encoded_query)

583 

@cached_property
def path_qs(self):
    """Decoded path of URL with query."""
    if self.query_string:
        return f"{self.path}?{self.query_string}"
    return self.path

590 

@cached_property
def raw_path_qs(self):
    """Encoded path of URL with query."""
    if self.raw_query_string:
        return f"{self.raw_path}?{self.raw_query_string}"
    return self.raw_path

597 

@property
def raw_fragment(self):
    """Encoded fragment part of URL.

    Empty string if fragment is missing.

    """
    val = self._val
    return val.fragment

606 

@cached_property
def fragment(self):
    """Decoded fragment part of URL.

    Empty string if fragment is missing.

    """
    encoded_fragment = self.raw_fragment
    return self._UNQUOTER(encoded_fragment)

615 

@cached_property
def raw_parts(self):
    """A tuple containing encoded *path* parts.

    ('/',) for absolute URLs if *path* is missing.

    """
    path = self._val.path
    if self.is_absolute():
        if not path:
            return ("/",)
        rooted = True
    else:
        rooted = path.startswith("/")

    if rooted:
        # The root is reported as its own "/" element.
        return ("/", *path[1:].split("/"))
    return tuple(path.split("/"))

635 

@cached_property
def parts(self):
    """A tuple containing decoded *path* parts.

    ('/',) for absolute URLs if *path* is missing.

    """
    decoded = [self._UNQUOTER(raw_part) for raw_part in self.raw_parts]
    return tuple(decoded)

644 

@cached_property
def parent(self):
    """A new URL with last part of path removed and cleaned up query and
    fragment.

    When the path is empty or "/", the URL itself is returned unless it
    carries a query or fragment, in which case those are stripped.

    """
    path = self.raw_path
    if path in ("", "/"):
        if not (self.raw_fragment or self.raw_query_string):
            return self
        return URL(self._val._replace(query="", fragment=""), encoded=True)
    # Everything before the last "/" (empty when there is no slash).
    parent_path = path.rpartition("/")[0]
    val = self._val._replace(path=parent_path, query="", fragment="")
    return URL(val, encoded=True)

659 

@cached_property
def raw_name(self):
    """The last part of raw_parts.

    For absolute URLs the leading root element is not counted, so a
    URL with only the root yields an empty string.
    """
    parts = self.raw_parts
    if self.is_absolute():
        parts = parts[1:]
        if not parts:
            return ""
    return parts[-1]

672 

@cached_property
def name(self):
    """The last part of parts."""
    encoded_name = self.raw_name
    return self._UNQUOTER(encoded_name)

677 

@cached_property
def raw_suffix(self):
    """Encoded file extension of raw_name, including the leading dot.

    Empty string when the name has no dot, starts with its only dot,
    or ends with a dot.
    """
    name = self.raw_name
    dot = name.rfind(".")
    if dot <= 0 or dot >= len(name) - 1:
        return ""
    return name[dot:]

686 

@cached_property
def suffix(self):
    """Decoded form of raw_suffix."""
    encoded_suffix = self.raw_suffix
    return self._UNQUOTER(encoded_suffix)

690 

@cached_property
def raw_suffixes(self):
    """Tuple of all encoded extensions of raw_name, each with its dot.

    Empty tuple when the name ends with a dot; leading dots of the
    name are ignored.
    """
    name = self.raw_name
    if name.endswith("."):
        return ()
    _stem, *extensions = name.lstrip(".").split(".")
    return tuple("." + ext for ext in extensions)

698 

@cached_property
def suffixes(self):
    """Decoded form of raw_suffixes."""
    decoded = [self._UNQUOTER(raw) for raw in self.raw_suffixes]
    return tuple(decoded)

702 

@staticmethod
def _validate_authority_uri_abs_path(host, path):
    """Ensure that path in URL with authority starts with a leading slash.

    Nothing is checked when either host or path is empty.

    Raise ValueError if not.
    """
    if len(host) == 0 or len(path) == 0:
        return
    if path.startswith("/"):
        return
    raise ValueError(
        "Path in a URL with authority should start with a slash ('/') if set"
    )

713 

def _make_child(self, segments, encoded=False):
    """add segments to self._val.path, accounting for absolute vs relative paths

    Empty segments and '.' are skipped; a segment starting with '/'
    raises ValueError. Segments are quoted unless ``encoded`` is true.
    The returned URL has empty query and fragment.
    """
    quote = self._PATH_QUOTER
    collected = []
    # Walk backwards and collect in reverse; flipped once at the end.
    for seg in reversed(segments):
        if not seg:
            continue
        if seg[0] == "/":
            raise ValueError(
                f"Appending path {seg!r} starting from slash is forbidden"
            )
        if not encoded:
            seg = quote(seg)
        if "/" in seg:
            collected.extend(
                sub for sub in reversed(seg.split("/")) if sub and sub != "."
            )
        elif seg != ".":
            collected.append(seg)
    collected.reverse()

    old_path = self._val.path
    if old_path:
        collected = [*old_path.rstrip("/").split("/"), *collected]
    if self.is_absolute():
        collected = _normalize_path_segments(collected)
        if collected and collected[0] != "":
            # inject a leading slash when adding a path to an absolute URL
            # where there was none before
            collected = ["", *collected]
    new_path = "/".join(collected)
    return URL(
        self._val._replace(path=new_path, query="", fragment=""), encoded=True
    )

745 

@classmethod
def _normalize_path(cls, path):
    """Drop '.' and '..' from a str path."""
    # A leading "/" is set aside and put back on the normalised result,
    # as per sections 5.2.4 and 6.2.2.3 of rfc3986.
    rooted = path.startswith("/")
    body = path[1:] if rooted else path
    normalized = "/".join(_normalize_path_segments(body.split("/")))
    return "/" + normalized if rooted else normalized

759 

@classmethod
def _encode_host(cls, host, human=False):
    """Return the encoded form of ``host``.

    An IP address (optionally followed by "%zone") is written in
    compressed form, with IPv6 wrapped in brackets. Anything else is
    lower-cased and, unless it is ASCII-only or ``human`` is true, passed
    through ``_idna_encode``.
    """
    address, sep, zone = host.partition("%")
    try:
        ip = ip_address(address)
    except ValueError:
        # Not an IP literal: treat as a registered name.
        lowered = host.lower()
        # IDNA encoding is slow, skip it for ASCII-only strings.
        # Don't move the check into _idna_encode() helper
        # to reduce the cache size.
        if human or lowered.isascii():
            return lowered
        return _idna_encode(lowered)

    encoded = ip.compressed
    if sep:
        encoded += "%" + zone
    if ip.version == 6:
        encoded = "[" + encoded + "]"
    return encoded

781 

@classmethod
def _make_netloc(
    cls, user, password, host, port, encode=False, encode_host=True, requote=False
):
    """Assemble ``[user[:password]@]host[:port]``.

    ``encode`` quotes user and password (with the requoting quoter when
    ``requote`` is true); ``encode_host`` runs the host through
    ``_encode_host``. A password without user yields ``:password@...``.
    """
    quoter = cls._REQUOTER if requote else cls._QUOTER
    netloc = cls._encode_host(host) if encode_host else host
    if port is not None:
        netloc = netloc + ":" + str(port)

    if password is None:
        if user and encode:
            user = quoter(user)
    else:
        if not user:
            user = ""
        elif encode:
            user = quoter(user)
        if encode:
            password = quoter(password)
        user = user + ":" + password

    if user:
        netloc = user + "@" + netloc
    return netloc

807 

def with_scheme(self, scheme):
    """Return a new URL with scheme replaced.

    The scheme is lower-cased. Raises TypeError for a non-str scheme
    and ValueError for relative URLs.
    """
    # N.B. doesn't cleanup query/fragment
    if not isinstance(scheme, str):
        raise TypeError("Invalid scheme type")
    if not self.is_absolute():
        raise ValueError("scheme replacement is not allowed for relative URLs")
    replaced = self._val._replace(scheme=scheme.lower())
    return URL(replaced, encoded=True)

816 

def with_user(self, user):
    """Return a new URL with user replaced.

    Autoencode user if needed.

    Clear user/password if user is None.

    Raises TypeError for a non-str user and ValueError for relative URLs.

    """
    # N.B. doesn't cleanup query/fragment
    current = self._val
    if user is None:
        password = None
    elif isinstance(user, str):
        user = self._QUOTER(user)
        password = current.password
    else:
        raise TypeError("Invalid user type")
    if not self.is_absolute():
        raise ValueError("user replacement is not allowed for relative URLs")
    netloc = self._make_netloc(user, password, current.hostname, current.port)
    return URL(self._val._replace(netloc=netloc), encoded=True)

842 

def with_password(self, password):
    """Return a new URL with password replaced.

    Autoencode password if needed.

    Clear password if argument is None.

    Raises TypeError for a non-str password and ValueError for relative
    URLs.

    """
    # N.B. doesn't cleanup query/fragment
    if password is not None:
        if not isinstance(password, str):
            raise TypeError("Invalid password type")
        password = self._QUOTER(password)
    if not self.is_absolute():
        raise ValueError("password replacement is not allowed for relative URLs")
    current = self._val
    netloc = self._make_netloc(
        current.username, password, current.hostname, current.port
    )
    return URL(self._val._replace(netloc=netloc), encoded=True)

867 

def with_host(self, host):
    """Return a new URL with host replaced.

    Autoencode host if needed.

    Changing host for relative URLs is not allowed, use .join()
    instead.

    Raises TypeError for a non-str host and ValueError for relative
    URLs or an empty host.

    """
    # N.B. doesn't cleanup query/fragment
    if not isinstance(host, str):
        raise TypeError("Invalid host type")
    if not self.is_absolute():
        raise ValueError("host replacement is not allowed for relative URLs")
    if not host:
        raise ValueError("host removing is not allowed")
    current = self._val
    netloc = self._make_netloc(
        current.username, current.password, host, current.port
    )
    return URL(self._val._replace(netloc=netloc), encoded=True)

891 

def with_port(self, port):
    """Return a new URL with port replaced.

    Clear port to default if None is passed.

    Raises TypeError if port is not an int (bool is rejected too) and
    ValueError if it is outside 0..65535 or the URL is relative.

    """
    # N.B. doesn't cleanup query/fragment
    if port is not None:
        if isinstance(port, bool) or not isinstance(port, int):
            raise TypeError(f"port should be int or None, got {type(port)}")
        if not 0 <= port <= 65535:
            raise ValueError(f"port must be between 0 and 65535, got {port}")
    if not self.is_absolute():
        raise ValueError("port replacement is not allowed for relative URLs")
    current = self._val
    netloc = self._make_netloc(
        current.username, current.password, current.hostname, port
    )
    return URL(self._val._replace(netloc=netloc), encoded=True)

913 

def with_path(self, path, *, encoded=False):
    """Return a new URL with path replaced.

    Unless ``encoded`` is true the path is quoted and, for absolute
    URLs, normalized. A non-empty path always gets a leading "/".
    Query and fragment are cleared.
    """
    if not encoded:
        path = self._PATH_QUOTER(path)
        if self.is_absolute():
            path = self._normalize_path(path)
    if path and path[0] != "/":
        path = "/" + path
    replaced = self._val._replace(path=path, query="", fragment="")
    return URL(replaced, encoded=True)

923 

@classmethod
def _query_seq_pairs(cls, quoter, pairs):
    """Yield quoted ``key=value`` strings for (key, value) pairs.

    A list or tuple value produces one ``key=value`` per element.
    """
    for key, val in pairs:
        values = val if isinstance(val, (list, tuple)) else (val,)
        for item in values:
            yield quoter(key) + "=" + quoter(cls._query_var(item))

932 

@staticmethod
def _query_var(v):
    """Convert a query value to str.

    str (and subclasses) are returned as is; float and int (and their
    subclasses, except bool) are converted via ``str``.

    Raises ValueError for infinite or NaN floats and TypeError for any
    other type, including bool.
    """
    kind = type(v)
    if issubclass(kind, str):
        return v
    if issubclass(kind, float):
        if math.isinf(v):
            raise ValueError("float('inf') is not supported")
        if math.isnan(v):
            raise ValueError("float('nan') is not supported")
        return str(float(v))
    if issubclass(kind, int) and kind is not bool:
        return str(int(v))
    raise TypeError(
        "Invalid variable type: value "
        f"should be str, int or float, got {v!r} "
        f"of type {kind}"
    )

951 

def _get_str_query(self, *args, **kwargs):
    """Turn the query argument(s) into an encoded query string.

    Accepts either keyword arguments or exactly one positional argument:
    None (returned as None), a Mapping, a str, or a Sequence of
    (key, value) pairs.

    Raises ValueError for a wrong argument combination and TypeError
    for unsupported query types (including bytes-like objects).
    """
    if kwargs:
        if len(args) > 0:
            raise ValueError(
                "Either kwargs or single query parameter must be present"
            )
        query = kwargs
    elif len(args) == 1:
        query = args[0]
    else:
        raise ValueError("Either kwargs or single query parameter must be present")

    if query is None:
        return None
    if isinstance(query, Mapping):
        part_quoter = self._QUERY_PART_QUOTER
        return "&".join(self._query_seq_pairs(part_quoter, query.items()))
    if isinstance(query, str):
        return self._QUERY_QUOTER(query)
    if isinstance(query, (bytes, bytearray, memoryview)):
        raise TypeError(
            "Invalid query type: bytes, bytearray and memoryview are forbidden"
        )
    if isinstance(query, Sequence):
        part_quoter = self._QUERY_PART_QUOTER
        # We don't expect sequence values if we're given a list of pairs
        # already; only mappings like builtin `dict` which can't have the
        # same key pointing to multiple values are allowed to use
        # `_query_seq_pairs`.
        return "&".join(
            part_quoter(k) + "=" + part_quoter(self._query_var(v)) for k, v in query
        )
    raise TypeError(
        "Invalid query type: only str, mapping or "
        "sequence of (key, value) pairs is allowed"
    )

991 

def with_query(self, *args, **kwargs):
    """Return a new URL with query part replaced.

    Accepts any Mapping (e.g. dict, multidict.MultiDict instances)
    or str, autoencode the argument if needed.

    A sequence of (key, value) pairs is supported as well.

    It also can take an arbitrary number of keyword arguments.

    Clear query if None is passed.

    """
    # N.B. doesn't cleanup query/fragment

    encoded_query = self._get_str_query(*args, **kwargs)
    if not encoded_query:
        encoded_query = ""
    return URL(self._val._replace(query=encoded_query), encoded=True)

1011 

def update_query(self, *args, **kwargs):
    """Return a new URL with query part updated.

    The given parameters are merged into a copy of the current query;
    passing None clears the query.
    """
    incoming = self._get_str_query(*args, **kwargs)
    if incoming is None:
        merged = None
    else:
        additions = MultiDict(parse_qsl(incoming, keep_blank_values=True))
        merged = MultiDict(self.query)
        merged.update(additions)

    encoded_query = self._get_str_query(merged) or ""
    return URL(self._val._replace(query=encoded_query), encoded=True)

1024 

def with_fragment(self, fragment):
    """Return a new URL with fragment replaced.

    Autoencode fragment if needed.

    Clear fragment to default if None is passed.

    Returns self when the encoded fragment is unchanged. Raises
    TypeError for a non-str, non-None fragment.

    """
    # N.B. doesn't cleanup query/fragment
    if fragment is None:
        encoded_fragment = ""
    elif isinstance(fragment, str):
        encoded_fragment = self._FRAGMENT_QUOTER(fragment)
    else:
        raise TypeError("Invalid fragment type")
    if self.raw_fragment == encoded_fragment:
        return self
    return URL(self._val._replace(fragment=encoded_fragment), encoded=True)

1043 

def with_name(self, name):
    """Return a new URL with name (last part of path) replaced.

    Query and fragment parts are cleaned up.

    Name is encoded if needed.

    Raises TypeError for a non-str name and ValueError when the name
    contains a slash or is "." or "..".

    """
    # N.B. DOES cleanup query/fragment
    if not isinstance(name, str):
        raise TypeError("Invalid name type")
    if "/" in name:
        raise ValueError("Slash in name is not allowed")
    quoted = self._PATH_QUOTER(name)
    if quoted == "." or quoted == "..":
        raise ValueError(". and .. values are forbidden")

    segments = list(self.raw_parts)
    absolute = self.is_absolute()
    if absolute and len(segments) == 1:
        # Only the root is present: add the name after it.
        segments.append(quoted)
    else:
        segments[-1] = quoted
    if absolute or segments[0] == "/":
        segments[0] = ""  # replace leading '/'

    return URL(
        self._val._replace(path="/".join(segments), query="", fragment=""),
        encoded=True,
    )

1075 

def with_suffix(self, suffix):
    """Return a new URL with the suffix (file extension of name) replaced.

    The new name is built from ``self.raw_name`` with ``self.raw_suffix``
    removed (when there is one) and *suffix* appended, then delegated to
    ``self.with_name``.  An empty *suffix* therefore just drops the
    current one.

    Raises ``TypeError`` if *suffix* is not a ``str``; ``ValueError`` if
    it is non-empty without a leading dot, is exactly ``"."``, or if the
    URL has an empty name.
    """
    if not isinstance(suffix, str):
        raise TypeError("Invalid suffix type")
    if suffix == "." or (suffix and not suffix.startswith(".")):
        raise ValueError(f"Invalid suffix {suffix!r}")

    current = self.raw_name
    if not current:
        raise ValueError(f"{self!r} has an empty name")

    previous = self.raw_suffix
    stem = current[: -len(previous)] if previous else current
    return self.with_name(stem + suffix)

1096 

def join(self, url):
    """Join URLs.

    Combine this URL, acting as the base, with *url* to produce a full
    URL.  Both are converted to strings and resolved with
    ``urllib.parse.urljoin``, so components missing from *url* (scheme,
    network location, part of the path) are taken from the base.

    Raises ``TypeError`` if *url* is not a ``URL`` instance.
    """
    # See docs for urllib.parse.urljoin
    if isinstance(url, URL):
        return URL(urljoin(str(self), str(url)), encoded=True)
    raise TypeError("url should be URL")

1113 

def joinpath(self, *other, encoded=False):
    """Return a new URL with the elements of *other* appended to the path.

    The positional arguments are handed as a tuple, together with the
    *encoded* flag, to ``self._make_child``, whose result is returned.
    """
    segments = other
    return self._make_child(segments, encoded=encoded)

1117 

def human_repr(self):
    """Return decoded human readable string for URL representation.

    Each decoded component is run through ``_human_quote`` with the set
    of characters that must stay percent-escaped in that position; a
    truthy host is converted with ``self._encode_host(..., human=True)``.
    The pieces are then reassembled with ``urllib.parse.urlunsplit``.
    """
    userinfo_unsafe = "#/:?@"
    query_unsafe = "#&+;="

    user = _human_quote(self.user, userinfo_unsafe)
    password = _human_quote(self.password, userinfo_unsafe)

    host = self.host
    if host:
        host = self._encode_host(self.host, human=True)

    path = _human_quote(self.path, "#?")

    pairs = []
    for key, value in self.query.items():
        quoted_key = _human_quote(key, query_unsafe)
        quoted_value = _human_quote(value, query_unsafe)
        pairs.append(f"{quoted_key}={quoted_value}")
    query_string = "&".join(pairs)

    fragment = _human_quote(self.fragment, "")

    scheme = self.scheme
    # The host was already converted above, so the netloc builder is
    # told not to encode it again.
    netloc = self._make_netloc(
        user,
        password,
        host,
        self._val.port,
        encode_host=False,
    )
    return urlunsplit(SplitResult(scheme, netloc, path, query_string, fragment))

1146 

1147 

1148def _human_quote(s, unsafe): 

1149 if not s: 

1150 return s 

1151 for c in "%" + unsafe: 

1152 if c in s: 

1153 s = s.replace(c, f"%{ord(c):02X}") 

1154 if s.isprintable(): 

1155 return s 

1156 return "".join(c if c.isprintable() else quote(c) for c in s) 

1157 

1158 

# Default number of entries kept by each IDNA conversion cache.
_MAXCACHE = 256


@functools.lru_cache(_MAXCACHE)
def _idna_decode(raw):
    """Decode an ASCII (IDNA-encoded) host string; results are LRU-cached.

    ``idna.decode`` is tried first.  If a ``UnicodeError`` is raised,
    the built-in ``"idna"`` codec is used instead.
    """
    try:
        decoded = idna.decode(raw.encode("ascii"))
    except UnicodeError:  # e.g. '::1'
        decoded = raw.encode("ascii").decode("idna")
    return decoded

1168 

1169 

@functools.lru_cache(_MAXCACHE)
def _idna_encode(host):
    """Encode a host string to its ASCII IDNA form; results are LRU-cached.

    ``idna.encode`` with ``uts46=True`` is tried first.  If a
    ``UnicodeError`` is raised, the built-in ``"idna"`` codec is used
    instead.
    """
    try:
        encoded = idna.encode(host, uts46=True).decode("ascii")
    except UnicodeError:
        encoded = host.encode("idna").decode("ascii")
    return encoded

1176 

1177 

@rewrite_module
def cache_clear():
    """Empty the LRU caches of the IDNA decode and encode helpers."""
    for cached in (_idna_decode, _idna_encode):
        cached.cache_clear()

1182 

1183 

@rewrite_module
def cache_info():
    """Report LRU cache statistics for the IDNA helpers.

    Returns a dict with the keys ``"idna_encode"`` and ``"idna_decode"``,
    each holding the ``cache_info()`` result of the matching helper.
    """
    encode_stats = _idna_encode.cache_info()
    decode_stats = _idna_decode.cache_info()
    return {"idna_encode": encode_stats, "idna_decode": decode_stats}

1190 

1191 

@rewrite_module
def cache_configure(*, idna_encode_size=_MAXCACHE, idna_decode_size=_MAXCACHE):
    """Rebuild the IDNA helper caches with new maximum sizes.

    Each helper is re-wrapped in a fresh ``functools.lru_cache`` around
    its undecorated function (``__wrapped__``), and the module-level
    name is rebound to the new wrapper.  The new caches start empty.
    """
    global _idna_decode, _idna_encode

    wrap_encode = functools.lru_cache(idna_encode_size)
    _idna_encode = wrap_encode(_idna_encode.__wrapped__)

    wrap_decode = functools.lru_cache(idna_decode_size)
    _idna_decode = wrap_decode(_idna_decode.__wrapped__)

1197 _idna_decode = functools.lru_cache(idna_decode_size)(_idna_decode.__wrapped__)