Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/yarl/_url.py: 39%

606 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:40 +0000

1import functools 

2import math 

3import warnings 

4from collections.abc import Mapping, Sequence 

5from contextlib import suppress 

6from ipaddress import ip_address 

7from urllib.parse import SplitResult, parse_qsl, quote, urljoin, urlsplit, urlunsplit 

8 

9import idna 

10from multidict import MultiDict, MultiDictProxy 

11 

12from ._quoting import _Quoter, _Unquoter 

13 

# Port associated with each scheme that has a default; looked up by scheme
# name when a URL carries no explicit port.
DEFAULT_PORTS = {
    "http": 80,
    "https": 443,
    "ws": 80,
    "wss": 443,
}

# Unique marker object; lets a cache lookup tell "nothing stored" apart from
# any real stored value.
sentinel = object()

17 

18 

def rewrite_module(obj: object) -> object:
    """Set ``obj.__module__`` to ``"yarl"`` and return the same object.

    Shaped so it can be applied as a decorator.
    """
    setattr(obj, "__module__", "yarl")
    return obj

22 

23 

class cached_property:
    """Read-only descriptor that memoises a method's result per instance.

    Use it as a method decorator on a class whose instances carry a
    ``_cache`` dict.  The first access calls the wrapped method and stores the
    result in ``inst._cache`` under the method's name; later accesses return
    the stored value without calling the method again.  Assignment is always
    rejected, which makes this a data descriptor.
    """

    def __init__(self, wrapped):
        self.wrapped = wrapped
        # Mirror the wrapped callable's docstring; fall back to "" when the
        # callable has no such attribute.
        try:
            self.__doc__ = wrapped.__doc__
        except AttributeError:  # pragma: no cover
            self.__doc__ = ""
        self.name = wrapped.__name__

    def __get__(self, inst, owner, _sentinel=sentinel):
        # Looked up on the class rather than an instance: expose the
        # descriptor itself.
        if inst is None:
            return self
        cache = inst._cache
        cached = cache.get(self.name, _sentinel)
        if cached is _sentinel:
            # First access: compute once and remember the result.
            cached = self.wrapped(inst)
            cache[self.name] = cached
        return cached

    def __set__(self, inst, value):
        raise AttributeError("cached property is read-only")

53 

54 

def _normalize_path_segments(segments):
    """Resolve '.' and '..' entries in a sequence of str path segments.

    Returns a new list.  A '..' with nothing left to remove is ignored, and an
    empty segment is appended when the input ended in '.' or '..' so that the
    trailing '/' survives re-joining.
    """
    stack = []

    for segment in segments:
        if segment == ".":
            continue
        if segment != "..":
            stack.append(segment)
        elif stack:
            # '..' cancels the previously kept segment; when there is none
            # (resolving per rfc3986) the '..' is silently dropped.
            stack.pop()

    if segments and segments[-1] in (".", ".."):
        # The last segment was a relative dir, so the result must end with
        # a trailing '/'.
        stack.append("")

    return stack

77 

78 

79@rewrite_module 

80class URL: 

81 # Don't derive from str 

82 # follow pathlib.Path design 

83 # probably URL will not suffer from pathlib problems: 

84 # it's intended for libraries like aiohttp, 

85 # not to be passed into standard library functions like os.open etc. 

86 

87 # URL grammar (RFC 3986) 

88 # pct-encoded = "%" HEXDIG HEXDIG 

89 # reserved = gen-delims / sub-delims 

90 # gen-delims = ":" / "/" / "?" / "#" / "[" / "]" / "@" 

91 # sub-delims = "!" / "$" / "&" / "'" / "(" / ")" 

92 # / "*" / "+" / "," / ";" / "=" 

93 # unreserved = ALPHA / DIGIT / "-" / "." / "_" / "~" 

94 # URI = scheme ":" hier-part [ "?" query ] [ "#" fragment ] 

95 # hier-part = "//" authority path-abempty 

96 # / path-absolute 

97 # / path-rootless 

98 # / path-empty 

99 # scheme = ALPHA *( ALPHA / DIGIT / "+" / "-" / "." ) 

100 # authority = [ userinfo "@" ] host [ ":" port ] 

101 # userinfo = *( unreserved / pct-encoded / sub-delims / ":" ) 

102 # host = IP-literal / IPv4address / reg-name 

103 # IP-literal = "[" ( IPv6address / IPvFuture ) "]" 

104 # IPvFuture = "v" 1*HEXDIG "." 1*( unreserved / sub-delims / ":" ) 

105 # IPv6address = 6( h16 ":" ) ls32 

106 # / "::" 5( h16 ":" ) ls32 

107 # / [ h16 ] "::" 4( h16 ":" ) ls32 

108 # / [ *1( h16 ":" ) h16 ] "::" 3( h16 ":" ) ls32 

109 # / [ *2( h16 ":" ) h16 ] "::" 2( h16 ":" ) ls32 

110 # / [ *3( h16 ":" ) h16 ] "::" h16 ":" ls32 

111 # / [ *4( h16 ":" ) h16 ] "::" ls32 

112 # / [ *5( h16 ":" ) h16 ] "::" h16 

113 # / [ *6( h16 ":" ) h16 ] "::" 

114 # ls32 = ( h16 ":" h16 ) / IPv4address 

115 # ; least-significant 32 bits of address 

116 # h16 = 1*4HEXDIG 

117 # ; 16 bits of address represented in hexadecimal 

118 # IPv4address = dec-octet "." dec-octet "." dec-octet "." dec-octet 

119 # dec-octet = DIGIT ; 0-9 

120 # / %x31-39 DIGIT ; 10-99 

