Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/yarl/_url.py: 45%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

801 statements  

1import re 

2import sys 

3import warnings 

4from collections.abc import Mapping, Sequence 

5from enum import Enum 

6from functools import _CacheInfo, lru_cache 

7from importlib.util import find_spec 

8from ipaddress import ip_address 

9from typing import ( 

10 TYPE_CHECKING, 

11 Any, 

12 NoReturn, 

13 TypedDict, 

14 TypeVar, 

15 Union, 

16 cast, 

17 overload, 

18) 

19from urllib.parse import SplitResult, scheme_chars, uses_relative 

20 

21import idna 

22from multidict import MultiDict, MultiDictProxy, istr 

23from propcache.api import under_cached_property as cached_property 

24 

25from ._parse import ( 

26 USES_AUTHORITY, 

27 SplitURLType, 

28 make_netloc, 

29 query_to_pairs, 

30 split_netloc, 

31 split_url, 

32 unsplit_result, 

33) 

34from ._path import normalize_path, normalize_path_segments 

35from ._query import ( 

36 Query, 

37 QueryVariable, 

38 SimpleQuery, 

39 get_str_query, 

40 get_str_query_from_iterable, 

41 get_str_query_from_sequence_iterable, 

42) 

43from ._quoters import ( 

44 FRAGMENT_QUOTER, 

45 FRAGMENT_REQUOTER, 

46 PATH_QUOTER, 

47 PATH_REQUOTER, 

48 PATH_SAFE_UNQUOTER, 

49 PATH_UNQUOTER, 

50 QS_UNQUOTER, 

51 QUERY_QUOTER, 

52 QUERY_REQUOTER, 

53 QUOTER, 

54 REQUOTER, 

55 UNQUOTER, 

56 human_quote, 

57) 

58 

# Only probe for pydantic_core instead of importing Pydantic: a real import
# would make importing yarl 3-7x slower for users who never need it.
HAS_PYDANTIC = find_spec("pydantic_core") is not None
if TYPE_CHECKING:
    # Names needed by type checkers only; never imported at runtime.
    from pydantic import GetCoreSchemaHandler, GetJsonSchemaHandler
    from pydantic.json_schema import JsonSchemaValue
    from pydantic_core import CoreSchema

65 

66 

# Port implied by each scheme when the URL does not spell one out.
DEFAULT_PORTS = dict(http=80, https=443, ws=80, wss=443, ftp=21)
# Frozen copies of the urllib.parse tables, for fast membership tests.
USES_RELATIVE = frozenset(uses_relative)
_SCHEME_CHARS = frozenset(scheme_chars)

# Special schemes (https://url.spec.whatwg.org/#special-scheme) are not
# allowed to have an empty host
# (https://url.spec.whatwg.org/#url-representation).
SCHEME_REQUIRES_HOST = frozenset({"http", "https", "ws", "wss", "ftp"})

74 

75 

# RFC 3986 reg-name = *( unreserved / pct-encoded / sub-delims ).
# The pattern below finds the first character that falls OUTSIDE those
# classes; it is only ever applied to lower-cased ASCII input.
NOT_REG_NAME = re.compile(
    r"""
        # any character not in the unreserved or sub-delims sets, plus %
        # (validated with the additional check for pct-encoded sequences below)
        [^a-z0-9\-._~!$&'()*+,;=%]
    |
        # % only allowed if it is part of a pct-encoded
        # sequence of 2 hex digits.
        %(?![0-9a-f]{2})
    """,
    re.VERBOSE,
)

91 

_T = TypeVar("_T")

# typing.Self is imported on Python 3.11+; older interpreters fall back to
# Any so annotations using Self still resolve.
if sys.version_info < (3, 11):
    Self = Any
else:
    from typing import Self

98 

99 

class UndefinedType(Enum):
    """Enum with a single member, used as the "value not set" sentinel type."""

    _singleton = 0


# The one sentinel instance used as the default for "not set" arguments.
UNDEFINED = UndefinedType._singleton

107 

108 

class CacheInfo(TypedDict):
    """Host encoding cache statistics.

    Every entry is a ``functools`` cache-info record for one cache.
    """

    # IDNA conversion caches
    idna_encode: _CacheInfo
    idna_decode: _CacheInfo
    # Host parsing / validation caches
    ip_address: _CacheInfo
    host_validate: _CacheInfo
    encode_host: _CacheInfo

117 

118 

class _InternalURLCache(TypedDict, total=False):
    # Shape of the per-URL ``_cache`` dict. ``total=False`` because entries
    # are filled in lazily, so any key may be absent.

    # --- whole-URL values ---
    _val: SplitURLType
    _origin: "URL"
    absolute: bool
    hash: int
    scheme: str

    # --- authority and its parts ---
    raw_authority: str
    authority: str
    raw_user: str | None
    user: str | None
    raw_password: str | None
    password: str | None
    raw_host: str | None
    host: str | None
    host_subcomponent: str | None
    host_port_subcomponent: str | None
    port: int | None
    explicit_port: int | None

    # --- path ---
    raw_path: str
    path: str

    # --- query ---
    _parsed_query: list[tuple[str, str]]
    query: "MultiDictProxy[str]"
    raw_query_string: str
    query_string: str
    path_qs: str
    raw_path_qs: str

    # --- fragment ---
    raw_fragment: str
    fragment: str

    # --- values derived from the path segments ---
    raw_parts: tuple[str, ...]
    parts: tuple[str, ...]
    parent: "URL"
    raw_name: str
    name: str
    raw_suffix: str
    suffix: str
    raw_suffixes: tuple[str, ...]
    suffixes: tuple[str, ...]

156 

157 

def rewrite_module(obj: _T) -> _T:
    """Set ``obj.__module__`` to ``"yarl"`` and hand the same object back.

    Returning the object lets the function be used as a decorator.
    """
    setattr(obj, "__module__", "yarl")
    return obj

161 

162 

def _encode_relative_scheme_colon(path: str) -> str:
    """Percent-encode the first ``:`` of *path* when the text before it looks
    like a scheme.

    The path is returned untouched when it has no colon, when the colon is its
    first character, or when any character before the colon is not a scheme
    character. Otherwise only that first colon becomes ``%3A``.
    """
    head, colon, tail = path.partition(":")
    if not colon or not head:
        # No colon at all, or nothing in front of it.
        return path
    if all(ch in _SCHEME_CHARS for ch in head):
        return f"{head}%3A{tail}"
    return path

172 

173 

@lru_cache
def encode_url(url_str: str) -> "URL":
    """Build a URL from a string whose components may still need quoting.

    The result is memoized by ``lru_cache``, so equal input strings share one
    URL object. Netloc parts computed along the way are stored in the new
    object's cache.

    Raises:
        ValueError: the authority has no host although the scheme is listed
            in ``SCHEME_REQUIRES_HOST``.
    """
    cache: _InternalURLCache = {}
    host: str | None
    scheme, netloc, path, query, fragment = split_url(url_str)
    if not netloc:
        host = ""
    else:
        if any(marker in netloc for marker in ":@["):
            # Complex netloc: needs a full split.
            username, password, host, port = split_netloc(netloc)
        else:
            host = netloc
            username = password = port = None
        if host is None:
            if scheme in SCHEME_REQUIRES_HOST:
                raise ValueError(
                    "Invalid URL: host is required for "
                    f"absolute urls with the {scheme} scheme"
                )
            host = ""
        host = _encode_host(host, validate_host=False)
        # The host encoder adds brackets back for IPv6 addresses; the cached
        # raw host is stored without them.
        cache["raw_host"] = host[1:-1] if "[" in host else host
        cache["explicit_port"] = port
        if username is None and password is None:
            # Fast path for URLs without user and password.
            cache["raw_user"] = None
            cache["raw_password"] = None
            netloc = f"{host}:{port}" if port is not None else host
        else:
            raw_user = REQUOTER(username) if username else username
            raw_password = REQUOTER(password) if password else password
            cache["raw_user"] = raw_user
            cache["raw_password"] = raw_password
            netloc = make_netloc(raw_user, raw_password, host, port)

    if path:
        path = PATH_REQUOTER(path)
        if netloc and "." in path:
            path = normalize_path(path)
        elif not (scheme or netloc):
            # Purely relative reference: a leading "xxx:" must not read as a
            # scheme.
            path = _encode_relative_scheme_colon(path)
    if query:
        query = QUERY_REQUOTER(query)
    if fragment:
        fragment = FRAGMENT_REQUOTER(fragment)

    cache["scheme"] = scheme
    cache["raw_path"] = path if path or not netloc else "/"
    cache["raw_query_string"] = query
    cache["raw_fragment"] = fragment

    url = object.__new__(URL)
    url._scheme = scheme
    url._netloc = netloc
    url._path = path
    url._query = query
    url._fragment = fragment
    url._cache = cache
    return url

