Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/yarl/_url.py: 45%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import re
2import sys
3import warnings
4from collections.abc import Mapping, Sequence
5from enum import Enum
6from functools import _CacheInfo, lru_cache
7from importlib.util import find_spec
8from ipaddress import ip_address
9from typing import (
10 TYPE_CHECKING,
11 Any,
12 NoReturn,
13 TypedDict,
14 TypeVar,
15 Union,
16 cast,
17 overload,
18)
19from urllib.parse import SplitResult, scheme_chars, uses_relative
21import idna
22from multidict import MultiDict, MultiDictProxy, istr
23from propcache.api import under_cached_property as cached_property
25from ._parse import (
26 USES_AUTHORITY,
27 SplitURLType,
28 make_netloc,
29 query_to_pairs,
30 split_netloc,
31 split_url,
32 unsplit_result,
33)
34from ._path import normalize_path, normalize_path_segments
35from ._query import (
36 Query,
37 QueryVariable,
38 SimpleQuery,
39 get_str_query,
40 get_str_query_from_iterable,
41 get_str_query_from_sequence_iterable,
42)
43from ._quoters import (
44 FRAGMENT_QUOTER,
45 FRAGMENT_REQUOTER,
46 PATH_QUOTER,
47 PATH_REQUOTER,
48 PATH_SAFE_UNQUOTER,
49 PATH_UNQUOTER,
50 QS_UNQUOTER,
51 QUERY_QUOTER,
52 QUERY_REQUOTER,
53 QUOTER,
54 REQUOTER,
55 UNQUOTER,
56 human_quote,
57)
# Avoid Pydantic import if not used (increases yarl's import time by 3-7x).
# Only the availability of ``pydantic_core`` is probed; the pydantic names
# below are imported for type checkers only.
HAS_PYDANTIC = find_spec("pydantic_core") is not None
if TYPE_CHECKING:
    from pydantic import GetCoreSchemaHandler, GetJsonSchemaHandler
    from pydantic.json_schema import JsonSchemaValue
    from pydantic_core import CoreSchema


# Port implied by a scheme when the URL carries no explicit port.
DEFAULT_PORTS = {"http": 80, "https": 443, "ws": 80, "wss": 443, "ftp": 21}
# Frozen copies of the urllib.parse tables, for fast membership tests.
USES_RELATIVE = frozenset(uses_relative)
_SCHEME_CHARS = frozenset(scheme_chars)

# Special schemes https://url.spec.whatwg.org/#special-scheme
# are not allowed to have an empty host https://url.spec.whatwg.org/#url-representation
SCHEME_REQUIRES_HOST = frozenset(("http", "https", "ws", "wss", "ftp"))


# reg-name: unreserved / pct-encoded / sub-delims
# This pattern matches anything that is *not* in those classes, and is only
# used on lower-cased ASCII values.
NOT_REG_NAME = re.compile(
    r"""
        # any character not in the unreserved or sub-delims sets, plus %
        # (validated with the additional check for pct-encoded sequences below)
        [^a-z0-9\-._~!$&'()*+,;=%]
    |
        # % only allowed if it is part of a pct-encoded
        # sequence of 2 hex digits.
        %(?![0-9a-f]{2})
    """,
    re.VERBOSE,
)

# Generic type variable used by helpers that return their argument.
_T = TypeVar("_T")

# ``typing.Self`` exists from Python 3.11; older interpreters fall back to Any.
if sys.version_info >= (3, 11):
    from typing import Self
else:
    Self = Any
class UndefinedType(Enum):
    """Enum with exactly one member, used as a "no value given" marker.

    Being an enum member lets the sentinel appear in type annotations
    while still being compared by identity.
    """

    _singleton = 0


# The sentinel instance itself; compare against it with ``is``.
UNDEFINED = UndefinedType._singleton
class CacheInfo(TypedDict):
    """Host encoding cache."""

    # Each entry is a ``functools._CacheInfo`` tuple (hits, misses, maxsize,
    # currsize). Presumably one per lru_cache'd host helper of the same
    # name -- TODO confirm against the code that fills this dict.
    idna_encode: _CacheInfo
    idna_decode: _CacheInfo
    ip_address: _CacheInfo
    host_validate: _CacheInfo
    encode_host: _CacheInfo
class _InternalURLCache(TypedDict, total=False):
    # Shape of the per-instance ``URL._cache`` dict. ``total=False`` because
    # entries are filled lazily: ``encode_url`` pre-populates a few keys,
    # ``URL._cache_netloc`` writes the four netloc keys, ``URL.__hash__``
    # writes "hash", and the remaining names match cached properties on URL.
    _val: SplitURLType
    _origin: "URL"
    absolute: bool
    hash: int
    scheme: str
    raw_authority: str
    authority: str
    raw_user: str | None
    user: str | None
    raw_password: str | None
    password: str | None
    raw_host: str | None
    host: str | None
    host_subcomponent: str | None
    host_port_subcomponent: str | None
    port: int | None
    explicit_port: int | None
    raw_path: str
    path: str
    _parsed_query: list[tuple[str, str]]
    query: "MultiDictProxy[str]"
    raw_query_string: str
    query_string: str
    path_qs: str
    raw_path_qs: str
    raw_fragment: str
    fragment: str
    raw_parts: tuple[str, ...]
    parts: tuple[str, ...]
    parent: "URL"
    raw_name: str
    name: str
    raw_suffix: str
    suffix: str
    raw_suffixes: tuple[str, ...]
    suffixes: tuple[str, ...]
def rewrite_module(obj: _T) -> _T:
    """Set ``obj.__module__`` to ``"yarl"`` and return the same object.

    Usable as a decorator: the object is modified in place, not wrapped.
    """
    setattr(obj, "__module__", "yarl")
    return obj
def _encode_relative_scheme_colon(path: str) -> str:
    """Escape the first ``:`` of *path* as ``%3A`` when its prefix looks like a scheme.

    *path* is returned unchanged when it contains no colon, when the colon
    is its first character, or when any character before the colon is not
    in ``_SCHEME_CHARS``.
    """
    prefix, colon, remainder = path.partition(":")
    if not colon or not prefix:
        # No colon at all, or nothing in front of it.
        return path
    if not _SCHEME_CHARS.issuperset(prefix):
        return path
    return f"{prefix}%3A{remainder}"
@lru_cache
def encode_url(url_str: str) -> "URL":
    """Parse a URL string whose components may still need quoting.

    The string is split with ``split_url``; the host is passed through
    ``_encode_host`` and user, password, path, query and fragment through
    their re-quoters. The netloc-derived values computed along the way are
    stored in the new URL's cache. Results are memoized by ``lru_cache``.

    Raises:
        ValueError: the netloc has no host and the scheme is listed in
            ``SCHEME_REQUIRES_HOST``.
    """
    scheme, netloc, path, query, fragment = split_url(url_str)
    cache: _InternalURLCache = {}
    host: str | None
    if netloc:
        if any(marker in netloc for marker in ":@["):
            # Complex netloc
            username, password, host, port = split_netloc(netloc)
        else:
            host = netloc
            username = password = port = None
        if host is None:
            if scheme in SCHEME_REQUIRES_HOST:
                msg = (
                    "Invalid URL: host is required for "
                    f"absolute urls with the {scheme} scheme"
                )
                raise ValueError(msg)
            host = ""
        host = _encode_host(host, validate_host=False)
        # Remove brackets as host encoder adds back brackets for IPv6 addresses
        cache["raw_host"] = host[1:-1] if "[" in host else host
        cache["explicit_port"] = port
        if username is None and password is None:
            # Fast path for URLs without user, password
            raw_user = raw_password = None
            netloc = host if port is None else f"{host}:{port}"
        else:
            raw_user = REQUOTER(username) if username else username
            raw_password = REQUOTER(password) if password else password
            netloc = make_netloc(raw_user, raw_password, host, port)
        cache["raw_user"] = raw_user
        cache["raw_password"] = raw_password
    else:
        host = ""

    if path:
        path = PATH_REQUOTER(path)
        if netloc and "." in path:
            path = normalize_path(path)
        elif not scheme and not netloc:
            path = _encode_relative_scheme_colon(path)
    if query:
        query = QUERY_REQUOTER(query)
    if fragment:
        fragment = FRAGMENT_REQUOTER(fragment)

    cache["scheme"] = scheme
    # An empty path next to a netloc is reported as "/" by raw_path.
    cache["raw_path"] = path if path or not netloc else "/"
    cache["raw_query_string"] = query
    cache["raw_fragment"] = fragment

    url = object.__new__(URL)
    url._scheme, url._netloc, url._path = scheme, netloc, path
    url._query, url._fragment = query, fragment
    url._cache = cache
    return url