121 # / "1" 2DIGIT ; 100-199 

122 # / "2" %x30-34 DIGIT ; 200-249 

123 # / "25" %x30-35 ; 250-255 

124 # reg-name = *( unreserved / pct-encoded / sub-delims ) 

125 # port = *DIGIT 

126 # path = path-abempty ; begins with "/" or is empty 

127 # / path-absolute ; begins with "/" but not "//" 

128 # / path-noscheme ; begins with a non-colon segment 

129 # / path-rootless ; begins with a segment 

130 # / path-empty ; zero characters 

131 # path-abempty = *( "/" segment ) 

132 # path-absolute = "/" [ segment-nz *( "/" segment ) ] 

133 # path-noscheme = segment-nz-nc *( "/" segment ) 

134 # path-rootless = segment-nz *( "/" segment ) 

135 # path-empty = 0<pchar> 

136 # segment = *pchar 

137 # segment-nz = 1*pchar 

138 # segment-nz-nc = 1*( unreserved / pct-encoded / sub-delims / "@" ) 

139 # ; non-zero-length segment without any colon ":" 

140 # pchar = unreserved / pct-encoded / sub-delims / ":" / "@" 

141 # query = *( pchar / "/" / "?" ) 

142 # fragment = *( pchar / "/" / "?" ) 

143 # URI-reference = URI / relative-ref 

144 # relative-ref = relative-part [ "?" query ] [ "#" fragment ] 

145 # relative-part = "//" authority path-abempty 

146 # / path-absolute 

147 # / path-noscheme 

148 # / path-empty 

149 # absolute-URI = scheme ":" hier-part [ "?" query ] 

# Instances hold only the parsed split result (_val) and a per-instance
# cache dict (_cache).
__slots__ = ("_cache", "_val")

# Quoters for the individual URL parts.  Every *_QUOTER is created with
# requote=False; the matching *_REQUOTER leaves that option at its default.
_QUOTER = _Quoter(requote=False)
_REQUOTER = _Quoter()
# Path: "@" and ":" are safe, "/" and "+" are protected.
_PATH_QUOTER = _Quoter(safe="@:", protected="/+", requote=False)
_PATH_REQUOTER = _Quoter(safe="@:", protected="/+")
# Whole query string: "=", "+", "&" and ";" are protected.
_QUERY_QUOTER = _Quoter(safe="?/:@", protected="=+&;", qs=True, requote=False)
_QUERY_REQUOTER = _Quoter(safe="?/:@", protected="=+&;", qs=True)
# A single query key or value: no protected characters.
_QUERY_PART_QUOTER = _Quoter(safe="?/:@", qs=True, requote=False)
_FRAGMENT_QUOTER = _Quoter(safe="?/:@", requote=False)
_FRAGMENT_REQUOTER = _Quoter(safe="?/:@")

# Unquoters used by the decoded-value properties.
_UNQUOTER = _Unquoter()
_PATH_UNQUOTER = _Unquoter(unsafe="+")
_QS_UNQUOTER = _Unquoter(qs=True)

165 

def __new__(cls, val="", *, encoded=False, strict=None):
    """Create a URL from a str or a ``urllib.parse.SplitResult``.

    An object whose type is exactly this class is returned unchanged.  A str
    (or str subclass) is parsed with ``urlsplit``; a SplitResult is accepted
    only together with ``encoded=True``.  Unless *encoded* is true, netloc,
    path, query and fragment are requoted and, when a netloc is present,
    '.' and '..' path segments are resolved.  *strict* is ignored and only
    triggers a warning.

    Raises TypeError for any other *val* type.  Raises ValueError for a
    SplitResult without ``encoded=True``, a netloc without a host, a port
    that cannot be converted to an integer, or a host combined with a path
    that does not start with "/".
    """
    if strict is not None:  # pragma: no cover
        warnings.warn("strict parameter is ignored")

    val_type = type(val)
    if val_type is cls:
        return val
    if val_type is SplitResult:
        if not encoded:
            raise ValueError("Cannot apply decoding to SplitResult")
    elif isinstance(val, str):
        # str() is a no-op for an exact str and flattens str subclasses.
        val = urlsplit(str(val))
    else:
        raise TypeError("Constructor parameter should be str")

    if not encoded:
        scheme, raw_netloc, raw_path, raw_query, raw_fragment = val
        host = netloc = ""
        if raw_netloc:
            host = val.hostname
            if host is None:
                raise ValueError("Invalid URL: host is required for absolute urls")

            try:
                port = val.port
            except ValueError as exc:
                raise ValueError(
                    "Invalid URL: port can't be converted to integer"
                ) from exc

            netloc = cls._make_netloc(
                val.username, val.password, host, port, encode=True, requote=True
            )

        path = cls._PATH_REQUOTER(raw_path)
        if netloc:
            path = cls._normalize_path(path)

        cls._validate_authority_uri_abs_path(host=host, path=path)
        val = SplitResult(
            scheme,
            netloc,
            path,
            cls._QUERY_REQUOTER(raw_query),
            cls._FRAGMENT_REQUOTER(raw_fragment),
        )

    instance = object.__new__(cls)
    instance._val = val
    instance._cache = {}
    return instance

213 