238 

239 

@lru_cache
def pre_encoded_url(url_str: str) -> "URL":
    """Build a URL from an already-encoded string.

    The string is only split; none of the five parts is requoted. The result
    is memoized by ``lru_cache``.
    """
    scheme, netloc, path, query, fragment = split_url(url_str)
    url = object.__new__(URL)
    url._scheme = scheme
    url._netloc = netloc
    url._path = path
    url._query = query
    url._fragment = fragment
    url._cache = {}
    return url

248 

249 

@lru_cache
def build_pre_encoded_url(
    scheme: str,
    authority: str,
    user: str | None,
    password: str | None,
    host: str,
    port: int | None,
    path: str,
    query_string: str,
    fragment: str,
) -> "URL":
    """Assemble a URL from parts that are already encoded.

    A non-empty *authority* is used verbatim as the netloc. Otherwise the
    netloc is built from *user*, *password*, *host* and *port*, dropping the
    port when it equals the scheme's entry in ``DEFAULT_PORTS``. With neither
    an authority nor a host the netloc is empty.
    """
    if authority:
        netloc = authority
    elif not host:
        netloc = ""
    else:
        if port is not None and port == DEFAULT_PORTS.get(scheme):
            # Default port for the scheme: leave it out of the netloc.
            port = None
        if user is None and password is None:
            netloc = host if port is None else f"{host}:{port}"
        else:
            netloc = make_netloc(user, password, host, port)

    url = object.__new__(URL)
    url._scheme = scheme
    url._netloc = netloc
    url._path = path
    url._query = query_string
    url._fragment = fragment
    url._cache = {}
    return url

281 

282 

def from_parts_uncached(
    scheme: str, netloc: str, path: str, query: str, fragment: str
) -> "URL":
    """Wrap five components in a new URL without touching them.

    The values are stored exactly as given and the new URL starts with an
    empty cache.
    """
    url = object.__new__(URL)
    url._cache = {}
    url._scheme = scheme
    url._netloc = netloc
    url._path = path
    url._query = query
    url._fragment = fragment
    return url


# Memoized variant of from_parts_uncached.
from_parts = lru_cache(from_parts_uncached)

298 

299 

300@rewrite_module 

301class URL: 

302 # Don't derive from str 

303 # follow pathlib.Path design 

304 # probably URL will not suffer from pathlib problems: 

305 # it's intended for libraries like aiohttp, 

306 # not to be passed into standard library functions like os.open etc. 

307 

308 # URL grammar (RFC 3986) 

309 # pct-encoded = "%" HEXDIG HEXDIG 

310 # reserved = gen-delims / sub-delims 

311 # gen-delims = ":" / "/" / "?" / "#" / "[" / "]" / "@" 

312 # sub-delims = "!" / "$" / "&" / "'" / "(" / ")" 

313 # / "*" / "+" / "," / ";" / "=" 

314 # unreserved = ALPHA / DIGIT / "-" / "." / "_" / "~" 

315 # URI = scheme ":" hier-part [ "?" query ] [ "#" fragment ] 

316 # hier-part = "//" authority path-abempty 

317 # / path-absolute 

318 # / path-rootless 

319 # / path-empty 

320 # scheme = ALPHA *( ALPHA / DIGIT / "+" / "-" / "." ) 

321 # authority = [ userinfo "@" ] host [ ":" port ] 

322 # userinfo = *( unreserved / pct-encoded / sub-delims / ":" ) 

323 # host = IP-literal / IPv4address / reg-name 

324 # IP-literal = "[" ( IPv6address / IPvFuture ) "]" 

325 # IPvFuture = "v" 1*HEXDIG "." 1*( unreserved / sub-delims / ":" ) 

326 # IPv6address = 6( h16 ":" ) ls32 

327 # / "::" 5( h16 ":" ) ls32 

328 # / [ h16 ] "::" 4( h16 ":" ) ls32 

329 # / [ *1( h16 ":" ) h16 ] "::" 3( h16 ":" ) ls32 

330 # / [ *2( h16 ":" ) h16 ] "::" 2( h16 ":" ) ls32 

331 # / [ *3( h16 ":" ) h16 ] "::" h16 ":" ls32 

332 # / [ *4( h16 ":" ) h16 ] "::" ls32 

333 # / [ *5( h16 ":" ) h16 ] "::" h16 

334 # / [ *6( h16 ":" ) h16 ] "::" 

335 # ls32 = ( h16 ":" h16 ) / IPv4address 

336 # ; least-significant 32 bits of address 

337 # h16 = 1*4HEXDIG 

338 # ; 16 bits of address represented in hexadecimal 

339 # IPv4address = dec-octet "." dec-octet "." dec-octet "." dec-octet 

340 # dec-octet = DIGIT ; 0-9 

341 # / %x31-39 DIGIT ; 10-99 

342 # / "1" 2DIGIT ; 100-199 

343 # / "2" %x30-34 DIGIT ; 200-249 

344 # / "25" %x30-35 ; 250-255 

345 # reg-name = *( unreserved / pct-encoded / sub-delims ) 

346 # port = *DIGIT 

347 # path = path-abempty ; begins with "/" or is empty 

348 # / path-absolute ; begins with "/" but not "//" 

349 # / path-noscheme ; begins with a non-colon segment 

350 # / path-rootless ; begins with a segment 

351 # / path-empty ; zero characters 

352 # path-abempty = *( "/" segment ) 

353 # path-absolute = "/" [ segment-nz *( "/" segment ) ] 

354 # path-noscheme = segment-nz-nc *( "/" segment ) 

355 # path-rootless = segment-nz *( "/" segment ) 

356 # path-empty = 0<pchar> 

357 # segment = *pchar 

358 # segment-nz = 1*pchar 

359 # segment-nz-nc = 1*( unreserved / pct-encoded / sub-delims / "@" ) 

360 # ; non-zero-length segment without any colon ":" 

361 # pchar = unreserved / pct-encoded / sub-delims / ":" / "@" 

362 # query = *( pchar / "/" / "?" ) 

363 # fragment = *( pchar / "/" / "?" ) 

364 # URI-reference = URI / relative-ref 

365 # relative-ref = relative-part [ "?" query ] [ "#" fragment ] 

366 # relative-part = "//" authority path-abempty 

367 # / path-absolute 

368 # / path-noscheme 

369 # / path-empty 

370 # absolute-URI = scheme ":" hier-part [ "?" query ] 

371 __slots__ = ("_cache", "_scheme", "_netloc", "_path", "_query", "_fragment") 

372 

373 _cache: _InternalURLCache 

374 _scheme: str 

375 _netloc: str 

376 _path: str 

377 _query: str 

378 _fragment: str 

379 

380 def __new__( 

381 cls, 

382 val: Union[str, SplitResult, "URL", UndefinedType] = UNDEFINED, 

383 *, 

384 encoded: bool = False, 

385 strict: bool | None = None, 

386 ) -> "URL": 

387 if strict is not None: # pragma: no cover 

388 warnings.warn("strict parameter is ignored") 

389 if type(val) is str: 

390 return pre_encoded_url(val) if encoded else encode_url(val) 

391 if type(val) is cls: 

392 return val 

393 if type(val) is SplitResult: 

394 if not encoded: 