@lru_cache
def pre_encoded_url(url_str: str) -> "URL":
    """Parse a URL string that is already encoded.

    The five components returned by ``split_url`` are stored as they are;
    no quoting or normalization is applied. Results are memoized.
    """
    scheme, netloc, path, query, fragment = split_url(url_str)
    url = object.__new__(URL)
    url._scheme = scheme
    url._netloc = netloc
    url._path = path
    url._query = query
    url._fragment = fragment
    url._cache = {}
    return url
@lru_cache
def build_pre_encoded_url(
    scheme: str,
    authority: str,
    user: str | None,
    password: str | None,
    host: str,
    port: int | None,
    path: str,
    query_string: str,
    fragment: str,
) -> "URL":
    """Assemble a URL from parts that are already encoded.

    ``authority`` is used as the netloc when given. Otherwise the netloc
    is composed from ``host`` plus the optional ``user``, ``password`` and
    ``port``, omitting a port equal to the scheme's entry in
    ``DEFAULT_PORTS``. With neither, the netloc is empty. Results are
    memoized.
    """
    if authority:
        netloc = authority
    elif host:
        if port is not None and port == DEFAULT_PORTS.get(scheme):
            # Default port for the scheme: leave it out of the netloc.
            port = None
        if user is None and password is None:
            netloc = host if port is None else f"{host}:{port}"
        else:
            netloc = make_netloc(user, password, host, port)
    else:
        netloc = ""

    url = object.__new__(URL)
    url._scheme = scheme
    url._netloc = netloc
    url._path = path
    url._query = query_string
    url._fragment = fragment
    url._cache = {}
    return url
def from_parts_uncached(
    scheme: str, netloc: str, path: str, query: str, fragment: str
) -> "URL":
    """Wrap five already-prepared components in a fresh URL.

    The values are stored unchanged and the URL starts with an empty cache.
    """
    url = object.__new__(URL)
    url._scheme, url._netloc, url._path = scheme, netloc, path
    url._query, url._fragment = query, fragment
    url._cache = {}
    return url


# Memoized variant of from_parts_uncached.
from_parts = lru_cache(from_parts_uncached)
300@rewrite_module
301class URL:
302 # Don't derive from str
303 # follow pathlib.Path design
304 # probably URL will not suffer from pathlib problems:
305 # it's intended for libraries like aiohttp,
306 # not to be passed into standard library functions like os.open etc.
308 # URL grammar (RFC 3986)
309 # pct-encoded = "%" HEXDIG HEXDIG
310 # reserved = gen-delims / sub-delims
311 # gen-delims = ":" / "/" / "?" / "#" / "[" / "]" / "@"
312 # sub-delims = "!" / "$" / "&" / "'" / "(" / ")"
313 # / "*" / "+" / "," / ";" / "="
314 # unreserved = ALPHA / DIGIT / "-" / "." / "_" / "~"
315 # URI = scheme ":" hier-part [ "?" query ] [ "#" fragment ]
316 # hier-part = "//" authority path-abempty
317 # / path-absolute
318 # / path-rootless
319 # / path-empty
320 # scheme = ALPHA *( ALPHA / DIGIT / "+" / "-" / "." )
321 # authority = [ userinfo "@" ] host [ ":" port ]
322 # userinfo = *( unreserved / pct-encoded / sub-delims / ":" )
323 # host = IP-literal / IPv4address / reg-name
324 # IP-literal = "[" ( IPv6address / IPvFuture ) "]"
325 # IPvFuture = "v" 1*HEXDIG "." 1*( unreserved / sub-delims / ":" )
326 # IPv6address = 6( h16 ":" ) ls32
327 # / "::" 5( h16 ":" ) ls32
328 # / [ h16 ] "::" 4( h16 ":" ) ls32
329 # / [ *1( h16 ":" ) h16 ] "::" 3( h16 ":" ) ls32
330 # / [ *2( h16 ":" ) h16 ] "::" 2( h16 ":" ) ls32
331 # / [ *3( h16 ":" ) h16 ] "::" h16 ":" ls32
332 # / [ *4( h16 ":" ) h16 ] "::" ls32
333 # / [ *5( h16 ":" ) h16 ] "::" h16
334 # / [ *6( h16 ":" ) h16 ] "::"
335 # ls32 = ( h16 ":" h16 ) / IPv4address
336 # ; least-significant 32 bits of address
337 # h16 = 1*4HEXDIG
338 # ; 16 bits of address represented in hexadecimal
339 # IPv4address = dec-octet "." dec-octet "." dec-octet "." dec-octet
340 # dec-octet = DIGIT ; 0-9
341 # / %x31-39 DIGIT ; 10-99
342 # / "1" 2DIGIT ; 100-199
343 # / "2" %x30-34 DIGIT ; 200-249
344 # / "25" %x30-35 ; 250-255
345 # reg-name = *( unreserved / pct-encoded / sub-delims )
346 # port = *DIGIT
347 # path = path-abempty ; begins with "/" or is empty
348 # / path-absolute ; begins with "/" but not "//"
349 # / path-noscheme ; begins with a non-colon segment
350 # / path-rootless ; begins with a segment
351 # / path-empty ; zero characters
352 # path-abempty = *( "/" segment )
353 # path-absolute = "/" [ segment-nz *( "/" segment ) ]
354 # path-noscheme = segment-nz-nc *( "/" segment )
355 # path-rootless = segment-nz *( "/" segment )
356 # path-empty = 0<pchar>
357 # segment = *pchar
358 # segment-nz = 1*pchar
359 # segment-nz-nc = 1*( unreserved / pct-encoded / sub-delims / "@" )
360 # ; non-zero-length segment without any colon ":"
361 # pchar = unreserved / pct-encoded / sub-delims / ":" / "@"
362 # query = *( pchar / "/" / "?" )
363 # fragment = *( pchar / "/" / "?" )
364 # URI-reference = URI / relative-ref
365 # relative-ref = relative-part [ "?" query ] [ "#" fragment ]
366 # relative-part = "//" authority path-abempty
367 # / path-absolute
368 # / path-noscheme
369 # / path-empty
370 # absolute-URI = scheme ":" hier-part [ "?" query ]
371 __slots__ = ("_cache", "_scheme", "_netloc", "_path", "_query", "_fragment")
373 _cache: _InternalURLCache
374 _scheme: str
375 _netloc: str
376 _path: str
377 _query: str
378 _fragment: str
def __new__(
    cls,
    val: Union[str, SplitResult, "URL", UndefinedType] = UNDEFINED,
    *,
    encoded: bool = False,
    strict: bool | None = None,
) -> "URL":
    """Create a URL from a string, a ``SplitResult`` or another URL.

    Raises:
        ValueError: ``val`` is a ``SplitResult`` and ``encoded`` is false.
        TypeError: ``val`` is none of the supported types.
    """
    if strict is not None:  # pragma: no cover
        warnings.warn("strict parameter is ignored")
    val_type = type(val)
    if val_type is str:
        return pre_encoded_url(val) if encoded else encode_url(val)
    if val_type is cls:
        return val
    if val_type is SplitResult:
        if encoded:
            return from_parts(*val)
        raise ValueError("Cannot apply decoding to SplitResult")
    if isinstance(val, str):
        # str subclass: convert to a plain str before parsing.
        plain = str(val)
        return pre_encoded_url(plain) if encoded else encode_url(plain)
    if val is UNDEFINED:
        # Special case for UNDEFINED since it might be unpickling and we do
        # not want to cache as the `__setstate__` call would mutate the URL
        # object in the `pre_encoded_url` or `encode_url` caches.
        blank = object.__new__(URL)
        blank._scheme = blank._netloc = blank._path = ""
        blank._query = blank._fragment = ""
        blank._cache = {}
        return blank
    raise TypeError("Constructor parameter should be str")