@classmethod
def build(
    cls,
    *,
    scheme="",
    authority="",
    user=None,
    password=None,
    host="",
    port=None,
    path="",
    query=None,
    query_string="",
    fragment="",
    encoded=False,
):
    """Create and return a new URL assembled from its components.

    *authority* is mutually exclusive with *user*, *password*, *host* and
    *port*; *query* is mutually exclusive with *query_string*.  Unless
    *encoded* is true the parts are quoted and, when a netloc is present,
    the path is normalised.

    Raises ValueError for conflicting arguments or a *port* without *host*,
    and TypeError for a non-int *port* or a None passed for *scheme*,
    *authority*, *host*, *path*, *query_string* or *fragment*.
    """
    has_netloc_parts = bool(user or password or host or port)

    if authority and has_netloc_parts:
        raise ValueError(
            'Can\'t mix "authority" with "user", "password", "host" or "port".'
        )
    if port is not None and not isinstance(port, int):
        raise TypeError("The port is required to be int.")
    if port and not host:
        raise ValueError('Can\'t build URL with "port" but without "host".')
    if query and query_string:
        raise ValueError('Only one of "query" or "query_string" should be passed')
    must_not_be_none = (scheme, authority, host, path, query_string, fragment)
    if any(item is None for item in must_not_be_none):
        raise TypeError(
            'NoneType is illegal for "scheme", "authority", "host", "path", '
            '"query_string", and "fragment" args, use empty string instead.'
        )

    if authority:
        if encoded:
            netloc = authority
        else:
            # Let SplitResult pick the authority apart into its pieces.
            split = SplitResult("", authority, "", "", "")
            netloc = cls._make_netloc(
                split.username, split.password, split.hostname, split.port, encode=True
            )
    elif has_netloc_parts:
        netloc = cls._make_netloc(
            user, password, host, port, encode=not encoded, encode_host=not encoded
        )
    else:
        netloc = ""

    if not encoded:
        path = cls._PATH_QUOTER(path)
        if netloc:
            path = cls._normalize_path(path)

        cls._validate_authority_uri_abs_path(host=host, path=path)
        query_string = cls._QUERY_QUOTER(query_string)
        fragment = cls._FRAGMENT_QUOTER(fragment)

    url = cls(
        SplitResult(scheme, netloc, path, query_string, fragment), encoded=True
    )
    return url.with_query(query) if query else url

286 

def __init_subclass__(cls):
    """Reject every attempt to derive a class from URL."""
    message = f"Inheriting a class {cls!r} from URL is forbidden"
    raise TypeError(message)

289 

def __str__(self):
    """Return the URL as a string.

    An absolute URL with an empty path but a query or fragment is rendered
    with "/" as its path.
    """
    split = self._val
    needs_root = (
        not split.path and self.is_absolute() and (split.query or split.fragment)
    )
    if needs_root:
        split = split._replace(path="/")
    return urlunsplit(split)

295 

def __repr__(self):
    """Return ``<class name>('<url as string>')``."""
    cls_name = self.__class__.__name__
    return f"{cls_name}('{self!s}')"

298 

def __bytes__(self):
    """Return the string form of the URL encoded as ASCII bytes."""
    text = str(self)
    return text.encode("ascii")

301 

def __eq__(self, other):
    """Compare two URLs by their split values.

    Returns NotImplemented unless *other* is exactly a URL.  On both sides
    an empty path of an absolute URL is treated as "/".
    """
    if type(other) is not URL:
        return NotImplemented

    def comparable(url):
        # Substitute "/" for the empty path of an absolute URL.
        split = url._val
        if split.path or not url.is_absolute():
            return split
        return split._replace(path="/")

    return comparable(self) == comparable(other)

315 

def __hash__(self):
    """Return the hash of the split value, computed once and then cached.

    An empty path of an absolute URL is hashed as "/".
    """
    cached = self._cache.get("hash")
    if cached is not None:
        return cached

    split = self._val
    if not split.path and self.is_absolute():
        split = split._replace(path="/")
    cached = hash(split)
    self._cache["hash"] = cached
    return cached

324 

def __le__(self, other):
    """Order by split value; NotImplemented unless *other* is exactly a URL."""
    if type(other) is URL:
        return self._val <= other._val
    return NotImplemented

329 

def __lt__(self, other):
    """Order by split value; NotImplemented unless *other* is exactly a URL."""
    if type(other) is URL:
        return self._val < other._val
    return NotImplemented

334 

def __ge__(self, other):
    """Order by split value; NotImplemented unless *other* is exactly a URL."""
    if type(other) is URL:
        return self._val >= other._val
    return NotImplemented

339 

def __gt__(self, other):
    """Order by split value; NotImplemented unless *other* is exactly a URL."""
    if type(other) is URL:
        return self._val > other._val
    return NotImplemented

344 

def __truediv__(self, name):
    """Implement ``url / name``: append one str segment to the path.

    Returns NotImplemented when *name* is not a str.
    """
    if isinstance(name, str):
        return self._make_child((str(name),))
    return NotImplemented

349 

def __mod__(self, query):
    """Implement ``url % query`` by delegating to ``update_query``."""
    updated = self.update_query(query)
    return updated

352 

def __bool__(self) -> bool:
    """Return True when netloc, path, query or fragment is non-empty."""
    split = self._val
    return any((split.netloc, split.path, split.query, split.fragment))

357 

def __getstate__(self):
    """Return the pickle state: a 1-tuple holding the split value."""
    state = (self._val,)
    return state

360 

def __setstate__(self, state):
    """Restore the split value from pickle state and start with an empty cache.

    Accepts both the ``(None, {"_val": ...})`` default-style state and a
    sequence whose first item is the split value.
    """
    default_style = state[0] is None and isinstance(state[1], dict)
    if default_style:
        self._val = state[1]["_val"]
    else:
        # Only the first item matters; anything after it is ignored.
        self._val, *_ = state
    self._cache = {}

368 

def is_absolute(self):
    """A check for absolute URLs.

    Return True when the URL has a host (``raw_host`` is not None),
    False otherwise.

    """
    host = self.raw_host
    return host is not None

377 