395 raise ValueError("Cannot apply decoding to SplitResult") 

396 return from_parts(*val) 

397 if isinstance(val, str): 

398 return pre_encoded_url(str(val)) if encoded else encode_url(str(val)) 

399 if val is UNDEFINED: 

400 # Special case for UNDEFINED since it might be unpickling and we do 

401 # not want to cache as the `__set_state__` call would mutate the URL 

402 # object in the `pre_encoded_url` or `encoded_url` caches. 

403 self = object.__new__(URL) 

404 self._scheme = self._netloc = self._path = self._query = self._fragment = "" 

405 self._cache = {} 

406 return self 

407 raise TypeError("Constructor parameter should be str") 

408 

409 @classmethod 

410 def build( 

411 cls, 

412 *, 

413 scheme: str = "", 

414 authority: str = "", 

415 user: str | None = None, 

416 password: str | None = None, 

417 host: str = "", 

418 port: int | None = None, 

419 path: str = "", 

420 query: Query | None = None, 

421 query_string: str = "", 

422 fragment: str = "", 

423 encoded: bool = False, 

424 ) -> "URL": 

425 """Creates and returns a new URL""" 

426 

427 if authority and (user or password or host or port): 

428 raise ValueError( 

429 'Can\'t mix "authority" with "user", "password", "host" or "port".' 

430 ) 

431 if port is not None and not isinstance(port, int): 

432 raise TypeError(f"The port is required to be int, got {type(port)!r}.") 

433 if port and not host: 

434 raise ValueError('Can\'t build URL with "port" but without "host".') 

435 if query and query_string: 

436 raise ValueError('Only one of "query" or "query_string" should be passed') 

437 if ( 

438 scheme is None # type: ignore[redundant-expr] 

439 or authority is None # type: ignore[redundant-expr] 

440 or host is None # type: ignore[redundant-expr] 

441 or path is None # type: ignore[redundant-expr] 

442 or query_string is None # type: ignore[redundant-expr] 

443 or fragment is None 

444 ): 

445 raise TypeError( 

446 'NoneType is illegal for "scheme", "authority", "host", "path", ' 

447 '"query_string", and "fragment" args, use empty string instead.' 

448 ) 

449 

450 if query: 

451 query_string = get_str_query(query) or "" 

452 

453 if encoded: 

454 return build_pre_encoded_url( 

455 scheme, 

456 authority, 

457 user, 

458 password, 

459 host, 

460 port, 

461 path, 

462 query_string, 

463 fragment, 

464 ) 

465 

466 self = object.__new__(URL) 

467 self._scheme = scheme 

468 _host: str | None = None 

469 if authority: 

470 user, password, _host, port = split_netloc(authority) 

471 _host = _encode_host(_host, validate_host=False) if _host else "" 

472 elif host: 

473 _host = _encode_host(host, validate_host=True) 

474 else: 

475 self._netloc = "" 

476 

477 if _host is not None: 

478 if port is not None: 

479 port = None if port == DEFAULT_PORTS.get(scheme) else port 

480 if user is None and password is None: 

481 self._netloc = _host if port is None else f"{_host}:{port}" 

482 else: 

483 self._netloc = make_netloc(user, password, _host, port, True) 

484 

485 path = PATH_QUOTER(path) if path else path 

486 if path and self._netloc: 

487 if "." in path: 

488 path = normalize_path(path) 

489 if path[0] != "/": 

490 msg = ( 

491 "Path in a URL with authority should " 

492 "start with a slash ('/') if set" 

493 ) 

494 raise ValueError(msg) 

495 

496 self._path = path 

497 if not query and query_string: 

498 query_string = QUERY_QUOTER(query_string) 

499 self._query = query_string 

500 self._fragment = FRAGMENT_QUOTER(fragment) if fragment else fragment 

501 self._cache = {} 

502 return self 

503 

504 def __init_subclass__(cls) -> NoReturn: 

505 raise TypeError(f"Inheriting a class {cls!r} from URL is forbidden") 

506 

507 def __str__(self) -> str: 

508 if not self._path and self._netloc and (self._query or self._fragment): 

509 path = "/" 

510 else: 

511 path = self._path 

512 if (port := self.explicit_port) is not None and port == DEFAULT_PORTS.get( 

513 self._scheme 

514 ): 

515 # port normalization - using None for default ports to remove from rendering 

516 # https://datatracker.ietf.org/doc/html/rfc3986.html#section-6.2.3 

517 host = self.host_subcomponent 

518 netloc = make_netloc(self.raw_user, self.raw_password, host, None) 

519 else: 

520 netloc = self._netloc 

521 return unsplit_result(self._scheme, netloc, path, self._query, self._fragment) 

522 

523 def __repr__(self) -> str: 

524 return f"{self.__class__.__name__}('{str(self)}')" 

525 

526 def __bytes__(self) -> bytes: 

527 return str(self).encode("ascii") 

528 

529 def __eq__(self, other: object) -> bool: 

530 if type(other) is not URL: 

531 return NotImplemented 

532 

533 path1 = "/" if not self._path and self._netloc else self._path 

534 path2 = "/" if not other._path and other._netloc else other._path 

535 return ( 

536 self._scheme == other._scheme 

537 and self._netloc == other._netloc 

538 and path1 == path2 

539 and self._query == other._query 

540 and self._fragment == other._fragment 

541 ) 

542 

543 def __hash__(self) -> int: 

544 if (ret := self._cache.get("hash")) is None: 

545 path = "/" if not self._path and self._netloc else self._path 

546 ret = self._cache["hash"] = hash( 

547 (self._scheme, self._netloc, path, self._query, self._fragment) 

548 ) 

549 return ret 

550 

551 def __le__(self, other: object) -> bool: 

552 if type(other) is not URL: 

553 return NotImplemented 

554 return self._val <= other._val 

555 

556 def __lt__(self, other: object) -> bool: 

557 if type(other) is not URL: 

558 return NotImplemented 

559 return self._val < other._val 

560 

561 def __ge__(self, other: object) -> bool: 

562 if type(other) is not URL: 

563 return NotImplemented 

564 return self._val >= other._val 

565 

566 def __gt__(self, other: object) -> bool: 

567 if type(other) is not URL: 

568 return NotImplemented 

569 return self._val > other._val 

570 

571 def __truediv__(self, name: str) -> "URL": 

572 if not isinstance(name, str): 

573 return NotImplemented 

574 return self._make_child((str(name),)) 

575 

576 def __mod__(self, query: Query) -> "URL": 

577 return self.update_query(query) 

578 

579 def __bool__(self) -> bool: 

580 return bool(self._netloc or self._path or self._query or self._fragment) 

581 

582 def __getstate__(self) -> tuple[SplitURLType]: 

583 # Return a plain tuple rather than a ``SplitResult``. Constructing a 

584 # ``SplitResult`` via ``tuple.__new__`` skips its ``__init__`` and on 

585 # Python 3.15+ leaves ``_keep_empty`` unset, which breaks pickling: the 

586 # new ``SplitResult.__getstate__`` indexes a state that ends up as 

587 # ``None`` (gh-1632). ``__setstate__`` already unpacks both shapes, so 

588 # pickles produced by older yarl releases (which embed a real 

589 # ``SplitResult``) still load correctly. 

590 return (self._val,) 

591 

592 def __setstate__( 

593 self, state: tuple[SplitURLType] | tuple[None, _InternalURLCache] 

594 ) -> None: 

595 if state[0] is None and isinstance(state[1], dict): 

596 # default style pickle 

597 val = state[1]["_val"] 

598 else: 

599 unused: list[object] 

600 val, *unused = state 

601 self._scheme, self._netloc, self._path, self._query, self._fragment = val 

602 self._cache = {} 

603 

604 def _cache_netloc(self) -> None: 

605 """Cache the netloc parts of the URL.""" 

606 c = self._cache 

607 split_loc = split_netloc(self._netloc) 

608 c["raw_user"], c["raw_password"], c["raw_host"], c["explicit_port"] = split_loc 

609 

610 def is_absolute(self) -> bool: 