@classmethod
def build(
    cls,
    *,
    scheme: str = "",
    authority: str = "",
    user: str | None = None,
    password: str | None = None,
    host: str = "",
    port: int | None = None,
    path: str = "",
    query: Query | None = None,
    query_string: str = "",
    fragment: str = "",
    encoded: bool = False,
) -> "URL":
    """Create a URL from individual components.

    With ``encoded=True`` the parts are handed to
    ``build_pre_encoded_url`` unchanged; otherwise host, path, query
    string and fragment are quoted here.

    Raises:
        ValueError: ``authority`` is combined with user/password/host/port,
            ``port`` is given without ``host``, both ``query`` and
            ``query_string`` are given, or a path that does not start
            with "/" is combined with an authority.
        TypeError: ``port`` is not an int, or a string argument is None.
    """
    if authority and (user or password or host or port):
        raise ValueError(
            'Can\'t mix "authority" with "user", "password", "host" or "port".'
        )
    if port is not None and not isinstance(port, int):
        raise TypeError(f"The port is required to be int, got {type(port)!r}.")
    if port and not host:
        raise ValueError('Can\'t build URL with "port" but without "host".')
    if query and query_string:
        raise ValueError('Only one of "query" or "query_string" should be passed')
    required = (scheme, authority, host, path, query_string, fragment)
    if any(part is None for part in required):
        raise TypeError(
            'NoneType is illegal for "scheme", "authority", "host", "path", '
            '"query_string", and "fragment" args, use empty string instead.'
        )

    if query:
        query_string = get_str_query(query) or ""

    if encoded:
        return build_pre_encoded_url(
            scheme,
            authority,
            user,
            password,
            host,
            port,
            path,
            query_string,
            fragment,
        )

    url = object.__new__(URL)
    url._scheme = scheme
    encoded_host: str | None = None
    if authority:
        user, password, split_host, port = split_netloc(authority)
        if split_host:
            encoded_host = _encode_host(split_host, validate_host=False)
        else:
            encoded_host = ""
    elif host:
        encoded_host = _encode_host(host, validate_host=True)
    else:
        url._netloc = ""

    if encoded_host is not None:
        if port is not None and port == DEFAULT_PORTS.get(scheme):
            # Default port for the scheme: leave it out of the netloc.
            port = None
        if user is None and password is None:
            url._netloc = encoded_host if port is None else f"{encoded_host}:{port}"
        else:
            url._netloc = make_netloc(user, password, encoded_host, port, True)

    if path:
        path = PATH_QUOTER(path)
    if path and url._netloc:
        if "." in path:
            path = normalize_path(path)
        if path[0] != "/":
            msg = (
                "Path in a URL with authority should "
                "start with a slash ('/') if set"
            )
            raise ValueError(msg)

    url._path = path
    if not query and query_string:
        # A query_string supplied directly by the caller still needs quoting.
        query_string = QUERY_QUOTER(query_string)
    url._query = query_string
    url._fragment = FRAGMENT_QUOTER(fragment) if fragment else fragment
    url._cache = {}
    return url
def __init_subclass__(cls) -> NoReturn:
    """Reject every attempt to subclass URL by raising TypeError."""
    message = f"Inheriting a class {cls!r} from URL is forbidden"
    raise TypeError(message)
def __str__(self) -> str:
    """Render the URL as a string.

    An empty path becomes "/" when a netloc is followed by a query or
    fragment, and an explicit port equal to the scheme default is left out.
    """
    path = self._path
    if not path and self._netloc and (self._query or self._fragment):
        path = "/"
    port = self.explicit_port
    if port is None or port != DEFAULT_PORTS.get(self._scheme):
        netloc = self._netloc
    else:
        # port normalization - using None for default ports to remove from rendering
        # https://datatracker.ietf.org/doc/html/rfc3986.html#section-6.2.3
        host = self.host_subcomponent
        netloc = make_netloc(self.raw_user, self.raw_password, host, None)
    return unsplit_result(self._scheme, netloc, path, self._query, self._fragment)
def __repr__(self) -> str:
    """Return ``ClassName('<url string>')``."""
    cls_name = self.__class__.__name__
    return f"{cls_name}('{self!s}')"
def __bytes__(self) -> bytes:
    """Return the string form of the URL encoded as ASCII bytes."""
    text = str(self)
    return text.encode("ascii")
def __eq__(self, other: object) -> bool:
    """Compare two URLs component by component.

    An empty path next to a netloc counts as "/" on both sides. Anything
    that is not exactly a URL yields ``NotImplemented``.
    """
    if type(other) is not URL:
        return NotImplemented

    own_path = self._path if self._path or not self._netloc else "/"
    other_path = other._path if other._path or not other._netloc else "/"
    mine = (self._scheme, self._netloc, own_path, self._query, self._fragment)
    theirs = (
        other._scheme,
        other._netloc,
        other_path,
        other._query,
        other._fragment,
    )
    return mine == theirs
def __hash__(self) -> int:
    """Hash the five URL components, storing the result in the cache.

    As in ``__eq__``, an empty path next to a netloc is hashed as "/".
    """
    cached = self._cache.get("hash")
    if cached is not None:
        return cached
    path = self._path if self._path or not self._netloc else "/"
    key = (self._scheme, self._netloc, path, self._query, self._fragment)
    result = self._cache["hash"] = hash(key)
    return result
def __le__(self, other: object) -> bool:
    """``self <= other`` by component tuple; only defined between URLs."""
    if type(other) is URL:
        return self._val <= other._val
    return NotImplemented
def __lt__(self, other: object) -> bool:
    """``self < other`` by component tuple; only defined between URLs."""
    if type(other) is URL:
        return self._val < other._val
    return NotImplemented
def __ge__(self, other: object) -> bool:
    """``self >= other`` by component tuple; only defined between URLs."""
    if type(other) is URL:
        return self._val >= other._val
    return NotImplemented
def __gt__(self, other: object) -> bool:
    """``self > other`` by component tuple; only defined between URLs."""
    if type(other) is URL:
        return self._val > other._val
    return NotImplemented
def __truediv__(self, name: str) -> "URL":
    """``url / name``: append *name* to the path via ``_make_child``.

    Non-string operands yield ``NotImplemented``.
    """
    if isinstance(name, str):
        return self._make_child((str(name),))
    return NotImplemented
def __mod__(self, query: Query) -> "URL":
    """``url % query``: shorthand for ``update_query(query)``."""
    updated = self.update_query(query)
    return updated
def __bool__(self) -> bool:
    """True when netloc, path, query or fragment is non-empty.

    The scheme is not consulted.
    """
    if self._netloc or self._path:
        return True
    return bool(self._query or self._fragment)
def __getstate__(self) -> tuple[SplitURLType]:
    """Return the pickle state: a 1-tuple holding the five components."""
    # A plain tuple is used instead of a ``SplitResult``. Building a
    # ``SplitResult`` through ``tuple.__new__`` bypasses its ``__init__``;
    # on Python 3.15+ that leaves ``_keep_empty`` unset and pickling fails
    # because ``SplitResult.__getstate__`` indexes a state that is ``None``
    # (gh-1632). ``__setstate__`` unpacks either shape, so pickles from
    # older yarl releases (containing a real ``SplitResult``) still load.
    components = self._val
    return (components,)
def __setstate__(
    self, state: tuple[SplitURLType] | tuple[None, _InternalURLCache]
) -> None:
    """Restore the five components from a pickle state and reset the cache.

    Two layouts are accepted: ``(None, {"_val": components, ...})`` (the
    default pickle style) and a sequence whose first item is the
    components.
    """
    first = state[0]
    if first is None and isinstance(state[1], dict):
        # default style pickle
        components = state[1]["_val"]
    else:
        components = first
    scheme, netloc, path, query, fragment = components
    self._scheme = scheme
    self._netloc = netloc
    self._path = path
    self._query = query
    self._fragment = fragment
    self._cache = {}
def _cache_netloc(self) -> None:
    """Split the netloc once and store its four parts in the cache."""
    user, password, host, port = split_netloc(self._netloc)
    cache = self._cache
    cache["raw_user"] = user
    cache["raw_password"] = password
    cache["raw_host"] = host
    cache["explicit_port"] = port
def is_absolute(self) -> bool:
    """A check for absolute URLs.

    Return True for absolute ones (having scheme or starting
    with //), False otherwise.

    It is preferred to use the .absolute property instead
    as it is cached.
    """
    result = self.absolute
    return result