def is_default_port(self):
    """A check for default port.

    Return True if the URL's port equals the default port of its scheme,
    e.g. 'http://python.org' or 'http://python.org:80'.  Return False when
    the URL has no port or the scheme has no known default.

    """
    port = self.port
    if port is None:
        return False
    expected = DEFAULT_PORTS.get(self.scheme)
    return expected is not None and port == expected

392 

def origin(self):
    """Return a URL with scheme, host and port parts only.

    user, password, path, query and fragment are removed.

    Raises ValueError when the URL is not absolute or has no scheme.

    """
    # TODO: add a keyword-only option for keeping user/pass maybe?
    if not self.is_absolute():
        raise ValueError("URL should be absolute")
    split = self._val
    if not split.scheme:
        raise ValueError("URL should have scheme")
    bare_netloc = self._make_netloc(None, None, split.hostname, split.port)
    stripped = split._replace(netloc=bare_netloc, path="", query="", fragment="")
    return URL(stripped, encoded=True)

408 

def relative(self):
    """Return a relative part of the URL.

    scheme, user, password, host and port are removed.

    Raises ValueError when the URL is not absolute.

    """
    if not self.is_absolute():
        raise ValueError("URL should be absolute")
    stripped = self._val._replace(scheme="", netloc="")
    return URL(stripped, encoded=True)

419 

@property
def scheme(self):
    """Scheme for absolute URLs.

    Empty string for relative URLs or URLs starting with //

    """
    split = self._val
    return split.scheme

428 

@property
def raw_authority(self):
    """Encoded authority part of URL.

    Empty string for relative URLs.

    """
    split = self._val
    return split.netloc

437 

@cached_property
def authority(self):
    """Decoded authority part of URL.

    Built from the decoded user, password and host plus the port; the host
    is not re-encoded.

    """
    user, password = self.user, self.password
    host, port = self.host, self.port
    return self._make_netloc(user, password, host, port, encode_host=False)

448 

@property
def raw_user(self):
    """Encoded user part of URL.

    None if user is missing or empty.

    """
    username = self._val.username
    return username if username else None

461 

@cached_property
def user(self):
    """Decoded user part of URL.

    The value of ``raw_user`` passed through the class unquoter.

    """
    encoded = self.raw_user
    return self._UNQUOTER(encoded)

470 

@property
def raw_password(self):
    """Encoded password part of URL.

    None if password is missing.

    """
    split = self._val
    return split.password

479 

@cached_property
def password(self):
    """Decoded password part of URL.

    The value of ``raw_password`` passed through the class unquoter.

    """
    encoded = self.raw_password
    return self._UNQUOTER(encoded)

488 

@property
def raw_host(self):
    """Encoded host part of URL.

    None for relative URLs.

    """
    # Named "host" rather than "hostname" for the sake of shortness;
    # a .hostname property may be added later.
    split = self._val
    return split.hostname

499 

@cached_property
def host(self):
    """Decoded host part of URL.

    None for relative URLs.

    """
    encoded = self.raw_host
    # A '%' sign only occurs in scoped IPv6 addresses (address%zone), for
    # which IDNA decoding is useless, so such hosts are returned untouched.
    if encoded is None or "%" in encoded:
        return encoded
    return _idna_decode(encoded)

516 

@property
def port(self):
    """Port part of URL, with scheme-based fallback.

    None for relative URLs or URLs without explicit port and
    scheme without default port substitution.

    """
    split = self._val
    explicit = split.port
    if explicit:
        return explicit
    return DEFAULT_PORTS.get(split.scheme)

526 

@property
def explicit_port(self):
    """Port part of URL, without scheme-based fallback.

    None for relative URLs or URLs without explicit port.

    """
    split = self._val
    return split.port

535 

@property
def raw_path(self):
    """Encoded path of URL.

    / for absolute URLs without path part.

    """
    path = self._val.path
    if path or not self.is_absolute():
        return path
    return "/"

547 

@cached_property
def path(self):
    """Decoded path of URL.

    / for absolute URLs without path part.

    """
    encoded = self.raw_path
    return self._PATH_UNQUOTER(encoded)

556 

@cached_property
def query(self):
    """A MultiDictProxy representing parsed query parameters in decoded
    representation.

    Empty value if URL has no query part.

    """
    pairs = parse_qsl(self.raw_query_string, keep_blank_values=True)
    return MultiDictProxy(MultiDict(pairs))

567 

@property
def raw_query_string(self):
    """Encoded query part of URL.

    Empty string if query is missing.

    """
    split = self._val
    return split.query

576 

@cached_property
def query_string(self):
    """Decoded query part of URL.

    Empty string if query is missing.

    """
    encoded = self.raw_query_string
    return self._QS_UNQUOTER(encoded)

585 

@cached_property
def path_qs(self):
    """Decoded path of URL with query."""
    decoded_query = self.query_string
    return f"{self.path}?{decoded_query}" if decoded_query else self.path

592 

@cached_property
def raw_path_qs(self):
    """Encoded path of URL with query."""
    encoded_query = self.raw_query_string
    return f"{self.raw_path}?{encoded_query}" if encoded_query else self.raw_path

599 

@property
def raw_fragment(self):
    """Encoded fragment part of URL.

    Empty string if fragment is missing.

    """
    split = self._val
    return split.fragment

608 

@cached_property
def fragment(self):
    """Decoded fragment part of URL.

    Empty string if fragment is missing.

    """
    encoded = self.raw_fragment
    return self._UNQUOTER(encoded)

617 