611 """A check for absolute URLs. 

612 

613 Return True for absolute ones (having scheme or starting 

614 with //), False otherwise. 

615 

616 Is is preferred to call the .absolute property instead 

617 as it is cached. 

618 """ 

619 return self.absolute 

620 

621 def is_default_port(self) -> bool: 

622 """A check for default port. 

623 

624 Return True if port is default for specified scheme, 

625 e.g. 'http://python.org' or 'http://python.org:80', False 

626 otherwise. 

627 

628 Return False for relative URLs. 

629 

630 """ 

631 if (explicit := self.explicit_port) is None: 

632 # If the explicit port is None, then the URL must be 

633 # using the default port unless its a relative URL 

634 # which does not have an implicit port / default port 

635 return self._netloc != "" 

636 return explicit == DEFAULT_PORTS.get(self._scheme) 

637 

638 def origin(self) -> "URL": 

639 """Return an URL with scheme, host and port parts only. 

640 

641 user, password, path, query and fragment are removed. 

642 

643 """ 

644 # TODO: add a keyword-only option for keeping user/pass maybe? 

645 return self._origin 

646 

647 @cached_property 

648 def _val(self) -> SplitURLType: 

649 return (self._scheme, self._netloc, self._path, self._query, self._fragment) 

650 

651 @cached_property 

652 def _origin(self) -> "URL": 

653 """Return an URL with scheme, host and port parts only. 

654 

655 user, password, path, query and fragment are removed. 

656 """ 

657 if not (netloc := self._netloc): 

658 raise ValueError("URL should be absolute") 

659 if not (scheme := self._scheme): 

660 raise ValueError("URL should have scheme") 

661 if "@" in netloc: 

662 encoded_host = self.host_subcomponent 

663 netloc = make_netloc(None, None, encoded_host, self.explicit_port) 

664 elif not self._path and not self._query and not self._fragment: 

665 return self 

666 return from_parts(scheme, netloc, "", "", "") 

667 

668 def relative(self) -> "URL": 

669 """Return a relative part of the URL. 

670 

671 scheme, user, password, host and port are removed. 

672 

673 """ 

674 if not self._netloc: 

675 raise ValueError("URL should be absolute") 

676 return from_parts("", "", self._path, self._query, self._fragment) 

677 

678 @cached_property 

679 def absolute(self) -> bool: 

680 """A check for absolute URLs. 

681 

682 Return True for absolute ones (having scheme or starting 

683 with //), False otherwise. 

684 

685 """ 

686 # `netloc`` is an empty string for relative URLs 

687 # Checking `netloc` is faster than checking `hostname` 

688 # because `hostname` is a property that does some extra work 

689 # to parse the host from the `netloc` 

690 return self._netloc != "" 

691 

692 @cached_property 

693 def scheme(self) -> str: 

694 """Scheme for absolute URLs. 

695 

696 Empty string for relative URLs or URLs starting with // 

697 

698 """ 

699 return self._scheme 

700 

701 @cached_property 

702 def raw_authority(self) -> str: 

703 """Encoded authority part of URL. 

704 

705 Empty string for relative URLs. 

706 

707 """ 

708 return self._netloc 

709 

710 @cached_property 

711 def authority(self) -> str: 

712 """Decoded authority part of URL. 

713 

714 Empty string for relative URLs. 

715 

716 """ 

717 return make_netloc(self.user, self.password, self.host, self.port) 

718 

719 @cached_property 

720 def raw_user(self) -> str | None: 

721 """Encoded user part of URL. 

722 

723 None if user is missing. 

724 

725 """ 

726 # not .username 

727 self._cache_netloc() 

728 return self._cache["raw_user"] 

729 

730 @cached_property 

731 def user(self) -> str | None: 

732 """Decoded user part of URL. 

733 

734 None if user is missing. 

735 

736 """ 

737 if (raw_user := self.raw_user) is None: 

738 return None 

739 return UNQUOTER(raw_user) 

740 

741 @cached_property 

742 def raw_password(self) -> str | None: 

743 """Encoded password part of URL. 

744 

745 None if password is missing. 

746 

747 """ 

748 self._cache_netloc() 

749 return self._cache["raw_password"] 

750 

751 @cached_property 

752 def password(self) -> str | None: 

753 """Decoded password part of URL. 

754 

755 None if password is missing. 

756 

757 """ 

758 if (raw_password := self.raw_password) is None: 

759 return None 

760 return UNQUOTER(raw_password) 

761 

762 @cached_property 

763 def raw_host(self) -> str | None: 

764 """Encoded host part of URL. 

765 

766 None for relative URLs. 

767 

768 When working with IPv6 addresses, use the `host_subcomponent` property instead 

769 as it will return the host subcomponent with brackets. 

770 """ 

771 # Use host instead of hostname for sake of shortness 

772 # May add .hostname prop later 

773 self._cache_netloc() 

774 return self._cache["raw_host"] 

775 

776 @cached_property 

777 def host(self) -> str | None: 

778 """Decoded host part of URL. 

779 

780 None for relative URLs. 

781 

782 """ 

783 if (raw := self.raw_host) is None: 

784 return None 

785 if raw and raw[-1].isdigit() or ":" in raw: 

786 # IP addresses are never IDNA encoded 

787 return raw 

788 return _idna_decode(raw) 

789 

790 @cached_property 

791 def host_subcomponent(self) -> str | None: 

792 """Return the host subcomponent part of URL. 

793 

794 None for relative URLs. 

795 

796 https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.2 

797 

798 `IP-literal = "[" ( IPv6address / IPvFuture ) "]"` 

799 

800 Examples: 

801 - `http://example.com:8080` -> `example.com` 

802 - `http://example.com:80` -> `example.com` 

803 - `https://127.0.0.1:8443` -> `127.0.0.1` 

804 - `https://[::1]:8443` -> `[::1]` 

805 - `http://[::1]` -> `[::1]` 

806 

807 """ 

808 if (raw := self.raw_host) is None: 

809 return None 

810 return f"[{raw}]" if ":" in raw else raw 

811 

812 @cached_property 

813 def host_port_subcomponent(self) -> str | None: 

814 """Return the host and port subcomponent part of URL. 

815 

816 Trailing dots are removed from the host part. 

817 

818 This value is suitable for use in the Host header of an HTTP request. 

819 

820 None for relative URLs. 

821 

822 https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.2 

823 `IP-literal = "[" ( IPv6address / IPvFuture ) "]"` 

824 https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.3 

825 port = *DIGIT 

826 

827 Examples: 

828 - `http://example.com:8080` -> `example.com:8080` 

829 - `http://example.com:80` -> `example.com` 

830 - `http://example.com.:80` -> `example.com` 

831 - `https://127.0.0.1:8443` -> `127.0.0.1:8443` 

832 - `https://[::1]:8443` -> `[::1]:8443` 

833 - `http://[::1]` -> `[::1]` 

834 

835 """ 

836 if (raw := self.raw_host) is None: 

837 return None 

838 if raw[-1] == ".": 

839 # Remove all trailing dots from the netloc as while 

840 # they are valid FQDNs in DNS, TLS validation fails. 

841 # See https://github.com/aio-libs/aiohttp/issues/3636. 

842 # To avoid string manipulation we only call rstrip if 

843 # the last character is a dot. 

844 raw = raw.rstrip(".") 

845 port = self.explicit_port 

846 if port is None or port == DEFAULT_PORTS.get(self._scheme): 

847 return f"[{raw}]" if ":" in raw else raw 

848 return f"[{raw}]:{port}" if ":" in raw else f"{raw}:{port}" 

849 

850 @cached_property 

851 def port(self) -> int | None: 

852 """Port part of URL, with scheme-based fallback. 

853 

854 None for relative URLs or URLs without explicit port and 

855 scheme without default port substitution. 

856 

857 """ 

858 if (explicit_port := self.explicit_port) is not None: 

859 return explicit_port 

860 return DEFAULT_PORTS.get(self._scheme) 

861 

862 @cached_property 

863 def explicit_port(self) -> int | None: 