def is_default_port(self) -> bool:
    """A check for default port.

    Return True if the port is the default for the scheme,
    e.g. 'http://python.org' or 'http://python.org:80', False
    otherwise.

    Return False for relative URLs.
    """
    explicit = self.explicit_port
    if explicit is not None:
        return explicit == DEFAULT_PORTS.get(self._scheme)
    # No explicit port: the default applies unless the URL is relative,
    # in which case there is no implicit port at all.
    return self._netloc != ""
def origin(self) -> "URL":
    """Return a URL with scheme, host and port parts only.

    user, password, path, query and fragment are removed.
    """
    # TODO: add a keyword-only option for keeping user/pass maybe?
    cached_origin = self._origin
    return cached_origin
@cached_property
def _val(self) -> SplitURLType:
    """The five URL components as one tuple, used for ordering and pickling."""
    components = (
        self._scheme,
        self._netloc,
        self._path,
        self._query,
        self._fragment,
    )
    return components
@cached_property
def _origin(self) -> "URL":
    """Return a URL with scheme, host and port parts only.

    user, password, path, query and fragment are removed. ``self`` is
    returned when it already has none of them.

    Raises:
        ValueError: the URL has no netloc or no scheme.
    """
    netloc = self._netloc
    if not netloc:
        raise ValueError("URL should be absolute")
    scheme = self._scheme
    if not scheme:
        raise ValueError("URL should have scheme")
    if "@" in netloc:
        # Rebuild the netloc without the userinfo part.
        encoded_host = self.host_subcomponent
        netloc = make_netloc(None, None, encoded_host, self.explicit_port)
    elif not (self._path or self._query or self._fragment):
        return self
    return from_parts(scheme, netloc, "", "", "")
def relative(self) -> "URL":
    """Return a relative part of the URL.

    scheme, user, password, host and port are removed.

    Raises:
        ValueError: the URL has no netloc.
    """
    if self._netloc:
        return from_parts("", "", self._path, self._query, self._fragment)
    raise ValueError("URL should be absolute")
@cached_property
def absolute(self) -> bool:
    """A check for absolute URLs.

    Return True for absolute ones (having scheme or starting
    with //), False otherwise.
    """
    # The netloc is an empty string for relative URLs. Testing it directly
    # is cheaper than going through the host properties, which first have
    # to parse the host out of the netloc.
    has_netloc = self._netloc != ""
    return has_netloc
@cached_property
def scheme(self) -> str:
    """Scheme for absolute URLs.

    Empty string for relative URLs or URLs starting with //.
    """
    value = self._scheme
    return value
@cached_property
def raw_authority(self) -> str:
    """Encoded authority part of URL.

    Empty string for relative URLs.
    """
    value = self._netloc
    return value
@cached_property
def authority(self) -> str:
    """Decoded authority part of URL.

    Empty string for relative URLs.
    """
    user = self.user
    password = self.password
    host = self.host
    port = self.port
    return make_netloc(user, password, host, port)
@cached_property
def raw_user(self) -> str | None:
    """Encoded user part of URL.

    None if user is missing.
    """
    # not .username
    self._cache_netloc()
    cache = self._cache
    return cache["raw_user"]
@cached_property
def user(self) -> str | None:
    """Decoded user part of URL.

    None if user is missing.
    """
    encoded = self.raw_user
    return None if encoded is None else UNQUOTER(encoded)
@cached_property
def raw_password(self) -> str | None:
    """Encoded password part of URL.

    None if password is missing.
    """
    self._cache_netloc()
    cache = self._cache
    return cache["raw_password"]
@cached_property
def password(self) -> str | None:
    """Decoded password part of URL.

    None if password is missing.
    """
    encoded = self.raw_password
    return None if encoded is None else UNQUOTER(encoded)
@cached_property
def raw_host(self) -> str | None:
    """Encoded host part of URL.

    None for relative URLs.

    For IPv6 addresses prefer the `host_subcomponent` property, which
    returns the host wrapped in brackets.
    """
    # Use host instead of hostname for sake of shortness
    # May add .hostname prop later
    self._cache_netloc()
    cache = self._cache
    return cache["raw_host"]
@cached_property
def host(self) -> str | None:
    """Decoded host part of URL.

    None for relative URLs.
    """
    encoded = self.raw_host
    if encoded is None:
        return None
    # A trailing digit or any colon marks the host as an IP address.
    if encoded[-1:].isdigit() or ":" in encoded:
        # IP addresses are never IDNA encoded
        return encoded
    return _idna_decode(encoded)
@cached_property
def host_subcomponent(self) -> str | None:
    """Return the host subcomponent part of URL.

    None for relative URLs.

    https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.2

    `IP-literal = "[" ( IPv6address / IPvFuture ) "]"`

    Examples:
    - `http://example.com:8080` -> `example.com`
    - `http://example.com:80` -> `example.com`
    - `https://127.0.0.1:8443` -> `127.0.0.1`
    - `https://[::1]:8443` -> `[::1]`
    - `http://[::1]` -> `[::1]`
    """
    encoded = self.raw_host
    if encoded is None:
        return None
    if ":" in encoded:
        # A colon in the host is rendered in IP-literal form, with brackets.
        return f"[{encoded}]"
    return encoded
@cached_property
def host_port_subcomponent(self) -> str | None:
    """Return the host and port subcomponent part of URL.

    Trailing dots are removed from the host part.

    This value is suitable for use in the Host header of an HTTP request.

    None for relative URLs.

    https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.2
    `IP-literal = "[" ( IPv6address / IPvFuture ) "]"`
    https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.3
    port = *DIGIT

    The port is appended only when it is explicit and differs from the
    scheme's entry in ``DEFAULT_PORTS``.

    Examples:
    - `http://example.com:8080` -> `example.com:8080`
    - `http://example.com:80` -> `example.com`
    - `http://example.com.:80` -> `example.com`
    - `https://127.0.0.1:8443` -> `127.0.0.1:8443`
    - `https://[::1]:8443` -> `[::1]:8443`
    - `http://[::1]` -> `[::1]`
    """
    if (raw := self.raw_host) is None:
        return None
    # ``endswith`` instead of ``raw[-1]``: the raw host can be an empty
    # string (``encode_url`` substitutes "" for a missing host on schemes
    # that do not require one), and indexing it would raise IndexError.
    if raw.endswith("."):
        # Remove all trailing dots from the netloc as while
        # they are valid FQDNs in DNS, TLS validation fails.
        # See https://github.com/aio-libs/aiohttp/issues/3636.
        # To avoid string manipulation we only call rstrip if
        # the last character is a dot.
        raw = raw.rstrip(".")
    port = self.explicit_port
    if port is None or port == DEFAULT_PORTS.get(self._scheme):
        return f"[{raw}]" if ":" in raw else raw
    return f"[{raw}]:{port}" if ":" in raw else f"{raw}:{port}"
@cached_property
def port(self) -> int | None:
    """Port part of URL, with scheme-based fallback.

    None for relative URLs or URLs without explicit port and
    scheme without default port substitution.
    """
    explicit = self.explicit_port
    if explicit is None:
        return DEFAULT_PORTS.get(self._scheme)
    return explicit
@cached_property
def explicit_port(self) -> int | None:
    """Port part of URL, without scheme-based fallback.

    None for relative URLs or URLs without explicit port.
    """
    self._cache_netloc()
    cache = self._cache
    return cache["explicit_port"]
@cached_property
def raw_path(self) -> str:
    """Encoded path of URL.

    / for absolute URLs without path part.
    """
    path = self._path
    if not path and self._netloc:
        return "/"
    return path
@cached_property
def path(self) -> str:
    """Decoded path of URL.

    / for absolute URLs without path part.
    """
    if self._path:
        return PATH_UNQUOTER(self._path)
    if self._netloc:
        return "/"
    return ""
@cached_property
def path_safe(self) -> str:
    """Decoded path of URL.

    / for absolute URLs without path part.

    / (%2F) and % (%25) are not decoded
    """
    if not self._path:
        return "/" if self._netloc else ""
    return PATH_SAFE_UNQUOTER(self._path)
@cached_property
def _parsed_query(self) -> list[tuple[str, str]]:
    """Query string parsed by ``query_to_pairs``."""
    pairs = query_to_pairs(self._query)
    return pairs
@cached_property
def query(self) -> "MultiDictProxy[str]":
    """A MultiDictProxy representing parsed query parameters in decoded
    representation.

    Empty value if URL has no query part.
    """
    params = MultiDict(self._parsed_query)
    return MultiDictProxy(params)