@cached_property
def raw_parts(self):
    """A tuple containing encoded *path* parts.

    ('/',) for absolute URLs if *path* is missing.

    """
    path = self._val.path
    if self.is_absolute():
        if not path:
            return ("/",)
        rooted = True
    else:
        rooted = path.startswith("/")

    if rooted:
        # The first character is represented by the "/" root part.
        return ("/", *path[1:].split("/"))
    return tuple(path.split("/"))

637 

@cached_property
def parts(self):
    """A tuple containing decoded *path* parts.

    ('/',) for absolute URLs if *path* is missing.

    """
    return tuple(map(self._UNQUOTER, self.raw_parts))

646 

@cached_property
def parent(self):
    """A new URL with last part of path removed and cleaned up query and
    fragment.

    A URL whose path is empty or "/" and that has neither query nor
    fragment is returned as is.

    """
    path = self.raw_path
    if path in ("", "/"):
        if not (self.raw_fragment or self.raw_query_string):
            return self
        return URL(self._val._replace(query="", fragment=""), encoded=True)
    # Everything before the last "/" is the parent path.
    parent_path = path.rpartition("/")[0]
    trimmed = self._val._replace(path=parent_path, query="", fragment="")
    return URL(trimmed, encoded=True)

661 

@cached_property
def raw_name(self):
    """The last part of raw_parts.

    For absolute URLs the leading root part is not counted, so a URL
    consisting of the root alone has the empty string as its name.
    """
    segments = self.raw_parts
    if self.is_absolute():
        segments = segments[1:]
        if not segments:
            return ""
    return segments[-1]

674 

@cached_property
def name(self):
    """The last part of parts."""
    encoded = self.raw_name
    return self._UNQUOTER(encoded)

679 

@cached_property
def raw_suffix(self):
    """Encoded suffix of the name: the text from its last "." onwards.

    Empty string when the name has no ".", or when its only "." is the
    first or the last character.
    """
    name = self.raw_name
    dot = name.rfind(".")
    return name[dot:] if 0 < dot < len(name) - 1 else ""

688 

@cached_property
def suffix(self):
    """Decoded form of ``raw_suffix``."""
    encoded = self.raw_suffix
    return self._UNQUOTER(encoded)

692 

@cached_property
def raw_suffixes(self):
    """A tuple of the encoded suffixes of the name, each with its leading ".".

    Empty when the name ends with "."; leading dots of the name do not
    start a suffix.
    """
    name = self.raw_name
    if name.endswith("."):
        return ()
    # The first dot-separated chunk is the stem, the rest are suffixes.
    _stem, *extensions = name.lstrip(".").split(".")
    return tuple("." + extension for extension in extensions)

700 

@cached_property
def suffixes(self):
    """Decoded form of ``raw_suffixes``, as a tuple."""
    return tuple(map(self._UNQUOTER, self.raw_suffixes))

704 

@staticmethod
def _validate_authority_uri_abs_path(host, path):
    """Ensure that path in URL with authority starts with a leading slash.

    Nothing is checked when either *host* or *path* is empty.

    Raise ValueError if not.
    """
    if not len(host) or not len(path) or path.startswith("/"):
        return
    raise ValueError(
        "Path in a URL with authority should start with a slash ('/') if set"
    )

715 

def _make_child(self, segments, encoded=False):
    """Return a new URL with *segments* appended to the current path.

    Segments are quoted unless *encoded* is true.  Empty and '.' pieces are
    dropped, and a trailing slash on the last segment is kept.  For absolute
    URLs the result is normalised and given a leading slash.  Query and
    fragment are cleared.

    Raises ValueError when a segment starts with '/'.
    """
    # The pieces are collected back to front and flipped afterwards.  The
    # leading "" (when present) becomes the trailing slash of the result.
    keep_trailing_slash = bool(segments) and segments[-1][-1:] == "/"
    collected = [""] if keep_trailing_slash else []
    for segment in reversed(segments):
        if not segment:
            continue
        if segment[0] == "/":
            raise ValueError(
                f"Appending path {segment!r} starting from slash is forbidden"
            )
        if not encoded:
            segment = self._PATH_QUOTER(segment)
        if "/" in segment:
            collected.extend(
                piece
                for piece in reversed(segment.split("/"))
                if piece and piece != "."
            )
        elif segment != ".":
            collected.append(segment)
    collected.reverse()

    current_path = self._val.path
    if current_path:
        collected = [*current_path.rstrip("/").split("/"), *collected]
    if self.is_absolute():
        collected = _normalize_path_segments(collected)
        if collected and collected[0] != "":
            # inject a leading slash when adding a path to an absolute URL
            # where there was none before
            collected = ["", *collected]
    joined = "/".join(collected)
    return URL(self._val._replace(path=joined, query="", fragment=""), encoded=True)

748 

@classmethod
def _normalize_path(cls, path):
    """Drop '.' and '..' from a str path, keeping a leading "/" if present."""
    if path.startswith("/"):
        # preserve the "/" root element of absolute paths, copying it to the
        # normalised output as per sections 5.2.4 and 6.2.2.3 of rfc3986.
        root, remainder = "/", path[1:]
    else:
        root, remainder = "", path

    return root + "/".join(_normalize_path_segments(remainder.split("/")))

762 

@classmethod
def _encode_host(cls, host, human=False):
    """Return *host* in encoded form.

    An IP address (optionally followed by "%zone") is returned in compressed
    form with the zone re-attached, wrapped in "[...]" for IPv6.  Any other
    host is lower-cased and then, unless *human* is true or the host is
    ASCII-only, passed through ``_idna_encode``.
    """
    address, percent, zone = host.partition("%")
    try:
        parsed = ip_address(address)
    except ValueError:
        lowered = host.lower()
        # IDNA encoding is slow, skip it for ASCII-only strings.
        # The check stays out of the _idna_encode() helper to keep that
        # helper's cache small.
        if human or lowered.isascii():
            return lowered
        return _idna_encode(lowered)

    literal = parsed.compressed
    if percent:
        literal += "%" + zone
    if parsed.version == 6:
        literal = "[" + literal + "]"
    return literal