864 """Port part of URL, without scheme-based fallback. 

865 

866 None for relative URLs or URLs without explicit port. 

867 

868 """ 

869 self._cache_netloc() 

870 return self._cache["explicit_port"] 

871 

872 @cached_property 

873 def raw_path(self) -> str: 

874 """Encoded path of URL. 

875 

876 / for absolute URLs without path part. 

877 

878 """ 

879 return self._path if self._path or not self._netloc else "/" 

880 

881 @cached_property 

882 def path(self) -> str: 

883 """Decoded path of URL. 

884 

885 / for absolute URLs without path part. 

886 

887 """ 

888 return PATH_UNQUOTER(self._path) if self._path else "/" if self._netloc else "" 

889 

890 @cached_property 

891 def path_safe(self) -> str: 

892 """Decoded path of URL. 

893 

894 / for absolute URLs without path part. 

895 

896 / (%2F) and % (%25) are not decoded 

897 

898 """ 

899 if self._path: 

900 return PATH_SAFE_UNQUOTER(self._path) 

901 return "/" if self._netloc else "" 

902 

903 @cached_property 

904 def _parsed_query(self) -> list[tuple[str, str]]: 

905 """Parse query part of URL.""" 

906 return query_to_pairs(self._query) 

907 

908 @cached_property 

909 def query(self) -> "MultiDictProxy[str]": 

910 """A MultiDictProxy representing parsed query parameters in decoded 

911 representation. 

912 

913 Empty value if URL has no query part. 

914 

915 """ 

916 return MultiDictProxy(MultiDict(self._parsed_query)) 

917 

918 @cached_property 

919 def raw_query_string(self) -> str: 

920 """Encoded query part of URL. 

921 

922 Empty string if query is missing. 

923 

924 """ 

925 return self._query 

926 

927 @cached_property 

928 def query_string(self) -> str: 

929 """Decoded query part of URL. 

930 

931 Empty string if query is missing. 

932 

933 """ 

934 return QS_UNQUOTER(self._query) if self._query else "" 

935 

936 @cached_property 

937 def path_qs(self) -> str: 

938 """Decoded path of URL with query.""" 

939 return self.path if not (q := self.query_string) else f"{self.path}?{q}" 

940 

941 @cached_property 

942 def raw_path_qs(self) -> str: 

943 """Encoded path of URL with query.""" 

944 if q := self._query: 

945 return f"{self._path}?{q}" if self._path or not self._netloc else f"/?{q}" 

946 return self._path if self._path or not self._netloc else "/" 

947 

948 @cached_property 

949 def raw_fragment(self) -> str: 

950 """Encoded fragment part of URL. 

951 

952 Empty string if fragment is missing. 

953 

954 """ 

955 return self._fragment 

956 

957 @cached_property 

958 def fragment(self) -> str: 

959 """Decoded fragment part of URL. 

960 

961 Empty string if fragment is missing. 

962 

963 """ 

964 return UNQUOTER(self._fragment) if self._fragment else "" 

965 

966 @cached_property 

967 def raw_parts(self) -> tuple[str, ...]: 

968 """A tuple containing encoded *path* parts. 

969 

970 ('/',) for absolute URLs if *path* is missing. 

971 

972 """ 

973 path = self._path 

974 if self._netloc: 

975 return ("/", *path[1:].split("/")) if path else ("/",) 

976 if path and path[0] == "/": 

977 return ("/", *path[1:].split("/")) 

978 return tuple(path.split("/")) 

979 

980 @cached_property 

981 def parts(self) -> tuple[str, ...]: 

982 """A tuple containing decoded *path* parts. 

983 

984 ('/',) for absolute URLs if *path* is missing. 

985 

986 """ 

987 return tuple(UNQUOTER(part) for part in self.raw_parts) 

988 

989 @cached_property 

990 def parent(self) -> "URL": 

991 """A new URL with last part of path removed and cleaned up query and 

992 fragment. 

993 

994 """ 

995 path = self._path 

996 if not path or path == "/": 

997 if self._fragment or self._query: 

998 return from_parts(self._scheme, self._netloc, path, "", "") 

999 return self 

1000 parts = path.split("/") 

1001 return from_parts(self._scheme, self._netloc, "/".join(parts[:-1]), "", "") 

1002 

@cached_property
def raw_name(self) -> str:
    """The last element of ``raw_parts``.

    For URLs with a netloc the leading "/" element is not counted, so
    a path consisting only of the root yields an empty string.
    """
    segments = self.raw_parts
    if self._netloc:
        if len(segments) > 1:
            return segments[-1]
        return ""
    return segments[-1]

1011 

@cached_property
def name(self) -> str:
    """The decoded form of ``raw_name`` (the last part of the path)."""
    encoded_name = self.raw_name
    return UNQUOTER(encoded_name)

1016 

@cached_property
def raw_suffix(self) -> str:
    """Encoded final extension of ``raw_name``, including the dot.

    Empty string when the name has no dot, starts with its only dot,
    or ends with a dot.
    """
    filename = self.raw_name
    dot_at = filename.rfind(".")
    if dot_at <= 0 or dot_at >= len(filename) - 1:
        return ""
    return filename[dot_at:]

1022 

@cached_property
def suffix(self) -> str:
    """The decoded form of ``raw_suffix``."""
    encoded_suffix = self.raw_suffix
    return UNQUOTER(encoded_suffix)

1026 

@cached_property
def raw_suffixes(self) -> tuple[str, ...]:
    """Encoded extensions of ``raw_name``, each including its dot.

    Empty tuple when the name ends with a dot; leading dots of the
    name are ignored.
    """
    filename = self.raw_name
    if filename[-1:] == ".":
        return ()
    # The first chunk is the stem; every later chunk is an extension.
    _, *extensions = filename.lstrip(".").split(".")
    return tuple(f".{extension}" for extension in extensions)

1034 

@cached_property
def suffixes(self) -> tuple[str, ...]:
    """The decoded form of every element of ``raw_suffixes``."""
    return tuple(map(UNQUOTER, self.raw_suffixes))

1038 

def _make_child(self, paths: "Sequence[str]", encoded: bool = False) -> "URL":
    """
    Append *paths* to self._path, accounting for absolute vs relative
    paths; existing empty segments are kept but no new ones are created.

    Raises ValueError when any of *paths* starts with a slash.
    The returned URL has no query and no fragment.
    """
    # Segments are collected back-to-front and flipped once at the end.
    reversed_segments: list[str] = []
    saw_dot = False
    for position, piece in enumerate(reversed(paths)):
        if piece and piece[0] == "/":
            raise ValueError(
                f"Appending path {piece!r} starting from slash is forbidden"
            )
        # Quote each new piece on its own: the existing path is already
        # quoted and must not be quoted a second time.
        if not encoded:
            piece = PATH_QUOTER(piece)
        if "." in piece:
            saw_dot = True
        piece_segments = piece.split("/")[::-1]
        # position 0 is the final path given; only it may keep a
        # trailing empty segment.
        if position != 0 and piece_segments[0] == "":
            del piece_segments[0]
        reversed_segments.extend(piece_segments)

    existing_path = self._path
    if existing_path:
        existing_segments = existing_path.split("/")
        # A trailing slash on the old path leaves an empty last segment,
        # which is dropped before the new segments are attached.
        if existing_segments[-1] == "":
            existing_segments.pop()
        reversed_segments.extend(reversed(existing_segments))

    # With a netloc, make sure the result starts with a slash even if
    # there was no path before.
    netloc = self._netloc
    if netloc and reversed_segments and reversed_segments[-1] != "":
        reversed_segments.append("")

    segments = reversed_segments[::-1]
    if netloc and saw_dot:
        joined = "/".join(normalize_path_segments(segments))
        # Normalizing may have removed the leading slash; restore it.
        if joined and joined[0] != "/":
            joined = f"/{joined}"
    else:
        joined = "/".join(segments)
    return from_parts(self._scheme, netloc, joined, "", "")

1085 