@cached_property
def raw_query_string(self) -> str:
    """Encoded query part of URL.

    Empty string if query is missing.
    """
    value = self._query
    return value
@cached_property
def query_string(self) -> str:
    """Decoded query part of URL.

    Empty string if query is missing.
    """
    if not self._query:
        return ""
    return QS_UNQUOTER(self._query)
@cached_property
def path_qs(self) -> str:
    """Decoded path of URL with query."""
    decoded_query = self.query_string
    if decoded_query:
        return f"{self.path}?{decoded_query}"
    return self.path
@cached_property
def raw_path_qs(self) -> str:
    """Encoded path of URL with query."""
    # An empty path next to a netloc is rendered as "/".
    path = self._path if self._path or not self._netloc else "/"
    encoded_query = self._query
    if encoded_query:
        return f"{path}?{encoded_query}"
    return path
@cached_property
def raw_fragment(self) -> str:
    """Encoded fragment part of URL.

    Empty string if fragment is missing.
    """
    value = self._fragment
    return value
@cached_property
def fragment(self) -> str:
    """Decoded fragment part of URL.

    Empty string if fragment is missing.
    """
    if not self._fragment:
        return ""
    return UNQUOTER(self._fragment)
@cached_property
def raw_parts(self) -> tuple[str, ...]:
    """A tuple containing encoded *path* parts.

    ('/',) for absolute URLs if *path* is missing.
    """
    path = self._path
    has_netloc = bool(self._netloc)
    if has_netloc and not path:
        return ("/",)
    if has_netloc or path.startswith("/"):
        # Rooted path: "/" becomes the first part and the leading
        # character is dropped before splitting.
        return ("/",) + tuple(path[1:].split("/"))
    return tuple(path.split("/"))
@cached_property
def parts(self) -> tuple[str, ...]:
    """A tuple containing decoded *path* parts.

    ('/',) for absolute URLs if *path* is missing.
    """
    return tuple(map(UNQUOTER, self.raw_parts))
@cached_property
def parent(self) -> "URL":
    """A new URL with last part of path removed and cleaned up query and
    fragment.

    When the path is empty or "/", the URL itself is returned unless it
    has a query or fragment to strip.
    """
    path = self._path
    if path and path != "/":
        # Everything before the last "/" ("" when there is none).
        parent_path = path.rpartition("/")[0]
        return from_parts(self._scheme, self._netloc, parent_path, "", "")
    if self._fragment or self._query:
        return from_parts(self._scheme, self._netloc, path, "", "")
    return self
@cached_property
def raw_name(self) -> str:
    """The last part of raw_parts."""
    parts = self.raw_parts
    if self._netloc:
        # Skip the leading "/" entry; nothing after it means no name.
        tail = parts[1:]
        return tail[-1] if tail else ""
    return parts[-1]
@cached_property
def name(self) -> str:
    """The last part of parts."""
    encoded = self.raw_name
    return UNQUOTER(encoded)
@cached_property
def raw_suffix(self) -> str:
    """Encoded final suffix of ``raw_name`` including the dot, or ""."""
    name = self.raw_name
    dot = name.rfind(".")
    # A dot at position 0, at the very end, or missing means no suffix.
    if dot <= 0 or dot >= len(name) - 1:
        return ""
    return name[dot:]
@cached_property
def suffix(self) -> str:
    """Decoded form of ``raw_suffix``."""
    encoded = self.raw_suffix
    return UNQUOTER(encoded)
@cached_property
def raw_suffixes(self) -> tuple[str, ...]:
    """All encoded suffixes of ``raw_name``, each with its leading dot.

    Empty when the name ends with a dot; leading dots are ignored.
    """
    name = self.raw_name
    if name.endswith("."):
        return ()
    _stem, *extensions = name.lstrip(".").split(".")
    return tuple(f".{extension}" for extension in extensions)
@cached_property
def suffixes(self) -> tuple[str, ...]:
    """Decoded form of ``raw_suffixes``."""
    return tuple(map(UNQUOTER, self.raw_suffixes))
def _make_child(self, paths: "Sequence[str]", encoded: bool = False) -> "URL":
    """
    Append *paths* to ``self._path`` and return the resulting URL.

    Accounts for absolute vs relative paths; existing empty segments are
    kept but no new ones are created. Query and fragment are dropped.

    Each path is quoted with ``PATH_QUOTER`` unless *encoded* is true.
    Raises ValueError if any path starts with "/".
    """
    # Segments are accumulated in reverse order (last segment first) and
    # flipped back once at the end.
    parsed: list[str] = []
    needs_normalize: bool = False
    for idx, path in enumerate(reversed(paths)):
        # empty segment of last is not removed
        last = idx == 0
        if path and path[0] == "/":
            raise ValueError(
                f"Appending path {path!r} starting from slash is forbidden"
            )
        # We need to quote the path if it is not already encoded
        # This cannot be done at the end because the existing
        # path is already quoted and we do not want to double quote
        # the existing path.
        path = path if encoded else PATH_QUOTER(path)
        # Any "." may be a "." / ".." segment, so normalize later.
        needs_normalize |= "." in path
        segments = path.split("/")
        segments.reverse()
        # remove trailing empty segment for all but the last path
        parsed += segments[1:] if not last and segments[0] == "" else segments

    if (path := self._path) and (old_segments := path.split("/")):
        # If the old path ends with a slash, the last segment is an empty string
        # and should be removed before adding the new path segments.
        old = old_segments[:-1] if old_segments[-1] == "" else old_segments
        old.reverse()
        parsed += old

    # If the netloc is present, inject a leading slash when adding a
    # path to an absolute URL where there was none before.
    if (netloc := self._netloc) and parsed and parsed[-1] != "":
        parsed.append("")

    parsed.reverse()
    # Normalization is applied only to URLs with a netloc, and only when
    # one of the appended paths contained a dot.
    if not netloc or not needs_normalize:
        return from_parts(self._scheme, netloc, "/".join(parsed), "", "")

    path = "/".join(normalize_path_segments(parsed))
    # If normalizing the path segments removed the leading slash, add it back.
    if path and path[0] != "/":
        path = f"/{path}"
    return from_parts(self._scheme, netloc, path, "", "")
def with_scheme(self, scheme: str) -> "URL":
    """Return a new URL whose scheme is ``scheme``, lower-cased.

    Netloc, path, query and fragment are carried over untouched.

    Raises ``TypeError`` when ``scheme`` is not a ``str`` and
    ``ValueError`` when this URL has no netloc while the new scheme is
    listed in ``SCHEME_REQUIRES_HOST``.
    """
    if not isinstance(scheme, str):
        raise TypeError("Invalid scheme type")
    new_scheme = scheme.lower()
    authority = self._netloc
    if new_scheme in SCHEME_REQUIRES_HOST and not authority:
        raise ValueError(
            "scheme replacement is not allowed for "
            f"relative URLs for the {new_scheme} scheme"
        )
    # Query and fragment are deliberately kept.
    return from_parts(
        new_scheme, authority, self._path, self._query, self._fragment
    )
def with_user(self, user: str | None) -> "URL":
    """Return a new URL with the user part replaced.

    A ``str`` user is quoted with ``QUOTER`` and the current raw
    password is kept.  Passing ``None`` drops both user and password.

    Raises ``TypeError`` for any other argument type and ``ValueError``
    when the URL has no netloc.
    """
    if user is None:
        secret = None
    elif not isinstance(user, str):
        raise TypeError("Invalid user type")
    else:
        user = QUOTER(user)
        secret = self.raw_password
    if not self._netloc:
        raise ValueError("user replacement is not allowed for relative URLs")
    host_part = self.host_subcomponent or ""
    new_netloc = make_netloc(user, secret, host_part, self.explicit_port)
    # Path, query and fragment are deliberately kept.
    return from_parts(
        self._scheme, new_netloc, self._path, self._query, self._fragment
    )
def with_password(self, password: str | None) -> "URL":
    """Return a new URL with the password part replaced.

    A ``str`` password is quoted with ``QUOTER``; ``None`` removes the
    password.  The raw user, host and explicit port are kept.

    Raises ``TypeError`` for any other argument type and ``ValueError``
    when the URL has no netloc.
    """
    if password is not None:
        if not isinstance(password, str):
            raise TypeError("Invalid password type")
        password = QUOTER(password)
    if not self._netloc:
        raise ValueError("password replacement is not allowed for relative URLs")
    host_part = self.host_subcomponent or ""
    explicit = self.explicit_port
    new_netloc = make_netloc(self.raw_user, password, host_part, explicit)
    # Path, query and fragment are deliberately kept.
    return from_parts(
        self._scheme, new_netloc, self._path, self._query, self._fragment
    )