784 

@classmethod
def _make_netloc(
    cls, user, password, host, port, encode=False, encode_host=True, requote=False
):
    """Assemble ``[user[:password]@]host[:port]``.

    The host goes through ``_encode_host`` when *encode_host* is true.  User
    and password are quoted when *encode* is true, with the requoting quoter
    if *requote* is true.  The port is added only when it is not None and
    the user-info only when it is non-empty.
    """
    quoter = cls._REQUOTER if requote else cls._QUOTER

    netloc = cls._encode_host(host) if encode_host else host
    if port is not None:
        netloc = netloc + ":" + str(port)

    if password is not None:
        # A password without a user still gets the ":" separator.
        if not user:
            userinfo = ""
        else:
            userinfo = quoter(user) if encode else user
        secret = quoter(password) if encode else password
        userinfo = userinfo + ":" + secret
    elif user and encode:
        userinfo = quoter(user)
    else:
        userinfo = user

    if userinfo:
        netloc = userinfo + "@" + netloc
    return netloc

810 

def with_scheme(self, scheme):
    """Return a new URL with scheme replaced.

    The scheme is lower-cased.  Raises TypeError when *scheme* is not a
    str and ValueError when the URL is relative.
    """
    # N.B. doesn't cleanup query/fragment
    if not isinstance(scheme, str):
        raise TypeError("Invalid scheme type")
    if not self.is_absolute():
        raise ValueError("scheme replacement is not allowed for relative URLs")
    replaced = self._val._replace(scheme=scheme.lower())
    return URL(replaced, encoded=True)

819 

def with_user(self, user):
    """Return a new URL with user replaced.

    Autoencode user if needed.

    Clear user/password if user is None.

    Raises TypeError when *user* is neither str nor None and ValueError
    when the URL is relative.

    """
    # N.B. doesn't cleanup query/fragment
    split = self._val
    if user is not None and not isinstance(user, str):
        raise TypeError("Invalid user type")
    if user is None:
        # Dropping the user drops the password along with it.
        password = None
    else:
        user = self._QUOTER(user)
        password = split.password
    if not self.is_absolute():
        raise ValueError("user replacement is not allowed for relative URLs")
    netloc = self._make_netloc(user, password, split.hostname, split.port)
    return URL(self._val._replace(netloc=netloc), encoded=True)

845 

def with_password(self, password):
    """Return a new URL with password replaced.

    Autoencode password if needed.

    Clear password if argument is None.

    Raises TypeError when *password* is neither str nor None and
    ValueError when the URL is relative.

    """
    # N.B. doesn't cleanup query/fragment
    if password is not None:
        if not isinstance(password, str):
            raise TypeError("Invalid password type")
        password = self._QUOTER(password)
    if not self.is_absolute():
        raise ValueError("password replacement is not allowed for relative URLs")
    split = self._val
    netloc = self._make_netloc(split.username, password, split.hostname, split.port)
    return URL(self._val._replace(netloc=netloc), encoded=True)

870 

def with_host(self, host):
    """Return a new URL with host replaced.

    Autoencode host if needed.

    Changing host for relative URLs is not allowed, use .join()
    instead.

    Raises TypeError when *host* is not a str and ValueError when the URL
    is relative or *host* is empty.

    """
    # N.B. doesn't cleanup query/fragment
    if not isinstance(host, str):
        raise TypeError("Invalid host type")
    if not self.is_absolute():
        raise ValueError("host replacement is not allowed for relative URLs")
    if not host:
        raise ValueError("host removing is not allowed")
    split = self._val
    netloc = self._make_netloc(split.username, split.password, host, split.port)
    return URL(self._val._replace(netloc=netloc), encoded=True)

894 

def with_port(self, port):
    """Return a new URL with port replaced.

    Clear port to default if None is passed.

    Raises TypeError when *port* is a bool or not an int, and ValueError
    when it is outside 0..65535 or the URL is relative.

    """
    # N.B. doesn't cleanup query/fragment
    if port is not None:
        # bool is a subclass of int, so it has to be rejected explicitly.
        if isinstance(port, bool) or not isinstance(port, int):
            raise TypeError(f"port should be int or None, got {type(port)}")
        if not 0 <= port <= 65535:
            raise ValueError(f"port must be between 0 and 65535, got {port}")
    if not self.is_absolute():
        raise ValueError("port replacement is not allowed for relative URLs")
    split = self._val
    netloc = self._make_netloc(split.username, split.password, split.hostname, port)
    return URL(self._val._replace(netloc=netloc), encoded=True)

916 

def with_path(self, path, *, encoded=False):
    """Return a new URL with path replaced.

    Unless *encoded* is true the path is quoted and, for absolute URLs,
    normalised.  A non-empty path is given a leading "/" if it lacks one.
    Query and fragment are cleared.
    """
    if not encoded:
        path = self._PATH_QUOTER(path)
        if self.is_absolute():
            path = self._normalize_path(path)
    if path and path[0] != "/":
        path = "/" + path
    replaced = self._val._replace(path=path, query="", fragment="")
    return URL(replaced, encoded=True)

926 

@classmethod
def _query_seq_pairs(cls, quoter, pairs):
    """Yield a quoted ``key=value`` string for every value in *pairs*.

    A list or tuple value produces one string per element, all sharing
    the same key.
    """
    for key, value in pairs:
        items = value if isinstance(value, (list, tuple)) else (value,)
        for item in items:
            yield quoter(key) + "=" + quoter(cls._query_var(item))