def with_scheme(self, scheme: str) -> "URL":
    """Return a new URL with scheme replaced (lower-cased).

    Raises TypeError if *scheme* is not a str, and ValueError when the
    URL has no netloc and the new scheme is in SCHEME_REQUIRES_HOST.
    """
    # N.B. doesn't cleanup query/fragment
    if not isinstance(scheme, str):
        raise TypeError("Invalid scheme type")
    normalized = scheme.lower()
    netloc = self._netloc
    if normalized in SCHEME_REQUIRES_HOST and not netloc:
        raise ValueError(
            "scheme replacement is not allowed for "
            f"relative URLs for the {normalized} scheme"
        )
    return from_parts(normalized, netloc, self._path, self._query, self._fragment)

1100 

def with_user(self, user: str | None) -> "URL":
    """Return a new URL with user replaced.

    The user is autoencoded if needed. Passing None clears both user
    and password.

    Raises TypeError for a non-str, non-None user and ValueError when
    the URL has no netloc.
    """
    # N.B. doesn't cleanup query/fragment
    if user is None:
        raw_user = raw_password = None
    elif not isinstance(user, str):
        raise TypeError("Invalid user type")
    else:
        raw_user = QUOTER(user)
        raw_password = self.raw_password
    if not self._netloc:
        raise ValueError("user replacement is not allowed for relative URLs")
    host = self.host_subcomponent or ""
    new_netloc = make_netloc(raw_user, raw_password, host, self.explicit_port)
    return from_parts(
        self._scheme, new_netloc, self._path, self._query, self._fragment
    )

1122 

def with_password(self, password: str | None) -> "URL":
    """Return a new URL with password replaced.

    The password is autoencoded if needed. Passing None clears it.

    Raises TypeError for a non-str, non-None password and ValueError
    when the URL has no netloc.
    """
    # N.B. doesn't cleanup query/fragment
    if password is not None:
        if not isinstance(password, str):
            raise TypeError("Invalid password type")
        password = QUOTER(password)
    if not self._netloc:
        raise ValueError("password replacement is not allowed for relative URLs")
    host = self.host_subcomponent or ""
    explicit_port = self.explicit_port
    new_netloc = make_netloc(self.raw_user, password, host, explicit_port)
    return from_parts(
        self._scheme, new_netloc, self._path, self._query, self._fragment
    )

1144 

def with_host(self, host: str) -> "URL":
    """Return a new URL with host replaced.

    Autoencode host if needed.

    Changing host for relative URLs is not allowed, use .join()
    instead.

    Raises TypeError if *host* is not a str, and ValueError if the URL
    has no netloc, if *host* is empty, or if host validation fails.
    """
    # N.B. doesn't cleanup query/fragment
    if not isinstance(host, str):
        raise TypeError("Invalid host type")
    if not self._netloc:
        raise ValueError("host replacement is not allowed for relative URLs")
    if not host:
        raise ValueError("host removing is not allowed")
    # host is guaranteed non-empty here, so it is always encoded and
    # validated; no empty-host fallback is needed.
    encoded_host = _encode_host(host, validate_host=True)
    port = self.explicit_port
    netloc = make_netloc(self.raw_user, self.raw_password, encoded_host, port)
    return from_parts(self._scheme, netloc, self._path, self._query, self._fragment)

1165 

def with_port(self, port: int | None) -> "URL":
    """Return a new URL with port replaced.

    Passing None clears the explicit port.

    Raises TypeError when *port* is a bool or not an int, and
    ValueError when it is outside 0..65535 or the URL has no netloc.
    """
    # N.B. doesn't cleanup query/fragment
    if port is not None:
        # bool is a subclass of int, so it has to be rejected explicitly.
        if isinstance(port, bool) or not isinstance(port, int):
            raise TypeError(f"port should be int or None, got {type(port)}")
        if port < 0 or port > 65535:
            raise ValueError(f"port must be between 0 and 65535, got {port}")
    if not self._netloc:
        raise ValueError("port replacement is not allowed for relative URLs")
    host = self.host_subcomponent or ""
    new_netloc = make_netloc(self.raw_user, self.raw_password, host, port)
    return from_parts(
        self._scheme, new_netloc, self._path, self._query, self._fragment
    )

1183 

def with_path(
    self,
    path: str,
    *,
    encoded: bool = False,
    keep_query: bool = False,
    keep_fragment: bool = False,
) -> "URL":
    """Return a new URL with path replaced.

    The path is quoted unless *encoded* is true. Query and fragment
    are dropped unless *keep_query* / *keep_fragment* are set.
    """
    netloc = self._netloc
    new_path = path if encoded else PATH_QUOTER(path)
    # Dot segments are only normalized for URLs that have a netloc.
    if netloc and "." in new_path:
        new_path = normalize_path(new_path)
    if new_path and new_path[0] != "/":
        new_path = f"/{new_path}"
    return from_parts(
        self._scheme,
        netloc,
        new_path,
        self._query if keep_query else "",
        self._fragment if keep_fragment else "",
    )

1203 

@overload
def with_query(self, query: Query) -> "URL": ...

@overload
def with_query(self, **kwargs: QueryVariable) -> "URL": ...

def with_query(self, *args: Any, **kwargs: Any) -> "URL":
    """Return a new URL with query part replaced.

    Accepts any Mapping (e.g. dict, multidict.MultiDict instances)
    or str, autoencode the argument if needed.

    A sequence of (key, value) pairs is supported as well.

    It also can take an arbitrary number of keyword arguments.

    Clear query if None is passed.

    """
    # N.B. doesn't cleanup query/fragment
    new_query = get_str_query(*args, **kwargs)
    if not new_query:
        # A falsy result (e.g. from None) means "no query".
        new_query = ""
    return from_parts_uncached(
        self._scheme, self._netloc, self._path, new_query, self._fragment
    )

1228 

@overload
def extend_query(self, query: Query) -> "URL": ...

@overload
def extend_query(self, **kwargs: QueryVariable) -> "URL": ...

def extend_query(self, *args: Any, **kwargs: Any) -> "URL":
    """Return a new URL with query part combined with the existing.

    This method will not remove existing query parameters.
    The URL itself is returned when there is nothing to add.

    Example:
    >>> url = URL('http://example.com/?a=1&b=2')
    >>> url.extend_query(a=3, c=4)
    URL('http://example.com/?a=1&b=2&a=3&c=4')
    """
    addition = get_str_query(*args, **kwargs)
    if not addition:
        return self
    existing = self._query
    # Both strings are already encoded, so plain concatenation is enough.
    if not existing:
        combined = addition
    elif existing[-1] == "&":
        combined = existing + addition
    else:
        combined = f"{existing}&{addition}"
    return from_parts_uncached(
        self._scheme, self._netloc, self._path, combined, self._fragment
    )

1256 

@overload
def update_query(self, query: Query) -> "URL": ...

@overload
def update_query(self, **kwargs: QueryVariable) -> "URL": ...

def update_query(self, *args: Any, **kwargs: Any) -> "URL":
    """Return a new URL with query part updated.

    This method will overwrite existing query parameters.

    Passing None clears the query; any other empty value keeps the
    current query unchanged.

    Raises ValueError unless exactly one positional argument or only
    keyword arguments are given, and TypeError for unsupported types.

    Example:
    >>> url = URL('http://example.com/?a=1&b=2')
    >>> url.update_query(a=3, c=4)
    URL('http://example.com/?a=3&b=2&c=4')
    """
    in_query: (
        str
        | Mapping[str, QueryVariable]
        | Sequence[tuple[str | istr, SimpleQuery]]
        | None
    )
    if kwargs:
        if args:
            msg = "Either kwargs or single query parameter must be present"
            raise ValueError(msg)
        in_query = kwargs
    elif len(args) == 1:
        (in_query,) = args
    else:
        raise ValueError("Either kwargs or single query parameter must be present")

    if in_query is None:
        query = ""
    elif not in_query:
        query = self._query
    else:
        # Pick the serializer first so invalid types fail before any work.
        if isinstance(in_query, Mapping):
            # Mapping values may themselves be sequences of values.
            to_string = get_str_query_from_sequence_iterable
        elif isinstance(in_query, str):
            to_string = get_str_query_from_iterable
        elif isinstance(in_query, (bytes, bytearray, memoryview)):
            msg = "Invalid query type: bytes, bytearray and memoryview are forbidden"
            raise TypeError(msg)
        elif isinstance(in_query, Sequence):
            # A sequence of pairs is not expected to carry sequence
            # values; only mappings use the sequence-aware serializer.
            to_string = get_str_query_from_iterable
        else:
            raise TypeError(
                "Invalid query type: only str, mapping or "
                "sequence of (key, value) pairs is allowed"
            )
        merged: MultiDict[Any] = MultiDict(self._parsed_query)
        if isinstance(in_query, str):
            merged.update(query_to_pairs(in_query))
        else:
            merged.update(in_query)
        query = to_string(merged.items())
    return from_parts_uncached(
        self._scheme, self._netloc, self._path, query, self._fragment
    )