def with_host(self, host: str) -> "URL":
    """Return a new URL with host replaced.

    Autoencode host if needed (via ``_encode_host`` with validation on).
    User, password and explicit port are kept, as are path, query and
    fragment.

    Changing host for relative URLs is not allowed, use .join()
    instead.

    :raises TypeError: if ``host`` is not a ``str``.
    :raises ValueError: if the URL has no netloc, or ``host`` is empty.
    """
    # N.B. doesn't cleanup query/fragment
    if not isinstance(host, str):
        raise TypeError("Invalid host type")
    if not self._netloc:
        raise ValueError("host replacement is not allowed for relative URLs")
    if not host:
        raise ValueError("host removing is not allowed")
    # host is guaranteed non-empty here, so no empty-string fallback is
    # needed before encoding.
    encoded_host = _encode_host(host, validate_host=True)
    port = self.explicit_port
    netloc = make_netloc(self.raw_user, self.raw_password, encoded_host, port)
    return from_parts(self._scheme, netloc, self._path, self._query, self._fragment)
def with_port(self, port: int | None) -> "URL":
    """Return a new URL with the port replaced.

    ``None`` removes the explicit port.  User, password and host are
    kept, as are path, query and fragment.

    Raises ``TypeError`` when ``port`` is a ``bool`` or not an ``int``,
    and ``ValueError`` when it is outside 0..65535 or the URL has no
    netloc.
    """
    if port is not None:
        # bool is an int subclass, so it has to be rejected explicitly.
        if isinstance(port, bool) or not isinstance(port, int):
            raise TypeError(f"port should be int or None, got {type(port)}")
        if port < 0 or port > 65535:
            raise ValueError(f"port must be between 0 and 65535, got {port}")
    if not self._netloc:
        raise ValueError("port replacement is not allowed for relative URLs")
    host_part = self.host_subcomponent or ""
    new_netloc = make_netloc(self.raw_user, self.raw_password, host_part, port)
    return from_parts(
        self._scheme, new_netloc, self._path, self._query, self._fragment
    )
def with_path(
    self,
    path: str,
    *,
    encoded: bool = False,
    keep_query: bool = False,
    keep_fragment: bool = False,
) -> "URL":
    """Return a new URL with the path replaced.

    Unless ``encoded`` is true the path is quoted with ``PATH_QUOTER``
    and, for URLs with a netloc, normalized when it contains a ``.``.
    A non-empty path always gets a leading slash.  Query and fragment
    are dropped unless ``keep_query`` / ``keep_fragment`` are set.
    """
    authority = self._netloc
    if not encoded:
        path = PATH_QUOTER(path)
        if authority and "." in path:
            path = normalize_path(path)
    if path and not path.startswith("/"):
        path = f"/{path}"
    new_query = self._query if keep_query else ""
    new_fragment = self._fragment if keep_fragment else ""
    return from_parts(self._scheme, authority, path, new_query, new_fragment)
@overload
def with_query(self, query: Query) -> "URL": ...

@overload
def with_query(self, **kwargs: QueryVariable) -> "URL": ...

def with_query(self, *args: Any, **kwargs: Any) -> "URL":
    """Return a new URL with the query part replaced.

    The arguments are handed to ``get_str_query``: a Mapping (e.g.
    dict, multidict.MultiDict instances), a str, a sequence of
    (key, value) pairs, or arbitrary keyword arguments.

    A falsy result from ``get_str_query`` (e.g. when None is passed)
    clears the query.
    """
    encoded_query = get_str_query(*args, **kwargs)
    if not encoded_query:
        encoded_query = ""
    # Scheme, netloc, path and fragment are carried over unchanged.
    return from_parts_uncached(
        self._scheme, self._netloc, self._path, encoded_query, self._fragment
    )
@overload
def extend_query(self, query: Query) -> "URL": ...

@overload
def extend_query(self, **kwargs: QueryVariable) -> "URL": ...

def extend_query(self, *args: Any, **kwargs: Any) -> "URL":
    """Return a new URL with the given query appended to the existing one.

    Existing query parameters are never removed.  When the arguments
    produce no query string, ``self`` is returned.

    Example:
    >>> url = URL('http://example.com/?a=1&b=2')
    >>> url.extend_query(a=3, c=4)
    URL('http://example.com/?a=1&b=2&a=3&c=4')
    """
    addition = get_str_query(*args, **kwargs)
    if not addition:
        return self
    current = self._query
    # Both strings are already encoded, so plain concatenation is enough;
    # a separator is only inserted when the old query lacks a trailing "&".
    if not current:
        combined = addition
    elif current[-1] == "&":
        combined = current + addition
    else:
        combined = f"{current}&{addition}"
    return from_parts_uncached(
        self._scheme, self._netloc, self._path, combined, self._fragment
    )
@overload
def update_query(self, query: Query) -> "URL": ...

@overload
def update_query(self, **kwargs: QueryVariable) -> "URL": ...

def update_query(self, *args: Any, **kwargs: Any) -> "URL":
    """Return a new URL with query part updated.

    This method will overwrite existing query parameters.

    Accepts either keyword arguments or exactly one positional
    argument: ``None`` (clears the query), a str, a Mapping, or a
    sequence of (key, value) pairs.  Any other falsy argument (e.g. an
    empty dict or empty string) leaves the current query unchanged.

    Raises ValueError when both positional and keyword arguments are
    given, or when the number of positional arguments is not one.
    Raises TypeError for bytes, bytearray, memoryview and any other
    unsupported type.

    Example:
    >>> url = URL('http://example.com/?a=1&b=2')
    >>> url.update_query(a=3, c=4)
    URL('http://example.com/?a=3&b=2&c=4')
    """
    in_query: (
        str
        | Mapping[str, QueryVariable]
        | Sequence[tuple[str | istr, SimpleQuery]]
        | None
    )
    # Step 1: pick the single input object out of args / kwargs.
    if kwargs:
        if args:
            msg = "Either kwargs or single query parameter must be present"
            raise ValueError(msg)
        in_query = kwargs
    elif len(args) == 1:
        in_query = args[0]
    else:
        raise ValueError("Either kwargs or single query parameter must be present")

    # Step 2: merge it into the parsed current query, dispatching on type.
    # The None check must precede the generic falsy check, and str/bytes
    # must be handled before the generic Sequence branch.
    if in_query is None:
        query = ""
    elif not in_query:
        query = self._query
    elif isinstance(in_query, Mapping):
        qm: MultiDict[QueryVariable] = MultiDict(self._parsed_query)
        qm.update(in_query)
        query = get_str_query_from_sequence_iterable(qm.items())
    elif isinstance(in_query, str):
        qstr: MultiDict[str] = MultiDict(self._parsed_query)
        qstr.update(query_to_pairs(in_query))
        query = get_str_query_from_iterable(qstr.items())
    elif isinstance(in_query, (bytes, bytearray, memoryview)):
        msg = "Invalid query type: bytes, bytearray and memoryview are forbidden"
        raise TypeError(msg)
    elif isinstance(in_query, Sequence):
        # We don't expect sequence values if we're given a list of pairs
        # already; only mappings like builtin `dict` which can't have the
        # same key pointing to multiple values are allowed to use
        # `_query_seq_pairs`.
        if TYPE_CHECKING:
            in_query = cast(
                Sequence[tuple[Union[str, istr], SimpleQuery]], in_query
            )
        qs: MultiDict[SimpleQuery] = MultiDict(self._parsed_query)
        qs.update(in_query)
        query = get_str_query_from_iterable(qs.items())
    else:
        raise TypeError(
            "Invalid query type: only str, mapping or "
            "sequence of (key, value) pairs is allowed"
        )
    # Scheme, netloc, path and fragment are carried over unchanged.
    return from_parts_uncached(
        self._scheme, self._netloc, self._path, query, self._fragment
    )
def without_query_params(self, *query_params: str) -> "URL":
    """Return a new URL with the named keys dropped from the query.

    When none of ``query_params`` occurs in the query, ``self`` is
    returned.
    """
    doomed = set(query_params) & self.query.keys()
    if not doomed:
        return self
    survivors = tuple(
        (key, val) for key, val in self.query.items() if key not in doomed
    )
    return self.with_query(survivors)