935 

@staticmethod
def _query_var(v):
    """Convert one query value to str.

    A str is returned as is; a float or a non-bool int is converted with
    ``str()``.  Raises ValueError for infinite or NaN floats and TypeError
    for every other type, bool included.
    """
    value_type = type(v)
    if issubclass(value_type, str):
        return v
    if issubclass(value_type, float):
        if math.isinf(v):
            raise ValueError("float('inf') is not supported")
        if math.isnan(v):
            raise ValueError("float('nan') is not supported")
        # Going through float()/int() flattens subclasses to the builtin.
        return str(float(v))
    if value_type is not bool and issubclass(value_type, int):
        return str(int(v))
    raise TypeError(
        "Invalid variable type: value "
        "should be str, int or float, got {!r} "
        "of type {}".format(v, value_type)
    )

954 

def _get_str_query(self, *args, **kwargs):
    """Turn a query given in any supported form into an encoded query string.

    Takes either keyword arguments or exactly one positional argument, which
    may be None, a Mapping, a str or a sequence of (key, value) pairs.
    Returns None when the query is None.

    Raises ValueError when the arguments are not one of those two shapes and
    TypeError for bytes-like or otherwise unsupported query types.
    """
    if kwargs:
        if args:
            raise ValueError(
                "Either kwargs or single query parameter must be present"
            )
        query = kwargs
    elif len(args) == 1:
        (query,) = args
    else:
        raise ValueError("Either kwargs or single query parameter must be present")

    if query is None:
        return None
    if isinstance(query, Mapping):
        quoter = self._QUERY_PART_QUOTER
        return "&".join(self._query_seq_pairs(quoter, query.items()))
    if isinstance(query, str):
        return self._QUERY_QUOTER(query)
    if isinstance(query, (bytes, bytearray, memoryview)):
        raise TypeError(
            "Invalid query type: bytes, bytearray and memoryview are forbidden"
        )
    if isinstance(query, Sequence):
        quoter = self._QUERY_PART_QUOTER
        # We don't expect sequence values if we're given a list of pairs
        # already; only mappings like builtin `dict` which can't have the
        # same key pointing to multiple values are allowed to use
        # `_query_seq_pairs`.
        return "&".join(
            quoter(key) + "=" + quoter(self._query_var(value))
            for key, value in query
        )
    raise TypeError(
        "Invalid query type: only str, mapping or "
        "sequence of (key, value) pairs is allowed"
    )

994 

def with_query(self, *args, **kwargs):
    """Return a new URL with query part replaced.

    Accepts any Mapping (e.g. dict, multidict.MultiDict instances)
    or str, autoencode the argument if needed.

    A sequence of (key, value) pairs is supported as well.

    It also can take an arbitrary number of keyword arguments.

    Clear query if None is passed.

    """
    # N.B. doesn't cleanup query/fragment

    encoded_query = self._get_str_query(*args, **kwargs)
    if not encoded_query:
        encoded_query = ""
    return URL(self._val._replace(query=encoded_query), encoded=True)

1014 

def update_query(self, *args, **kwargs):
    """Return a new URL with query part updated.

    The given query is merged into a copy of the current one.  When the
    given query is None the result has an empty query.
    """
    addition = self._get_str_query(*args, **kwargs)
    merged = None
    if addition is not None:
        incoming = MultiDict(parse_qsl(addition, keep_blank_values=True))
        merged = MultiDict(self.query)
        merged.update(incoming)

    encoded_query = self._get_str_query(merged) or ""
    return URL(self._val._replace(query=encoded_query), encoded=True)

1027 

def with_fragment(self, fragment):
    """Return a new URL with fragment replaced.

    Autoencode fragment if needed.

    Clear fragment to default if None is passed.

    Returns this same URL when the encoded fragment would not change.
    Raises TypeError when *fragment* is neither str nor None.

    """
    # N.B. doesn't cleanup query/fragment
    if fragment is None:
        encoded_fragment = ""
    elif isinstance(fragment, str):
        encoded_fragment = self._FRAGMENT_QUOTER(fragment)
    else:
        raise TypeError("Invalid fragment type")
    if encoded_fragment == self.raw_fragment:
        return self
    return URL(self._val._replace(fragment=encoded_fragment), encoded=True)

1046 

def with_name(self, name):
    """Return a new URL with name (last part of path) replaced.

    Query and fragment parts are cleaned up.

    Name is encoded if needed.

    Raises TypeError when *name* is not a str and ValueError when it
    contains a slash or is '.' or '..'.

    """
    # N.B. DOES cleanup query/fragment
    if not isinstance(name, str):
        raise TypeError("Invalid name type")
    if "/" in name:
        raise ValueError("Slash in name is not allowed")
    quoted = self._PATH_QUOTER(name)
    if quoted in (".", ".."):
        raise ValueError(". and .. values are forbidden")

    segments = list(self.raw_parts)
    absolute = self.is_absolute()
    if absolute and len(segments) == 1:
        # Only the root part is present: the name becomes a new segment.
        segments.append(quoted)
    else:
        segments[-1] = quoted
    if absolute or segments[0] == "/":
        segments[0] = ""  # replace leading '/'
    return URL(
        self._val._replace(path="/".join(segments), query="", fragment=""),
        encoded=True,
    )

1078 