1324 

def without_query_params(self, *query_params: str) -> "URL":
    """Return a new URL with the given keys removed from the query.

    The URL itself is returned when none of the keys are present.
    """
    doomed = set(query_params) & self.query.keys()
    if not doomed:
        return self
    kept = tuple(
        (key, val) for key, val in self.query.items() if key not in doomed
    )
    return self.with_query(kept)

1337 

def with_fragment(self, fragment: str | None) -> "URL":
    """Return a new URL with fragment replaced.

    The fragment is autoencoded if needed; None clears it. The URL
    itself is returned when the encoded fragment is unchanged.

    Raises TypeError for a non-str, non-None fragment.
    """
    # N.B. doesn't cleanup query/fragment
    if fragment is not None and not isinstance(fragment, str):
        raise TypeError("Invalid fragment type")
    encoded = "" if fragment is None else FRAGMENT_QUOTER(fragment)
    if encoded == self._fragment:
        return self
    return from_parts(self._scheme, self._netloc, self._path, self._query, encoded)

1358 

def with_name(
    self,
    name: str,
    *,
    keep_query: bool = False,
    keep_fragment: bool = False,
) -> "URL":
    """Return a new URL with name (last part of path) replaced.

    Query and fragment parts are cleaned up unless *keep_query* /
    *keep_fragment* are set.

    Name is encoded if needed.

    Raises TypeError for a non-str name and ValueError when the name
    contains a slash or is "." or "..".
    """
    # N.B. DOES cleanup query/fragment
    if not isinstance(name, str):
        raise TypeError("Invalid name type")
    if "/" in name:
        raise ValueError("Slash in name is not allowed")
    name = PATH_QUOTER(name)
    if name in (".", ".."):
        raise ValueError(". and .. values are forbidden")

    segments = list(self.raw_parts)
    netloc = self._netloc
    if netloc and len(segments) == 1:
        # Only the root marker is present: the name becomes a new segment.
        segments.append(name)
    else:
        segments[-1] = name
    if netloc or segments[0] == "/":
        # The root marker becomes an empty segment so that joining
        # with "/" yields a single leading slash.
        segments[0] = ""

    return from_parts(
        self._scheme,
        netloc,
        "/".join(segments),
        self._query if keep_query else "",
        self._fragment if keep_fragment else "",
    )

1396 

def with_suffix(
    self,
    suffix: str,
    *,
    keep_query: bool = False,
    keep_fragment: bool = False,
) -> "URL":
    """Return a new URL with suffix (file extension of name) replaced.

    Query and fragment parts are cleaned up unless *keep_query* /
    *keep_fragment* are set.

    suffix is encoded if needed.

    Raises TypeError for a non-str suffix and ValueError for an
    invalid suffix, an empty name, or a resulting name of "." / "..".
    """
    if not isinstance(suffix, str):
        raise TypeError("Invalid suffix type")
    if (suffix and suffix[0] != ".") or suffix == "." or "/" in suffix:
        raise ValueError(f"Invalid suffix {suffix!r}")
    current_name = self.raw_name
    if not current_name:
        raise ValueError(f"{self!r} has an empty name")
    old_suffix = self.raw_suffix
    suffix = PATH_QUOTER(suffix)
    stem = current_name[: -len(old_suffix)] if old_suffix else current_name
    new_name = stem + suffix
    if new_name in (".", ".."):
        raise ValueError(". and .. values are forbidden")

    segments = list(self.raw_parts)
    netloc = self._netloc
    if netloc and len(segments) == 1:
        segments.append(new_name)
    else:
        segments[-1] = new_name
    if netloc or segments[0] == "/":
        # The root marker becomes an empty segment so that joining
        # with "/" yields a single leading slash.
        segments[0] = ""

    return from_parts(
        self._scheme,
        netloc,
        "/".join(segments),
        self._query if keep_query else "",
        self._fragment if keep_fragment else "",
    )

1437 

def join(self, url: "URL") -> "URL":
    """Join URLs

    Construct a full ("absolute") URL by combining a "base URL"
    (self) with another URL (url).

    Informally, this uses components of the base URL, in
    particular the addressing scheme, the network location and
    (part of) the path, to provide missing components in the
    relative URL.

    *url* is returned unchanged when its scheme differs from the base
    scheme or the scheme is not in USES_RELATIVE.

    Raises TypeError if *url* is not exactly a URL instance.
    """
    if type(url) is not URL:
        raise TypeError("url should be URL")

    scheme = url._scheme or self._scheme
    if scheme != self._scheme or scheme not in USES_RELATIVE:
        return url

    # scheme is in uses_authority as uses_authority is a superset of uses_relative
    if (join_netloc := url._netloc) and scheme in USES_AUTHORITY:
        return from_parts(scheme, join_netloc, url._path, url._query, url._fragment)

    orig_path = self._path
    if join_path := url._path:
        if join_path[0] == "/":
            path = join_path
        elif not orig_path:
            path = f"/{join_path}"
        elif orig_path[-1] == "/":
            path = f"{orig_path}{join_path}"
        else:
            # Replace the last segment of the base path with join_path.
            # The *encoded* parts must be used here: the result is stored
            # as the raw path, so decoded parts would silently drop the
            # percent-encoding of the base path.
            # raw_parts[0] is "/" for rooted paths, so this join
            # produces a double slash at the front ...
            path = "/".join([*self.raw_parts[:-1], ""]) + join_path
            # ... which has to be removed
            if orig_path[0] == "/":
                path = path[1:]
        path = normalize_path(path) if "." in path else path
    else:
        path = orig_path

    # Query and fragment of the base are only inherited when the joined
    # URL supplies neither a path nor its own value.
    return from_parts(
        scheme,
        self._netloc,
        path,
        url._query if join_path or url._query else self._query,
        url._fragment if join_path or url._fragment else self._fragment,
    )

1489 

def joinpath(self, *other: str, encoded: bool = False) -> "URL":
    """Return a new URL with the elements in other appended to the path.

    Delegates to ``_make_child``; *encoded* is passed through unchanged.
    """
    child = self._make_child(other, encoded=encoded)
    return child

1493 

def human_repr(self) -> str:
    """Return decoded human readable string for URL representation."""
    userinfo_unsafe = "#/:?@[]\\"
    query_unsafe = "#&+;="

    user = human_quote(self.user, userinfo_unsafe)
    password = human_quote(self.password, userinfo_unsafe)

    host = self.host
    if host and ":" in host:
        # A colon in the host gets the bracketed form.
        host = f"[{host}]"

    path = human_quote(self.path, "#?")
    if TYPE_CHECKING:
        assert path is not None
    if not (self._scheme or self._netloc):
        path = _encode_relative_scheme_colon(path)

    pairs = (
        f"{human_quote(key, query_unsafe)}={human_quote(value, query_unsafe)}"
        for key, value in self.query.items()
    )
    query_string = "&".join(pairs)

    fragment = human_quote(self.fragment, "")
    if TYPE_CHECKING:
        assert fragment is not None

    netloc = make_netloc(user, password, host, self.explicit_port)
    return unsplit_result(self._scheme, netloc, path, query_string, fragment)