def with_fragment(self, fragment: str | None) -> "URL":
    """Return a new URL with the fragment replaced.

    A ``str`` fragment is quoted with ``FRAGMENT_QUOTER``; ``None``
    clears the fragment.  When the resulting fragment equals the
    current one, ``self`` is returned.

    Raises ``TypeError`` for any other argument type.
    """
    if fragment is not None and not isinstance(fragment, str):
        raise TypeError("Invalid fragment type")
    encoded_fragment = "" if fragment is None else FRAGMENT_QUOTER(fragment)
    if encoded_fragment == self._fragment:
        return self
    # Scheme, netloc, path and query are deliberately kept.
    return from_parts(
        self._scheme, self._netloc, self._path, self._query, encoded_fragment
    )
def with_name(
    self,
    name: str,
    *,
    keep_query: bool = False,
    keep_fragment: bool = False,
) -> "URL":
    """Return a new URL with name (last part of path) replaced.

    The name is quoted with ``PATH_QUOTER``.  Query and fragment are
    cleaned up unless ``keep_query`` / ``keep_fragment`` are set.

    Raises ``TypeError`` when ``name`` is not a ``str`` and
    ``ValueError`` when it contains a slash or is ``.`` / ``..``.
    """
    if not isinstance(name, str):
        raise TypeError("Invalid name type")
    if "/" in name:
        raise ValueError("Slash in name is not allowed")
    name = PATH_QUOTER(name)
    if name in (".", ".."):
        raise ValueError(". and .. values are forbidden")
    segments = list(self.raw_parts)
    authority = self._netloc
    if authority and len(segments) == 1:
        # Only the root part is present: add the name after it.
        segments.append(name)
    else:
        segments[-1] = name
    if authority or segments[0] == "/":
        segments[0] = ""  # replace leading '/'

    new_query = self._query if keep_query else ""
    new_fragment = self._fragment if keep_fragment else ""
    return from_parts(
        self._scheme, authority, "/".join(segments), new_query, new_fragment
    )
def with_suffix(
    self,
    suffix: str,
    *,
    keep_query: bool = False,
    keep_fragment: bool = False,
) -> "URL":
    """Return a new URL with suffix (file extension of name) replaced.

    The suffix is quoted with ``PATH_QUOTER``; an empty suffix removes
    the current one.  Query and fragment are cleaned up unless
    ``keep_query`` / ``keep_fragment`` are set.

    Raises ``TypeError`` when ``suffix`` is not a ``str`` and
    ``ValueError`` when it is malformed, when the URL has an empty
    name, or when the resulting name would be ``.`` / ``..``.
    """
    if not isinstance(suffix, str):
        raise TypeError("Invalid suffix type")
    # A non-empty suffix must start with a dot, must not be a lone dot
    # and must not contain a slash.
    if (suffix and suffix[0] != ".") or suffix == "." or "/" in suffix:
        raise ValueError(f"Invalid suffix {suffix!r}")
    stem = self.raw_name
    if not stem:
        raise ValueError(f"{self!r} has an empty name")
    previous = self.raw_suffix
    suffix = PATH_QUOTER(suffix)
    if previous:
        stem = stem[: -len(previous)]
    name = stem + suffix
    if name in (".", ".."):
        raise ValueError(". and .. values are forbidden")
    segments = list(self.raw_parts)
    authority = self._netloc
    if authority and len(segments) == 1:
        segments.append(name)
    else:
        segments[-1] = name
    if authority or segments[0] == "/":
        segments[0] = ""  # replace leading '/'

    new_query = self._query if keep_query else ""
    new_fragment = self._fragment if keep_fragment else ""
    return from_parts(
        self._scheme, authority, "/".join(segments), new_query, new_fragment
    )
def join(self, url: "URL") -> "URL":
    """Join URLs

    Construct a full (“absolute”) URL by combining a “base URL”
    (self) with another URL (url).

    Informally, this uses components of the base URL, in
    particular the addressing scheme, the network location and
    (part of) the path, to provide missing components in the
    relative URL.

    ``url`` is returned unchanged when its scheme differs from the
    base scheme or the scheme is not in ``USES_RELATIVE``.

    :raises TypeError: if ``url`` is not exactly of type ``URL``
        (subclasses are rejected as well).
    """
    if type(url) is not URL:
        raise TypeError("url should be URL")

    # A scheme-less url inherits the base scheme.
    scheme = url._scheme or self._scheme
    if scheme != self._scheme or scheme not in USES_RELATIVE:
        return url

    # scheme is in uses_authority as uses_authority is a superset of uses_relative
    # A url carrying its own netloc replaces everything but the scheme.
    if (join_netloc := url._netloc) and scheme in USES_AUTHORITY:
        return from_parts(scheme, join_netloc, url._path, url._query, url._fragment)

    orig_path = self._path
    if join_path := url._path:
        if join_path[0] == "/":
            # Absolute path: it replaces the base path entirely.
            path = join_path
        elif not orig_path:
            path = f"/{join_path}"
        elif orig_path[-1] == "/":
            path = f"{orig_path}{join_path}"
        else:
            # Drop the last segment of the base path and append the
            # relative path in its place.
            # parts[0] is / for absolute urls,
            # this join will add a double slash there
            # NOTE(review): self.parts is presumably the decoded form of
            # the path segments while join_path is taken from url._path;
            # confirm that mixing the two here is intended.
            path = "/".join([*self.parts[:-1], ""]) + join_path
            # which has to be removed
            if orig_path[0] == "/":
                path = path[1:]
        # Normalization is only attempted when the path contains a ".".
        path = normalize_path(path) if "." in path else path
    else:
        path = orig_path

    # Query and fragment come from url whenever it supplies a path or
    # its own value; otherwise the base URL's are kept.
    return from_parts(
        scheme,
        self._netloc,
        path,
        url._query if join_path or url._query else self._query,
        url._fragment if join_path or url._fragment else self._fragment,
    )
def joinpath(self, *other: str, encoded: bool = False) -> "URL":
    """Return a new URL with every element of ``other`` appended to the path.

    This is a thin wrapper that forwards to ``_make_child``.
    """
    child = self._make_child(other, encoded=encoded)
    return child
def human_repr(self) -> str:
    """Return the URL as a decoded, human readable string.

    Each component is passed through ``human_quote`` with the set of
    characters that must stay quoted in that position, then the pieces
    are reassembled with ``make_netloc`` and ``unsplit_result``.
    """
    userinfo_unsafe = "#/:?@[]\\"
    query_unsafe = "#&+;="

    user = human_quote(self.user, userinfo_unsafe)
    password = human_quote(self.password, userinfo_unsafe)
    host = self.host
    if host and ":" in host:
        # A colon in the host means it gets wrapped in brackets.
        host = f"[{host}]"
    path = human_quote(self.path, "#?")
    if TYPE_CHECKING:
        assert path is not None
    if not (self._scheme or self._netloc):
        path = _encode_relative_scheme_colon(path)
    rendered_pairs = []
    for key, value in self.query.items():
        quoted_key = human_quote(key, query_unsafe)
        quoted_value = human_quote(value, query_unsafe)
        rendered_pairs.append(f"{quoted_key}={quoted_value}")
    query_string = "&".join(rendered_pairs)
    fragment = human_quote(self.fragment, "")
    if TYPE_CHECKING:
        assert fragment is not None
    netloc = make_netloc(user, password, host, self.explicit_port)
    return unsplit_result(self._scheme, netloc, path, query_string, fragment)