def with_suffix(self, suffix):
    """Return a new URL with suffix (file extension of name) replaced.

    The current ``raw_suffix`` (if any) is cut from the end of
    ``raw_name``, the new suffix is appended, and the result is passed
    to ``with_name``. An empty suffix therefore removes the extension.

    Raises TypeError if suffix is not a str, and ValueError if suffix is
    "." or is non-empty without a leading ".", or if the URL has an
    empty name.
    """
    if not isinstance(suffix, str):
        raise TypeError("Invalid suffix type")
    if suffix == "." or (suffix and not suffix.startswith(".")):
        raise ValueError(f"Invalid suffix {suffix!r}")
    name = self.raw_name
    if not name:
        raise ValueError(f"{self!r} has an empty name")
    old_suffix = self.raw_suffix
    # Strip the existing extension only when there is one; slicing with
    # -0 would otherwise discard the whole name.
    stem = name[: -len(old_suffix)] if old_suffix else name
    return self.with_name(stem + suffix)

1099 

def join(self, url):
    """Join URLs

    Combine this URL, acting as the base, with another URL and return
    the result as a new URL.

    Both URLs are converted to strings and resolved with
    ``urllib.parse.urljoin``; the joined string is wrapped in a URL
    with ``encoded=True``.

    Raises TypeError if url is not a URL instance.

    """
    # See docs for urllib.parse.urljoin
    if not isinstance(url, URL):
        raise TypeError("url should be URL")
    base = str(self)
    reference = str(url)
    return URL(urljoin(base, reference), encoded=True)

1116 

def joinpath(self, *other, encoded=False):
    """Return a new URL with the elements in other appended to the path.

    The positional arguments are forwarded as a tuple to
    ``self._make_child`` together with the ``encoded`` flag.
    """
    child = self._make_child(other, encoded=encoded)
    return child

1120 

def human_repr(self):
    """Return decoded human readable string for URL representation.

    Each component is passed through ``_human_quote`` with the set of
    characters to escape for that component. A non-empty host is
    rendered via ``self._encode_host(..., human=True)``, and the pieces
    are reassembled with ``urlunsplit``.
    """
    userinfo_unsafe = "#/:?@[]"
    query_unsafe = "#&+;="

    user = _human_quote(self.user, userinfo_unsafe)
    password = _human_quote(self.password, userinfo_unsafe)
    host = self.host
    if host:
        host = self._encode_host(self.host, human=True)
    path = _human_quote(self.path, "#?")
    pairs = [
        f"{_human_quote(key, query_unsafe)}={_human_quote(value, query_unsafe)}"
        for key, value in self.query.items()
    ]
    query_string = "&".join(pairs)
    fragment = _human_quote(self.fragment, "")

    scheme = self.scheme
    # The host was already prepared above, so _make_netloc must not
    # encode it again.
    netloc = self._make_netloc(
        user,
        password,
        host,
        self._val.port,
        encode_host=False,
    )
    return urlunsplit(SplitResult(scheme, netloc, path, query_string, fragment))

1149 

1150 

1151def _human_quote(s, unsafe): 

1152 if not s: 

1153 return s 

1154 for c in "%" + unsafe: 

1155 if c in s: 

1156 s = s.replace(c, f"%{ord(c):02X}") 

1157 if s.isprintable(): 

1158 return s 

1159 return "".join(c if c.isprintable() else quote(c) for c in s) 

1160 

1161 

# Default size passed to functools.lru_cache for the IDNA helper caches and
# used as the default for both cache_configure() keyword arguments.
_MAXCACHE = 256

1163 

1164 

@functools.lru_cache(_MAXCACHE)
def _idna_decode(raw):
    """Decode an ASCII host string with IDNA, caching the result.

    ``idna.decode`` is tried first; if it raises UnicodeError, the
    built-in "idna" codec is used instead.
    """
    try:
        decoded = idna.decode(raw.encode("ascii"))
    except UnicodeError:  # e.g. '::1'
        decoded = raw.encode("ascii").decode("idna")
    return decoded

1171 

1172 

@functools.lru_cache(_MAXCACHE)
def _idna_encode(host):
    """Encode a host string to its ASCII IDNA form, caching the result.

    ``idna.encode`` with ``uts46=True`` is tried first; if it raises
    UnicodeError, the built-in "idna" codec is used instead.
    """
    try:
        encoded = idna.encode(host, uts46=True).decode("ascii")
    except UnicodeError:
        encoded = host.encode("idna").decode("ascii")
    return encoded

1179 

1180 

@rewrite_module
def cache_clear():
    """Clear the LRU caches of the IDNA decode and encode helpers."""
    for cached in (_idna_decode, _idna_encode):
        cached.cache_clear()

1185 

1186 

@rewrite_module
def cache_info():
    """Return the ``cache_info()`` of both IDNA helper caches.

    The result is a dict with the keys "idna_encode" and "idna_decode".
    """
    encode_stats = _idna_encode.cache_info()
    decode_stats = _idna_decode.cache_info()
    return {"idna_encode": encode_stats, "idna_decode": decode_stats}

1193 

1194 

@rewrite_module
def cache_configure(*, idna_encode_size=_MAXCACHE, idna_decode_size=_MAXCACHE):
    """Rebuild the IDNA helper caches with the given sizes.

    Each helper's undecorated function (``__wrapped__``) is wrapped in a
    fresh ``functools.lru_cache`` and rebound to the module-level name.
    The previous cache objects are replaced, not resized.
    """
    global _idna_decode, _idna_encode

    encode_cache = functools.lru_cache(idna_encode_size)
    _idna_encode = encode_cache(_idna_encode.__wrapped__)
    decode_cache = functools.lru_cache(idna_decode_size)
    _idna_decode = decode_cache(_idna_decode.__wrapped__)

1200 _idna_decode = functools.lru_cache(idna_decode_size)(_idna_decode.__wrapped__)