1514 

if HAS_PYDANTIC:
    # Borrowed from https://docs.pydantic.dev/latest/concepts/types/#handling-third-party-types
    @classmethod
    def __get_pydantic_json_schema__(
        cls,
        core_schema: "CoreSchema",
        handler: "GetJsonSchemaHandler",
    ) -> "JsonSchemaValue":
        """Describe the URL in JSON schema as a string in ``uri`` format."""
        json_schema: dict[str, Any] = {"type": "string", "format": "uri"}
        return json_schema

    @classmethod
    def __get_pydantic_core_schema__(
        cls,
        source_type: type[Self] | type[str],
        handler: "GetCoreSchemaHandler",
    ) -> "CoreSchema":
        """Build the core schema: accept URL instances or strings, dump as str."""
        # Lazy import: pulling in pydantic_core at module load time
        # increases yarl's import cost 3-7x for users who don't use
        # pydantic. Keep this import function-scoped.
        from pydantic_core import core_schema as schemas  # noqa: PLC0415

        # Validate as a string first, then construct a URL from it.
        str_to_url = schemas.chain_schema(
            [
                schemas.str_schema(),
                schemas.no_info_plain_validator_function(URL),
            ]
        )
        # Python input: check for an existing instance first before
        # doing any further work.
        python_side = schemas.union_schema(
            [
                schemas.is_instance_schema(URL),
                str_to_url,
            ]
        )
        return schemas.json_or_python_schema(
            json_schema=str_to_url,
            python_schema=python_side,
            serialization=schemas.plain_serializer_function_ser_schema(str),
        )

1556 

1557 

# Default LRU size for the _idna_decode / _idna_encode caches.
_DEFAULT_IDNA_SIZE: int = 256
# Default LRU size for the _encode_host cache.
_DEFAULT_ENCODE_SIZE: int = 512

1560 

1561 

@lru_cache(_DEFAULT_IDNA_SIZE)
def _idna_decode(raw: str) -> str:
    """Decode an ASCII host with ``idna``, falling back to the stdlib codec."""
    try:
        decoded = idna.decode(raw.encode("ascii"))
    except UnicodeError:  # e.g. '::1'
        decoded = raw.encode("ascii").decode("idna")
    return decoded

1568 

1569 

@lru_cache(_DEFAULT_IDNA_SIZE)
def _idna_encode(host: str) -> str:
    """Encode a host with ``idna`` (UTS46), falling back to the stdlib codec."""
    try:
        encoded = idna.encode(host, uts46=True).decode("ascii")
    except UnicodeError:
        encoded = host.encode("idna").decode("ascii")
    return encoded

1576 

1577 

@lru_cache(_DEFAULT_ENCODE_SIZE)
def _encode_host(host: str, validate_host: bool) -> str:
    """Encode host part of URL.

    IP literals are returned in compressed form (IPv6 in brackets, with
    any zone preserved); ASCII names are lower-cased and, when
    *validate_host* is true, checked against NOT_REG_NAME; anything else
    goes through ``_idna_encode``.

    Raises ValueError when validation finds a forbidden character.
    """
    # A host that ends with a digit or contains a colon is likely an IP
    # address:
    # https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.2
    # - 127.0.0.1 (last character is a digit)
    # - 2001:db8::ff00:42:8329 (contains a colon)
    # - 2001:db8::ff00:42:8329%eth0 (contains a colon)
    # - [2001:db8::ff00:42:8329] (contains a colon -- brackets should
    #   have been removed before it gets here)
    # Rare IP address formats are not supported per:
    # https://datatracker.ietf.org/doc/html/rfc3986#section-7.4
    #
    # IP parsing is orders of magnitude slower than almost anything else
    # this library does, so it is only attempted for such candidates.
    if host and (host[-1].isdigit() or ":" in host):
        candidate, percent, zone = host.partition("%")
        try:
            address = ip_address(candidate)
        except ValueError:
            # Not an IP address after all; fall through to name handling.
            address = None
        if address is not None:
            literal = address.compressed
            if percent:
                literal = f"{literal}%{zone}"
            return f"[{literal}]" if address.version == 6 else literal

    # IDNA encoding is slow, so it is only used for non-ASCII hosts.
    if not host.isascii():
        return _idna_encode(host)

    # Check for invalid characters explicitly; _idna_encode() does this
    # for non-ascii host names.
    host = host.lower()
    if validate_host:
        invalid = NOT_REG_NAME.search(host)
        if invalid:
            value = invalid.group()
            pos = invalid.start()
            extra = ""
            if value == "@" or (value == ":" and "@" in host[pos:]):
                # this looks like an authority string
                extra = (
                    ", if the value includes a username or password, "
                    "use 'authority' instead of 'host'"
                )
            raise ValueError(
                f"Host {host!r} cannot contain {value!r} (at position {pos}){extra}"
            ) from None
    return host

1634 

1635 

@rewrite_module
def cache_clear() -> None:
    """Clear all LRU caches."""
    for cached in (_idna_encode, _idna_decode, _encode_host):
        cached.cache_clear()

1642 

1643 

@rewrite_module
def cache_info() -> CacheInfo:
    """Report cache statistics.

    The "ip_address" and "host_validate" entries report the same
    statistics as "encode_host": all three come from ``_encode_host``.
    """
    encode_host_stats = _encode_host.cache_info()
    return {
        "idna_encode": _idna_encode.cache_info(),
        "idna_decode": _idna_decode.cache_info(),
        "ip_address": encode_host_stats,
        "host_validate": encode_host_stats,
        "encode_host": encode_host_stats,
    }

1654 

1655 

@rewrite_module
def cache_configure(
    *,
    idna_encode_size: int | None = _DEFAULT_IDNA_SIZE,
    idna_decode_size: int | None = _DEFAULT_IDNA_SIZE,
    ip_address_size: int | None | UndefinedType = UNDEFINED,
    host_validate_size: int | None | UndefinedType = UNDEFINED,
    encode_host_size: int | None | UndefinedType = UNDEFINED,
) -> None:
    """Configure LRU cache sizes.

    ``ip_address_size`` and ``host_validate_size`` are deprecated; when
    given they only influence the size of the ``encode_host`` cache:

    * if any of the three sizes is None, the cache is unbounded;
    * otherwise the largest explicitly given size is used;
    * if none is given, ``_DEFAULT_ENCODE_SIZE`` is used.

    All three caches are rebuilt, so their current contents are dropped.
    """
    global _idna_decode, _idna_encode, _encode_host
    # ip_address_size, host_validate_size are no longer
    # used, but are kept for backwards compatibility.
    if ip_address_size is not UNDEFINED or host_validate_size is not UNDEFINED:
        warnings.warn(
            "cache_configure() no longer accepts the "
            "ip_address_size or host_validate_size arguments, "
            "they are used to set the encode_host_size instead "
            "and will be removed in the future",
            DeprecationWarning,
            stacklevel=2,
        )

    if encode_host_size is not None:
        legacy_sizes = (ip_address_size, host_validate_size)
        if any(size is None for size in legacy_sizes):
            # An unbounded legacy cache wins outright. Deciding this up
            # front avoids comparing None with an int, which used to
            # raise TypeError when one legacy size was None and the
            # other an int.
            encode_host_size = None
        else:
            explicit_sizes = [
                cast(int, size)
                for size in (encode_host_size, *legacy_sizes)
                if size is not UNDEFINED
            ]
            encode_host_size = (
                max(explicit_sizes) if explicit_sizes else _DEFAULT_ENCODE_SIZE
            )

    # __wrapped__ is the undecorated function, so each cache is rebuilt
    # from scratch with the new size.
    _encode_host = lru_cache(encode_host_size)(_encode_host.__wrapped__)
    _idna_decode = lru_cache(idna_decode_size)(_idna_decode.__wrapped__)
    _idna_encode = lru_cache(idna_encode_size)(_idna_encode.__wrapped__)

1696 _idna_encode = lru_cache(idna_encode_size)(_idna_encode.__wrapped__)