if HAS_PYDANTIC:
    # Pydantic integration hooks, only defined when pydantic_core is
    # importable.  Pattern borrowed from
    # https://docs.pydantic.dev/latest/concepts/types/#handling-third-party-types
    @classmethod
    def __get_pydantic_json_schema__(
        cls,
        core_schema: "CoreSchema",
        handler: "GetJsonSchemaHandler",
    ) -> "JsonSchemaValue":
        """Describe the URL as a JSON string with the ``uri`` format."""
        return {"type": "string", "format": "uri"}

    @classmethod
    def __get_pydantic_core_schema__(
        cls,
        source_type: type[Self] | type[str],
        handler: "GetCoreSchemaHandler",
    ) -> "CoreSchema":
        """Build the core schema used to validate and serialize URLs.

        JSON input must be a string that is then turned into a URL;
        Python input may also be an existing URL instance.  Values are
        serialized with ``str``.
        """
        # Lazy import: pulling in pydantic_core at module load time
        # increases yarl's import cost 3-7x for users who don't use
        # pydantic. Keep this import function-scoped.
        from pydantic_core import core_schema  # noqa: PLC0415

        str_then_url = core_schema.chain_schema(
            [
                core_schema.str_schema(),
                core_schema.no_info_plain_validator_function(URL),
            ]
        )
        # check if it's an instance first before doing any further work
        instance_or_str = core_schema.union_schema(
            [core_schema.is_instance_schema(URL), str_then_url]
        )
        to_str = core_schema.plain_serializer_function_ser_schema(str)
        return core_schema.json_or_python_schema(
            json_schema=str_then_url,
            python_schema=instance_or_str,
            serialization=to_str,
        )
# Default lru_cache size for _idna_encode() and _idna_decode().
_DEFAULT_IDNA_SIZE = 256
# Default lru_cache size for _encode_host().
_DEFAULT_ENCODE_SIZE = 512
@lru_cache(_DEFAULT_IDNA_SIZE)
def _idna_decode(raw: str) -> str:
    """Decode an ASCII host with ``idna``, falling back to the stdlib codec.

    The fallback is taken whenever the first attempt raises
    ``UnicodeError`` (e.g. for ``'::1'``).
    """
    try:
        decoded = idna.decode(raw.encode("ascii"))
    except UnicodeError:  # e.g. '::1'
        decoded = raw.encode("ascii").decode("idna")
    return decoded
@lru_cache(_DEFAULT_IDNA_SIZE)
def _idna_encode(host: str) -> str:
    """Encode a host with ``idna`` (UTS 46), falling back to the stdlib codec.

    The fallback is taken whenever the first attempt raises
    ``UnicodeError``.
    """
    try:
        encoded = idna.encode(host, uts46=True).decode("ascii")
    except UnicodeError:
        encoded = host.encode("idna").decode("ascii")
    return encoded
@lru_cache(_DEFAULT_ENCODE_SIZE)
def _encode_host(host: str, validate_host: bool) -> str:
    """Encode host part of URL.

    * A valid IP literal is returned in compressed form; IPv6 addresses
      are wrapped in brackets and a ``%zone`` suffix is preserved.
    * Any other ASCII host is lower-cased and, when ``validate_host`` is
      true, checked against ``NOT_REG_NAME``.
    * Everything else is passed to ``_idna_encode``.

    :raises ValueError: if ``validate_host`` is true and an ASCII host
        contains a character matched by ``NOT_REG_NAME``.
    """
    # If the host ends with a digit or contains a colon, it's likely
    # an IP address.
    if host and (host[-1].isdigit() or ":" in host):
        # Split off an optional "%zone" suffix before parsing.
        raw_ip, sep, zone = host.partition("%")
        # If it looks like an IP, we try ip_address() and fall through
        # if it's not an IP address. The cheap pre-check above is a
        # performance optimization to avoid parsing IP addresses as much
        # as possible because it is orders of magnitude slower than
        # almost any other operation this library does.
        #
        # IP Addresses can look like:
        # https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.2
        # - 127.0.0.1 (last character is a digit)
        # - 2001:db8::ff00:42:8329 (contains a colon)
        # - 2001:db8::ff00:42:8329%eth0 (contains a colon)
        # - [2001:db8::ff00:42:8329] (contains a colon -- brackets should
        #                             have been removed before it gets here)
        # Rare IP Address formats are not supported per:
        # https://datatracker.ietf.org/doc/html/rfc3986#section-7.4
        #
        # IP parsing is slow; the lru_cache on this function keeps
        # repeated hosts from being parsed again.
        try:
            ip = ip_address(raw_ip)
        except ValueError:
            # Not an IP address after all: treat it as a regular name.
            pass
        else:
            # Valid IP literal: use the compressed form and re-attach
            # the zone if one was present.
            host = ip.compressed
            if ip.version == 6:
                return f"[{host}%{zone}]" if sep else f"[{host}]"
            return f"{host}%{zone}" if sep else host

    # IDNA encoding is slow, skip it for ASCII-only strings
    if host.isascii():
        # Check for invalid characters explicitly; _idna_encode() does this
        # for non-ascii host names.
        host = host.lower()
        if validate_host and (invalid := NOT_REG_NAME.search(host)):
            value, pos, extra = invalid.group(), invalid.start(), ""
            if value == "@" or (value == ":" and "@" in host[pos:]):
                # this looks like an authority string
                extra = (
                    ", if the value includes a username or password, "
                    "use 'authority' instead of 'host'"
                )
            raise ValueError(
                f"Host {host!r} cannot contain {value!r} (at position {pos}){extra}"
            ) from None
        return host

    return _idna_encode(host)
@rewrite_module
def cache_clear() -> None:
    """Clear all LRU caches."""
    # The names are looked up at call time, so wrappers rebound by
    # cache_configure() are the ones that get cleared.
    for cached in (_idna_encode, _idna_decode, _encode_host):
        cached.cache_clear()
@rewrite_module
def cache_info() -> CacheInfo:
    """Report cache statistics.

    The ``ip_address``, ``host_validate`` and ``encode_host`` entries all
    report the statistics of the ``_encode_host`` cache.
    """
    encode_stats = _idna_encode.cache_info()
    decode_stats = _idna_decode.cache_info()
    host_stats = _encode_host.cache_info()
    return {
        "idna_encode": encode_stats,
        "idna_decode": decode_stats,
        "ip_address": host_stats,
        "host_validate": host_stats,
        "encode_host": host_stats,
    }
@rewrite_module
def cache_configure(
    *,
    idna_encode_size: int | None = _DEFAULT_IDNA_SIZE,
    idna_decode_size: int | None = _DEFAULT_IDNA_SIZE,
    ip_address_size: int | None | UndefinedType = UNDEFINED,
    host_validate_size: int | None | UndefinedType = UNDEFINED,
    encode_host_size: int | None | UndefinedType = UNDEFINED,
) -> None:
    """Configure LRU cache sizes.

    Each size is handed to ``functools.lru_cache``; ``None`` means an
    unbounded cache.  The three caches are rebuilt, so previously cached
    entries are discarded.

    :param idna_encode_size: size of the ``_idna_encode`` cache.
    :param idna_decode_size: size of the ``_idna_decode`` cache.
    :param ip_address_size: deprecated; folded into ``encode_host_size``.
    :param host_validate_size: deprecated; folded into
        ``encode_host_size``.
    :param encode_host_size: size of the ``_encode_host`` cache.  When the
        deprecated arguments are given as well, the largest value wins
        and ``None`` (unbounded) beats any number.  Defaults to
        ``_DEFAULT_ENCODE_SIZE`` when nothing is specified.
    """
    global _idna_decode, _idna_encode, _encode_host
    # ip_address_size, host_validate_size are no longer
    # used, but are kept for backwards compatibility.
    if ip_address_size is not UNDEFINED or host_validate_size is not UNDEFINED:
        warnings.warn(
            "cache_configure() no longer accepts the "
            "ip_address_size or host_validate_size arguments, "
            "they are used to set the encode_host_size instead "
            "and will be removed in the future",
            DeprecationWarning,
            stacklevel=2,
        )

    if encode_host_size is not None:
        for size in (ip_address_size, host_validate_size):
            if size is None:
                # Unbounded beats every finite size.  Stop here: letting
                # the loop continue would compare a later integer size
                # against None in max() and raise TypeError.
                encode_host_size = None
                break
            elif encode_host_size is UNDEFINED:
                if size is not UNDEFINED:
                    encode_host_size = size
            elif size is not UNDEFINED:
                if TYPE_CHECKING:
                    assert isinstance(size, int)
                    assert isinstance(encode_host_size, int)
                encode_host_size = max(size, encode_host_size)
        if encode_host_size is UNDEFINED:
            encode_host_size = _DEFAULT_ENCODE_SIZE

    # Re-wrap the undecorated functions with freshly sized caches.
    _encode_host = lru_cache(encode_host_size)(_encode_host.__wrapped__)
    _idna_decode = lru_cache(idna_decode_size)(_idna_decode.__wrapped__)
    _idna_encode = lru_cache(idna_encode_size)(_idna_encode.__wrapped__)