Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/yarl/_url.py: 38%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import re
2import sys
3import warnings
4from collections.abc import Mapping, Sequence
5from enum import Enum
6from functools import _CacheInfo, lru_cache
7from ipaddress import ip_address
8from typing import (
9 TYPE_CHECKING,
10 Any,
11 NoReturn,
12 TypedDict,
13 TypeVar,
14 Union,
15 cast,
16 overload,
17)
18from urllib.parse import SplitResult, uses_relative
20import idna
21from multidict import MultiDict, MultiDictProxy, istr
22from propcache.api import under_cached_property as cached_property
24from ._parse import (
25 USES_AUTHORITY,
26 SplitURLType,
27 make_netloc,
28 query_to_pairs,
29 split_netloc,
30 split_url,
31 unsplit_result,
32)
33from ._path import normalize_path, normalize_path_segments
34from ._query import (
35 Query,
36 QueryVariable,
37 SimpleQuery,
38 get_str_query,
39 get_str_query_from_iterable,
40 get_str_query_from_sequence_iterable,
41)
42from ._quoters import (
43 FRAGMENT_QUOTER,
44 FRAGMENT_REQUOTER,
45 PATH_QUOTER,
46 PATH_REQUOTER,
47 PATH_SAFE_UNQUOTER,
48 PATH_UNQUOTER,
49 QS_UNQUOTER,
50 QUERY_QUOTER,
51 QUERY_REQUOTER,
52 QUOTER,
53 REQUOTER,
54 UNQUOTER,
55 human_quote,
56)
58try:
59 from pydantic import GetCoreSchemaHandler, GetJsonSchemaHandler
60 from pydantic.json_schema import JsonSchemaValue
61 from pydantic_core import core_schema
63 HAS_PYDANTIC = True
64except ImportError:
65 HAS_PYDANTIC = False
# Default port number for each scheme that has one.
DEFAULT_PORTS = {
    "http": 80,
    "https": 443,
    "ws": 80,
    "wss": 443,
    "ftp": 21,
}
# Immutable copy of the standard library's ``uses_relative`` scheme list.
USES_RELATIVE = frozenset(uses_relative)

# The WHATWG "special" schemes (https://url.spec.whatwg.org/#special-scheme)
# must not have an empty host (https://url.spec.whatwg.org/#url-representation).
SCHEME_REQUIRES_HOST = frozenset(("http", "https", "ws", "wss", "ftp"))
# RFC 3986 reg-name = *( unreserved / pct-encoded / sub-delims ).
# The pattern below finds the first character that does NOT belong to a
# reg-name. It is meant to be applied to lower-cased ASCII input only.
NOT_REG_NAME = re.compile(
    r"""
        # any character not in the unreserved or sub-delims sets, plus %
        # (validated with the additional check for pct-encoded sequences below)
        [^a-z0-9\-._~!$&'()*+,;=%]
    |
        # % only allowed if it is part of a pct-encoded
        # sequence of 2 hex digits.
        %(?![0-9a-f]{2})
    """,
    flags=re.VERBOSE,
)
# Generic placeholder used by helpers that return their argument unchanged.
_T = TypeVar("_T")

# ``typing.Self`` only exists on Python 3.11+; older interpreters fall back
# to ``Any`` so annotations that mention ``Self`` still evaluate.
if sys.version_info < (3, 11):
    Self = Any
else:
    from typing import Self
class UndefinedType(Enum):
    """One-member enum whose only value marks "argument not supplied"."""

    _singleton = 0


# The sentinel instance itself; compare against it with ``is``.
UNDEFINED = UndefinedType._singleton
class CacheInfo(TypedDict):
    """Statistics for the host-encoding caches, one ``_CacheInfo`` per cache."""

    # Each key names one cached helper; the value is that helper's
    # ``functools`` cache statistics tuple.
    idna_encode: _CacheInfo
    idna_decode: _CacheInfo
    ip_address: _CacheInfo
    host_validate: _CacheInfo
    encode_host: _CacheInfo
class _InternalURLCache(TypedDict, total=False):
    """Typed shape of the per-instance ``URL._cache`` dictionary.

    ``total=False`` makes every key optional: an entry is only present once
    the corresponding value has been computed and stored.
    """

    # Whole-URL values.
    _val: SplitURLType
    _origin: "URL"
    absolute: bool
    hash: int
    scheme: str
    # Authority and its components (``raw_*`` are the encoded forms).
    raw_authority: str
    authority: str
    raw_user: str | None
    user: str | None
    raw_password: str | None
    password: str | None
    raw_host: str | None
    host: str | None
    host_subcomponent: str | None
    host_port_subcomponent: str | None
    port: int | None
    explicit_port: int | None
    # Path, query and fragment.
    raw_path: str
    path: str
    _parsed_query: list[tuple[str, str]]
    query: "MultiDictProxy[str]"
    raw_query_string: str
    query_string: str
    path_qs: str
    raw_path_qs: str
    raw_fragment: str
    fragment: str
    # Values derived from the path segments.
    raw_parts: tuple[str, ...]
    parts: tuple[str, ...]
    parent: "URL"
    raw_name: str
    name: str
    raw_suffix: str
    suffix: str
    raw_suffixes: tuple[str, ...]
    suffixes: tuple[str, ...]
def rewrite_module(obj: _T) -> _T:
    """Set ``obj.__module__`` to ``"yarl"`` and hand the same object back.

    Usable as a decorator, since the argument is returned unchanged apart
    from the rewritten ``__module__`` attribute.
    """
    setattr(obj, "__module__", "yarl")
    return obj
@lru_cache
def encode_url(url_str: str) -> "URL":
    """Parse unencoded URL.

    Splits ``url_str`` with ``split_url``, encodes the host, requotes the
    user, password, path, query and fragment, and returns a ``URL`` whose
    cache is pre-seeded with the values computed along the way. Calls are
    memoized by ``lru_cache``, so an equal string yields the cached object.

    Raises ``ValueError`` if the netloc has no host and the scheme is one of
    ``SCHEME_REQUIRES_HOST``.
    """
    cache: _InternalURLCache = {}
    host: str | None
    scheme, netloc, path, query, fragment = split_url(url_str)
    if not netloc:  # netloc
        host = ""
    else:
        if ":" in netloc or "@" in netloc or "[" in netloc:
            # Complex netloc
            username, password, host, port = split_netloc(netloc)
        else:
            # Plain host: nothing to split off
            username = password = port = None
            host = netloc
        if host is None:
            if scheme in SCHEME_REQUIRES_HOST:
                msg = (
                    "Invalid URL: host is required for "
                    f"absolute urls with the {scheme} scheme"
                )
                raise ValueError(msg)
            else:
                host = ""
        host = _encode_host(host, validate_host=False)
        # Remove brackets as host encoder adds back brackets for IPv6 addresses
        cache["raw_host"] = host[1:-1] if "[" in host else host
        cache["explicit_port"] = port
        if password is None and username is None:
            # Fast path for URLs without user, password
            netloc = host if port is None else f"{host}:{port}"
            cache["raw_user"] = None
            cache["raw_password"] = None
        else:
            # Empty strings / None are passed through without requoting
            raw_user = REQUOTER(username) if username else username
            raw_password = REQUOTER(password) if password else password
            netloc = make_netloc(raw_user, raw_password, host, port)
            cache["raw_user"] = raw_user
            cache["raw_password"] = raw_password

    if path:
        path = PATH_REQUOTER(path)
        # Dot segments are only normalized when there is an authority
        if netloc and "." in path:
            path = normalize_path(path)
    if query:
        query = QUERY_REQUOTER(query)
    if fragment:
        fragment = FRAGMENT_REQUOTER(fragment)

    cache["scheme"] = scheme
    # An empty path next to an authority is reported as "/"
    cache["raw_path"] = "/" if not path and netloc else path
    cache["raw_query_string"] = query
    cache["raw_fragment"] = fragment

    # Bypass URL.__new__ and fill the slots directly
    self = object.__new__(URL)
    self._scheme = scheme
    self._netloc = netloc
    self._path = path
    self._query = query
    self._fragment = fragment
    self._cache = cache
    return self
@lru_cache
def pre_encoded_url(url_str: str) -> "URL":
    """Build a URL from an already-encoded string.

    The five components returned by ``split_url`` are stored verbatim; no
    quoting or host encoding is applied. Calls are memoized by ``lru_cache``.
    """
    scheme, netloc, path, query, fragment = split_url(url_str)
    url = object.__new__(URL)
    url._scheme = scheme
    url._netloc = netloc
    url._path = path
    url._query = query
    url._fragment = fragment
    url._cache = {}
    return url
@lru_cache
def build_pre_encoded_url(
    scheme: str,
    authority: str,
    user: str | None,
    password: str | None,
    host: str,
    port: int | None,
    path: str,
    query_string: str,
    fragment: str,
) -> "URL":
    """Assemble a URL from parts that are already encoded.

    ``authority`` wins when non-empty; otherwise the netloc is built from
    ``user``, ``password``, ``host`` and ``port``, dropping a port equal to
    the scheme's entry in ``DEFAULT_PORTS``. With neither authority nor host
    the netloc is empty. Path, query and fragment are stored unchanged.
    """
    if authority:
        netloc = authority
    elif not host:
        netloc = ""
    else:
        # A port matching the scheme default is not rendered.
        if port is not None and port == DEFAULT_PORTS.get(scheme):
            port = None
        if user is None and password is None:
            netloc = host if port is None else f"{host}:{port}"
        else:
            netloc = make_netloc(user, password, host, port)

    url = object.__new__(URL)
    url._scheme = scheme
    url._netloc = netloc
    url._path = path
    url._query = query_string
    url._fragment = fragment
    url._cache = {}
    return url
def from_parts_uncached(
    scheme: str, netloc: str, path: str, query: str, fragment: str
) -> "URL":
    """Wrap five ready-made components in a new URL without any processing."""
    url = object.__new__(URL)
    url._cache = {}
    url._scheme = scheme
    url._netloc = netloc
    url._path = path
    url._query = query
    url._fragment = fragment
    return url


# Memoized variant: equal argument tuples share one URL object.
from_parts = lru_cache(from_parts_uncached)
287@rewrite_module
288class URL:
289 # Don't derive from str
290 # follow pathlib.Path design
291 # probably URL will not suffer from pathlib problems:
292 # it's intended for libraries like aiohttp,
293 # not to be passed into standard library functions like os.open etc.
295 # URL grammar (RFC 3986)
296 # pct-encoded = "%" HEXDIG HEXDIG
297 # reserved = gen-delims / sub-delims
298 # gen-delims = ":" / "/" / "?" / "#" / "[" / "]" / "@"
299 # sub-delims = "!" / "$" / "&" / "'" / "(" / ")"
300 # / "*" / "+" / "," / ";" / "="
301 # unreserved = ALPHA / DIGIT / "-" / "." / "_" / "~"
302 # URI = scheme ":" hier-part [ "?" query ] [ "#" fragment ]
303 # hier-part = "//" authority path-abempty
304 # / path-absolute
305 # / path-rootless
306 # / path-empty
307 # scheme = ALPHA *( ALPHA / DIGIT / "+" / "-" / "." )
308 # authority = [ userinfo "@" ] host [ ":" port ]
309 # userinfo = *( unreserved / pct-encoded / sub-delims / ":" )
310 # host = IP-literal / IPv4address / reg-name
311 # IP-literal = "[" ( IPv6address / IPvFuture ) "]"
312 # IPvFuture = "v" 1*HEXDIG "." 1*( unreserved / sub-delims / ":" )
313 # IPv6address = 6( h16 ":" ) ls32
314 # / "::" 5( h16 ":" ) ls32
315 # / [ h16 ] "::" 4( h16 ":" ) ls32
316 # / [ *1( h16 ":" ) h16 ] "::" 3( h16 ":" ) ls32
317 # / [ *2( h16 ":" ) h16 ] "::" 2( h16 ":" ) ls32
318 # / [ *3( h16 ":" ) h16 ] "::" h16 ":" ls32
319 # / [ *4( h16 ":" ) h16 ] "::" ls32
320 # / [ *5( h16 ":" ) h16 ] "::" h16
321 # / [ *6( h16 ":" ) h16 ] "::"
322 # ls32 = ( h16 ":" h16 ) / IPv4address
323 # ; least-significant 32 bits of address
324 # h16 = 1*4HEXDIG
325 # ; 16 bits of address represented in hexadecimal
326 # IPv4address = dec-octet "." dec-octet "." dec-octet "." dec-octet
327 # dec-octet = DIGIT ; 0-9
328 # / %x31-39 DIGIT ; 10-99
329 # / "1" 2DIGIT ; 100-199
330 # / "2" %x30-34 DIGIT ; 200-249
331 # / "25" %x30-35 ; 250-255
332 # reg-name = *( unreserved / pct-encoded / sub-delims )
333 # port = *DIGIT
334 # path = path-abempty ; begins with "/" or is empty
335 # / path-absolute ; begins with "/" but not "//"
336 # / path-noscheme ; begins with a non-colon segment
337 # / path-rootless ; begins with a segment
338 # / path-empty ; zero characters
339 # path-abempty = *( "/" segment )
340 # path-absolute = "/" [ segment-nz *( "/" segment ) ]
341 # path-noscheme = segment-nz-nc *( "/" segment )
342 # path-rootless = segment-nz *( "/" segment )
343 # path-empty = 0<pchar>
344 # segment = *pchar
345 # segment-nz = 1*pchar
346 # segment-nz-nc = 1*( unreserved / pct-encoded / sub-delims / "@" )
347 # ; non-zero-length segment without any colon ":"
348 # pchar = unreserved / pct-encoded / sub-delims / ":" / "@"
349 # query = *( pchar / "/" / "?" )
350 # fragment = *( pchar / "/" / "?" )
351 # URI-reference = URI / relative-ref
352 # relative-ref = relative-part [ "?" query ] [ "#" fragment ]
353 # relative-part = "//" authority path-abempty
354 # / path-absolute
355 # / path-noscheme
356 # / path-empty
357 # absolute-URI = scheme ":" hier-part [ "?" query ]
358 __slots__ = ("_cache", "_scheme", "_netloc", "_path", "_query", "_fragment")
360 _cache: _InternalURLCache
361 _scheme: str
362 _netloc: str
363 _path: str
364 _query: str
365 _fragment: str
def __new__(
    cls,
    val: Union[str, SplitResult, "URL", UndefinedType] = UNDEFINED,
    *,
    encoded: bool = False,
    strict: bool | None = None,
) -> "URL":
    """Create a URL from a string, a ``SplitResult`` or another URL.

    Exact ``str`` and ``str`` subclasses go through ``pre_encoded_url`` when
    ``encoded`` is true and ``encode_url`` otherwise. An instance of ``cls``
    is returned as is. A ``SplitResult`` is accepted only with
    ``encoded=True`` (``ValueError`` otherwise). Any other value raises
    ``TypeError``. ``strict`` is ignored apart from emitting a warning.
    """
    if strict is not None:  # pragma: no cover
        warnings.warn("strict parameter is ignored")
    if type(val) is str:
        parse = pre_encoded_url if encoded else encode_url
        return parse(val)
    if type(val) is cls:
        return val
    if type(val) is SplitResult:
        if encoded:
            return from_parts(*val)
        raise ValueError("Cannot apply decoding to SplitResult")
    if isinstance(val, str):
        # str subclass: convert to an exact str before parsing
        plain = str(val)
        return pre_encoded_url(plain) if encoded else encode_url(plain)
    if val is not UNDEFINED:
        raise TypeError("Constructor parameter should be str")
    # UNDEFINED may mean the object is being unpickled. Build a fresh,
    # uncached instance so the following ``__setstate__`` call cannot mutate
    # an object held in the ``pre_encoded_url`` or ``encode_url`` caches.
    blank = object.__new__(URL)
    blank._scheme = blank._netloc = blank._path = blank._query = blank._fragment = ""
    blank._cache = {}
    return blank
@classmethod
def build(
    cls,
    *,
    scheme: str = "",
    authority: str = "",
    user: str | None = None,
    password: str | None = None,
    host: str = "",
    port: int | None = None,
    path: str = "",
    query: Query | None = None,
    query_string: str = "",
    fragment: str = "",
    encoded: bool = False,
) -> "URL":
    """Creates and returns a new URL

    All arguments are keyword-only. With ``encoded=True`` the parts are
    forwarded unchanged to ``build_pre_encoded_url``; otherwise the host is
    encoded and path, query string and fragment are quoted.

    Raises ``ValueError`` when ``authority`` is combined with ``user``,
    ``password``, ``host`` or ``port``; when ``port`` is given without
    ``host``; when both ``query`` and ``query_string`` are given; or when a
    path used together with an authority does not start with ``/``.
    Raises ``TypeError`` when ``port`` is not an int, or when ``None`` is
    passed for ``scheme``, ``authority``, ``host``, ``path``,
    ``query_string`` or ``fragment``.
    """

    if authority and (user or password or host or port):
        raise ValueError(
            'Can\'t mix "authority" with "user", "password", "host" or "port".'
        )
    if port is not None and not isinstance(port, int):
        raise TypeError(f"The port is required to be int, got {type(port)!r}.")
    if port and not host:
        raise ValueError('Can\'t build URL with "port" but without "host".')
    if query and query_string:
        raise ValueError('Only one of "query" or "query_string" should be passed')
    # The annotations forbid None, but callers may still pass it at runtime
    if (
        scheme is None  # type: ignore[redundant-expr]
        or authority is None  # type: ignore[redundant-expr]
        or host is None  # type: ignore[redundant-expr]
        or path is None  # type: ignore[redundant-expr]
        or query_string is None  # type: ignore[redundant-expr]
        or fragment is None
    ):
        raise TypeError(
            'NoneType is illegal for "scheme", "authority", "host", "path", '
            '"query_string", and "fragment" args, use empty string instead.'
        )

    if query:
        query_string = get_str_query(query) or ""

    if encoded:
        return build_pre_encoded_url(
            scheme,
            authority,
            user,
            password,
            host,
            port,
            path,
            query_string,
            fragment,
        )

    self = object.__new__(URL)
    self._scheme = scheme
    # _host stays None only when neither authority nor host was supplied
    _host: str | None = None
    if authority:
        user, password, _host, port = split_netloc(authority)
        _host = _encode_host(_host, validate_host=False) if _host else ""
    elif host:
        _host = _encode_host(host, validate_host=True)
    else:
        self._netloc = ""

    if _host is not None:
        if port is not None:
            # Drop a port equal to the scheme's default
            port = None if port == DEFAULT_PORTS.get(scheme) else port
        if user is None and password is None:
            self._netloc = _host if port is None else f"{_host}:{port}"
        else:
            self._netloc = make_netloc(user, password, _host, port, True)

    path = PATH_QUOTER(path) if path else path
    if path and self._netloc:
        if "." in path:
            path = normalize_path(path)
        if path[0] != "/":
            msg = (
                "Path in a URL with authority should "
                "start with a slash ('/') if set"
            )
            raise ValueError(msg)

    self._path = path
    # A query_string derived from ``query`` above is not quoted again
    if not query and query_string:
        query_string = QUERY_QUOTER(query_string)
    self._query = query_string
    self._fragment = FRAGMENT_QUOTER(fragment) if fragment else fragment
    self._cache = {}
    return self
def __init_subclass__(cls) -> NoReturn:
    """Reject every attempt to subclass URL by raising ``TypeError``."""
    message = f"Inheriting a class {cls!r} from URL is forbidden"
    raise TypeError(message)
def __str__(self) -> str:
    """Render the URL as a string.

    An empty path becomes ``/`` when there is a netloc followed by a query
    or fragment, and an explicit port equal to the scheme default is left
    out of the rendered netloc.
    """
    netloc = self._netloc
    path = self._path
    if not path and netloc and (self._query or self._fragment):
        path = "/"
    port = self.explicit_port
    if port is not None and port == DEFAULT_PORTS.get(self._scheme):
        # port normalization - passing None for the port removes it from
        # the rendering
        # https://datatracker.ietf.org/doc/html/rfc3986.html#section-6.2.3
        netloc = make_netloc(
            self.raw_user, self.raw_password, self.host_subcomponent, None
        )
    return unsplit_result(self._scheme, netloc, path, self._query, self._fragment)
def __repr__(self) -> str:
    """Return ``ClassName('<url string>')``."""
    class_name = self.__class__.__name__
    return f"{class_name}('{self!s}')"
def __bytes__(self) -> bytes:
    """Return the string form of the URL encoded as ASCII bytes."""
    text = str(self)
    return text.encode("ascii")
def __eq__(self, other: object) -> bool:
    """Compare two URLs component by component.

    Returns ``NotImplemented`` unless ``other`` is exactly a ``URL``. An
    empty path next to a netloc is treated as ``/`` on both sides.
    """
    if type(other) is not URL:
        return NotImplemented

    own_path = self._path
    if not own_path and self._netloc:
        own_path = "/"
    other_path = other._path
    if not other_path and other._netloc:
        other_path = "/"
    mine = (self._scheme, self._netloc, own_path, self._query, self._fragment)
    theirs = (other._scheme, other._netloc, other_path, other._query, other._fragment)
    return mine == theirs
def __hash__(self) -> int:
    """Hash the five components, memoizing the result in ``_cache["hash"]``.

    As in ``__eq__``, an empty path next to a netloc is hashed as ``/``.
    """
    cached = self._cache.get("hash")
    if cached is not None:
        return cached
    path = self._path
    if not path and self._netloc:
        path = "/"
    value = hash((self._scheme, self._netloc, path, self._query, self._fragment))
    self._cache["hash"] = value
    return value
def __le__(self, other: object) -> bool:
    """Order by the ``_val`` tuple; ``NotImplemented`` for non-URL operands."""
    if type(other) is URL:
        return self._val <= other._val
    return NotImplemented
def __lt__(self, other: object) -> bool:
    """Order by the ``_val`` tuple; ``NotImplemented`` for non-URL operands."""
    if type(other) is URL:
        return self._val < other._val
    return NotImplemented
def __ge__(self, other: object) -> bool:
    """Order by the ``_val`` tuple; ``NotImplemented`` for non-URL operands."""
    if type(other) is URL:
        return self._val >= other._val
    return NotImplemented
def __gt__(self, other: object) -> bool:
    """Order by the ``_val`` tuple; ``NotImplemented`` for non-URL operands."""
    if type(other) is URL:
        return self._val > other._val
    return NotImplemented
def __truediv__(self, name: str) -> "URL":
    """Implement ``url / "segment"`` by appending ``name`` as a child path.

    Returns ``NotImplemented`` when ``name`` is not a string.
    """
    if isinstance(name, str):
        # str() turns a str subclass into an exact str
        return self._make_child((str(name),))
    return NotImplemented  # type: ignore[unreachable]
def __mod__(self, query: Query) -> "URL":
    """Implement ``url % query`` as a call to ``update_query(query)``."""
    updated = self.update_query(query)
    return updated
def __bool__(self) -> bool:
    """True when the netloc, path, query or fragment is non-empty.

    The scheme is not consulted.
    """
    for component in (self._netloc, self._path, self._query, self._fragment):
        if component:
            return True
    return False
def __getstate__(self) -> tuple[SplitResult]:
    """Return the pickle state: a 1-tuple holding a ``SplitResult``."""
    # tuple.__new__ builds the SplitResult directly from the 5-tuple
    split = tuple.__new__(SplitResult, self._val)
    return (split,)
def __setstate__(
    self, state: tuple[SplitURLType] | tuple[None, _InternalURLCache]
) -> None:
    """Restore the five components from pickle state and reset the cache.

    Two layouts are accepted: ``(None, {"_val": parts, ...})`` (default
    style pickle) and a tuple whose first item is the 5-part value.
    """
    default_style = state[0] is None and isinstance(state[1], dict)
    if default_style:
        parts = state[1]["_val"]
    else:
        ignored: list[object]
        parts, *ignored = state
    self._scheme, self._netloc, self._path, self._query, self._fragment = parts
    self._cache = {}
def _cache_netloc(self) -> None:
    """Split the netloc once and store all four pieces in the cache."""
    user, password, host, port = split_netloc(self._netloc)
    cache = self._cache
    cache["raw_user"] = user
    cache["raw_password"] = password
    cache["raw_host"] = host
    cache["explicit_port"] = port
def is_absolute(self) -> bool:
    """Tell whether the URL is absolute.

    This simply returns the ``absolute`` property. It is preferred to use
    that property directly, as it is cached.
    """
    result = self.absolute
    return result
def is_default_port(self) -> bool:
    """Tell whether the URL uses the default port of its scheme.

    True for e.g. 'http://python.org' and 'http://python.org:80'.
    False for relative URLs.
    """
    explicit = self.explicit_port
    if explicit is not None:
        return explicit == DEFAULT_PORTS.get(self._scheme)
    # No explicit port: the default is implied unless the URL is
    # relative, in which case there is no port at all.
    return self._netloc != ""
def origin(self) -> "URL":
    """Return a URL reduced to scheme, host and port.

    User, password, path, query and fragment are dropped. The work is done
    by the cached ``_origin`` property.
    """
    # TODO: add a keyword-only option for keeping user/pass maybe?
    reduced = self._origin
    return reduced
@cached_property
def _val(self) -> SplitURLType:
    """The five URL components as one tuple, in split order."""
    components = (
        self._scheme,
        self._netloc,
        self._path,
        self._query,
        self._fragment,
    )
    return components
@cached_property
def _origin(self) -> "URL":
    """Return a URL reduced to scheme, host and port.

    Raises ``ValueError`` when the netloc or the scheme is empty. ``self``
    is returned when it has no user info, path, query or fragment.
    """
    netloc = self._netloc
    if not netloc:
        raise ValueError("URL should be absolute")
    scheme = self._scheme
    if not scheme:
        raise ValueError("URL should have scheme")
    if "@" in netloc:
        # Rebuild the netloc without the user info part
        netloc = make_netloc(None, None, self.host_subcomponent, self.explicit_port)
    elif not (self._path or self._query or self._fragment):
        return self
    return from_parts(scheme, netloc, "", "", "")
def relative(self) -> "URL":
    """Return the URL without scheme and netloc.

    Path, query and fragment are kept. Raises ``ValueError`` when the URL
    has no netloc.
    """
    if self._netloc:
        return from_parts("", "", self._path, self._query, self._fragment)
    raise ValueError("URL should be absolute")
@cached_property
def absolute(self) -> bool:
    """Tell whether the URL is absolute, i.e. has a non-empty netloc."""
    # `netloc` is an empty string for relative URLs. Testing it directly
    # avoids parsing the host out of the netloc.
    netloc = self._netloc
    return netloc != ""
@cached_property
def scheme(self) -> str:
    """The URL scheme.

    Empty string for relative URLs and for URLs starting with //.
    """
    return self._scheme
@cached_property
def raw_authority(self) -> str:
    """The authority part in its encoded form (the stored netloc).

    Empty string for relative URLs.
    """
    return self._netloc
@cached_property
def authority(self) -> str:
    """The authority part rebuilt from the decoded user, password, host
    and the effective port.

    Empty string for relative URLs.
    """
    user = self.user
    password = self.password
    host = self.host
    port = self.port
    return make_netloc(user, password, host, port)
@cached_property
def raw_user(self) -> str | None:
    """The user part in its encoded form, or None when absent."""
    # not .username
    # Splitting the netloc fills every netloc-derived cache entry at once.
    self._cache_netloc()
    cache = self._cache
    return cache["raw_user"]
@cached_property
def user(self) -> str | None:
    """The user part, decoded, or None when absent."""
    encoded_user = self.raw_user
    return None if encoded_user is None else UNQUOTER(encoded_user)
@cached_property
def raw_password(self) -> str | None:
    """The password part in its encoded form, or None when absent."""
    # Splitting the netloc fills every netloc-derived cache entry at once.
    self._cache_netloc()
    cache = self._cache
    return cache["raw_password"]
@cached_property
def password(self) -> str | None:
    """The password part, decoded, or None when absent."""
    encoded_password = self.raw_password
    return None if encoded_password is None else UNQUOTER(encoded_password)
@cached_property
def raw_host(self) -> str | None:
    """The host part in its encoded form; None for relative URLs.

    For IPv6 addresses prefer `host_subcomponent`, which returns the
    host with its brackets.
    """
    # Use host instead of hostname for sake of shortness
    # May add .hostname prop later
    self._cache_netloc()
    cache = self._cache
    return cache["raw_host"]
@cached_property
def host(self) -> str | None:
    """The host part, decoded; None for relative URLs."""
    raw = self.raw_host
    if raw is None:
        return None
    # A trailing digit or a colon marks the value as an IP address,
    # and IP addresses are never IDNA encoded.
    looks_like_ip = (raw and raw[-1].isdigit()) or ":" in raw
    if looks_like_ip:
        return raw
    return _idna_decode(raw)
770 @cached_property
771 def host_subcomponent(self) -> str | None:
772 """Return the host subcomponent part of URL.
774 None for relative URLs.
776 https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.2
778 `IP-literal = "[" ( IPv6address / IPvFuture ) "]"`
780 Examples:
781 - `http://example.com:8080` -> `example.com`
782 - `http://example.com:80` -> `example.com`
783 - `https://127.0.0.1:8443` -> `127.0.0.1`
784 - `https://[::1]:8443` -> `[::1]`
785 - `http://[::1]` -> `[::1]`
787 """
788 if (raw := self.raw_host) is None:
789 return None
790 return f"[{raw}]" if ":" in raw else raw
792 @cached_property
793 def host_port_subcomponent(self) -> str | None:
794 """Return the host and port subcomponent part of URL.
796 Trailing dots are removed from the host part.
798 This value is suitable for use in the Host header of an HTTP request.
800 None for relative URLs.
802 https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.2
803 `IP-literal = "[" ( IPv6address / IPvFuture ) "]"`
804 https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.3
805 port = *DIGIT
807 Examples:
808 - `http://example.com:8080` -> `example.com:8080`
809 - `http://example.com:80` -> `example.com`
810 - `http://example.com.:80` -> `example.com`
811 - `https://127.0.0.1:8443` -> `127.0.0.1:8443`
812 - `https://[::1]:8443` -> `[::1]:8443`
813 - `http://[::1]` -> `[::1]`
815 """
816 if (raw := self.raw_host) is None:
817 return None
818 if raw[-1] == ".":
819 # Remove all trailing dots from the netloc as while
820 # they are valid FQDNs in DNS, TLS validation fails.
821 # See https://github.com/aio-libs/aiohttp/issues/3636.
822 # To avoid string manipulation we only call rstrip if
823 # the last character is a dot.
824 raw = raw.rstrip(".")
825 port = self.explicit_port
826 if port is None or port == DEFAULT_PORTS.get(self._scheme):
827 return f"[{raw}]" if ":" in raw else raw
828 return f"[{raw}]:{port}" if ":" in raw else f"{raw}:{port}"
@cached_property
def port(self) -> int | None:
    """The effective port: the explicit one, else the scheme default.

    None when there is no explicit port and the scheme has no entry in
    ``DEFAULT_PORTS``.
    """
    explicit = self.explicit_port
    if explicit is None:
        return DEFAULT_PORTS.get(self._scheme)
    return explicit
@cached_property
def explicit_port(self) -> int | None:
    """The port written in the URL, with no scheme-based fallback.

    None for relative URLs and for URLs without an explicit port.
    """
    # Splitting the netloc fills every netloc-derived cache entry at once.
    self._cache_netloc()
    cache = self._cache
    return cache["explicit_port"]
@cached_property
def raw_path(self) -> str:
    """The path in its encoded form.

    ``/`` when the path is empty but a netloc is present.
    """
    path = self._path
    if path or not self._netloc:
        return path
    return "/"
@cached_property
def path(self) -> str:
    """The path, decoded.

    ``/`` when the path is empty but a netloc is present.
    """
    if self._path:
        return PATH_UNQUOTER(self._path)
    if self._netloc:
        return "/"
    return ""
@cached_property
def path_safe(self) -> str:
    """The path, decoded, except that / (%2F) and % (%25) stay encoded.

    ``/`` when the path is empty but a netloc is present.
    """
    raw = self._path
    if not raw:
        return "/" if self._netloc else ""
    return PATH_SAFE_UNQUOTER(raw)
@cached_property
def _parsed_query(self) -> list[tuple[str, str]]:
    """The stored query string run through ``query_to_pairs``."""
    pairs = query_to_pairs(self._query)
    return pairs
@cached_property
def query(self) -> "MultiDictProxy[str]":
    """The parsed query parameters wrapped in a MultiDictProxy.

    Empty when the URL has no query part.
    """
    params = MultiDict(self._parsed_query)
    return MultiDictProxy(params)
@cached_property
def raw_query_string(self) -> str:
    """The query part in its encoded form; empty string when missing."""
    return self._query
@cached_property
def query_string(self) -> str:
    """The query part, decoded; empty string when missing."""
    raw = self._query
    if not raw:
        return ""
    return QS_UNQUOTER(raw)
@cached_property
def path_qs(self) -> str:
    """The decoded path, followed by ``?`` and the decoded query if any."""
    query = self.query_string
    if query:
        return f"{self.path}?{query}"
    return self.path
@cached_property
def raw_path_qs(self) -> str:
    """The encoded path, followed by ``?`` and the encoded query if any.

    An empty path next to a netloc is rendered as ``/``.
    """
    path = self._path
    if not path and self._netloc:
        path = "/"
    query = self._query
    return f"{path}?{query}" if query else path
@cached_property
def raw_fragment(self) -> str:
    """The fragment in its encoded form; empty string when missing."""
    return self._fragment
@cached_property
def fragment(self) -> str:
    """The fragment, decoded; empty string when missing."""
    raw = self._fragment
    if not raw:
        return ""
    return UNQUOTER(raw)
@cached_property
def raw_parts(self) -> tuple[str, ...]:
    """The encoded path split into a tuple of segments.

    With a netloc, or when the path starts with ``/``, the first item is
    ``"/"`` and the remainder of the path is split on ``/``.
    ('/',) when a netloc is present and the path is empty.
    """
    path = self._path
    if not path:
        # Empty path: a lone root for absolute URLs, else one empty segment
        return ("/",) if self._netloc else ("",)
    if self._netloc or path[0] == "/":
        return ("/", *path[1:].split("/"))
    return tuple(path.split("/"))
@cached_property
def parts(self) -> tuple[str, ...]:
    """The path segments from ``raw_parts``, each decoded.

    ('/',) when a netloc is present and the path is empty.
    """
    return tuple(map(UNQUOTER, self.raw_parts))
@cached_property
def parent(self) -> "URL":
    """A URL with the last path segment, the query and the fragment removed.

    When the path is empty or ``/`` there is no segment to remove: ``self``
    is returned, unless it carries a query or fragment, in which case a URL
    without them is returned.
    """
    path = self._path
    if path and path != "/":
        # Everything before the final "/" is the parent path
        parent_path = path.rpartition("/")[0]
        return from_parts(self._scheme, self._netloc, parent_path, "", "")
    if self._fragment or self._query:
        return from_parts(self._scheme, self._netloc, path, "", "")
    return self
@cached_property
def raw_name(self) -> str:
    """The last encoded path segment.

    With a netloc, the leading ``"/"`` entry of ``raw_parts`` does not
    count as a segment, so the result is an empty string when nothing
    follows it.
    """
    parts = self.raw_parts
    if self._netloc:
        return parts[-1] if len(parts) > 1 else ""
    return parts[-1]
@cached_property
def name(self) -> str:
    """The last path segment, decoded."""
    encoded_name = self.raw_name
    return UNQUOTER(encoded_name)
@cached_property
def raw_suffix(self) -> str:
    """The final ``.ext`` of ``raw_name``, or an empty string.

    A dot in first or last position does not start a suffix.
    """
    name = self.raw_name
    dot = name.rfind(".")
    if 0 < dot < len(name) - 1:
        return name[dot:]
    return ""
@cached_property
def suffix(self) -> str:
    """The value of ``raw_suffix``, decoded."""
    encoded_suffix = self.raw_suffix
    return UNQUOTER(encoded_suffix)
@cached_property
def raw_suffixes(self) -> tuple[str, ...]:
    """Every ``.ext`` of ``raw_name``, in order.

    Empty when the name ends with a dot. Leading dots are ignored.
    """
    name = self.raw_name
    if name.endswith("."):
        return ()
    # The first chunk is the stem; every later chunk is one extension
    _stem, *extensions = name.lstrip(".").split(".")
    return tuple(f".{ext}" for ext in extensions)
@cached_property
def suffixes(self) -> tuple[str, ...]:
    """The items of ``raw_suffixes``, each decoded."""
    return tuple(map(UNQUOTER, self.raw_suffixes))
def _make_child(self, paths: "Sequence[str]", encoded: bool = False) -> "URL":
    """
    add paths to self._path, accounting for absolute vs relative paths,
    keep existing, but do not create new, empty segments

    Each item of ``paths`` is quoted with ``PATH_QUOTER`` unless ``encoded``
    is true. The returned URL has an empty query and fragment.

    Raises ``ValueError`` if any item starts with ``/``.
    """
    # Segments are collected in reverse order and flipped once at the end
    parsed: list[str] = []
    needs_normalize: bool = False
    for idx, path in enumerate(reversed(paths)):
        # empty segment of last is not removed
        last = idx == 0
        if path and path[0] == "/":
            raise ValueError(
                f"Appending path {path!r} starting from slash is forbidden"
            )
        # We need to quote the path if it is not already encoded
        # This cannot be done at the end because the existing
        # path is already quoted and we do not want to double quote
        # the existing path.
        path = path if encoded else PATH_QUOTER(path)
        # Any dot may be a "." or ".." segment that needs normalizing
        needs_normalize |= "." in path
        segments = path.split("/")
        segments.reverse()
        # remove trailing empty segment for all but the last path
        parsed += segments[1:] if not last and segments[0] == "" else segments

    if (path := self._path) and (old_segments := path.split("/")):
        # If the old path ends with a slash, the last segment is an empty string
        # and should be removed before adding the new path segments.
        old = old_segments[:-1] if old_segments[-1] == "" else old_segments
        old.reverse()
        parsed += old

    # If the netloc is present, inject a leading slash when adding a
    # path to an absolute URL where there was none before.
    if (netloc := self._netloc) and parsed and parsed[-1] != "":
        parsed.append("")

    parsed.reverse()
    # Normalization is only applied when there is a netloc and a dot was seen
    if not netloc or not needs_normalize:
        return from_parts(self._scheme, netloc, "/".join(parsed), "", "")

    path = "/".join(normalize_path_segments(parsed))
    # If normalizing the path segments removed the leading slash, add it back.
    if path and path[0] != "/":
        path = f"/{path}"
    return from_parts(self._scheme, netloc, path, "", "")
def with_scheme(self, scheme: str) -> "URL":
    """Return a new URL whose scheme is ``scheme`` lower-cased.

    Raises ``TypeError`` for a non-string scheme and ``ValueError`` when a
    relative URL is given a scheme listed in ``SCHEME_REQUIRES_HOST``.
    """
    # N.B. doesn't cleanup query/fragment
    if not isinstance(scheme, str):
        raise TypeError("Invalid scheme type")
    new_scheme = scheme.lower()
    if not self._netloc and new_scheme in SCHEME_REQUIRES_HOST:
        raise ValueError(
            "scheme replacement is not allowed for "
            f"relative URLs for the {new_scheme} scheme"
        )
    return from_parts(
        new_scheme, self._netloc, self._path, self._query, self._fragment
    )
def with_user(self, user: str | None) -> "URL":
    """Return a new URL with the user part replaced.

    A str user is passed through QUOTER and the current raw password is
    kept. Passing None clears both user and password.

    Raises TypeError if *user* is neither str nor None, and ValueError
    if this URL has no netloc.
    """
    if user is not None and not isinstance(user, str):
        raise TypeError("Invalid user type")
    if user is None:
        # Dropping the user drops the password with it.
        quoted_user = None
        kept_password = None
    else:
        quoted_user = QUOTER(user)
        kept_password = self.raw_password
    if not self._netloc:
        raise ValueError("user replacement is not allowed for relative URLs")
    host_part = self.host_subcomponent or ""
    new_netloc = make_netloc(quoted_user, kept_password, host_part, self.explicit_port)
    return from_parts(
        self._scheme, new_netloc, self._path, self._query, self._fragment
    )
def with_password(self, password: str | None) -> "URL":
    """Return a new URL with the password part replaced.

    A str password is passed through QUOTER; None clears the password.
    User, host and explicit port are carried over.

    Raises TypeError if *password* is neither str nor None, and
    ValueError if this URL has no netloc.
    """
    if password is not None:
        if not isinstance(password, str):
            raise TypeError("Invalid password type")
        password = QUOTER(password)
    if not self._netloc:
        raise ValueError("password replacement is not allowed for relative URLs")
    host_part = self.host_subcomponent or ""
    explicit_port = self.explicit_port
    new_netloc = make_netloc(self.raw_user, password, host_part, explicit_port)
    return from_parts(
        self._scheme, new_netloc, self._path, self._query, self._fragment
    )
def with_host(self, host: str) -> "URL":
    """Return a new URL with host replaced.

    The host is encoded (and validated) via ``_encode_host``; user,
    password and explicit port are carried over, as are path, query
    and fragment.

    Changing host for relative URLs is not allowed, use .join()
    instead.

    Raises TypeError if *host* is not a str, and ValueError if this URL
    has no netloc or if *host* is empty.
    """
    # N.B. doesn't cleanup query/fragment
    if not isinstance(host, str):
        raise TypeError("Invalid host type")
    if not self._netloc:
        raise ValueError("host replacement is not allowed for relative URLs")
    if not host:
        raise ValueError("host removing is not allowed")
    # host is guaranteed non-empty here, so no empty-host fallback is needed.
    encoded_host = _encode_host(host, validate_host=True)
    port = self.explicit_port
    netloc = make_netloc(self.raw_user, self.raw_password, encoded_host, port)
    return from_parts(self._scheme, netloc, self._path, self._query, self._fragment)
def with_port(self, port: int | None) -> "URL":
    """Return a new URL with the port replaced.

    Passing None removes the explicit port from the netloc.

    Raises TypeError if *port* is a bool or not an int, and ValueError
    if it lies outside 0..65535 or if this URL has no netloc.
    """
    if port is not None:
        # bool is an int subclass, so it has to be rejected explicitly.
        is_plain_int = isinstance(port, int) and not isinstance(port, bool)
        if not is_plain_int:
            raise TypeError(f"port should be int or None, got {type(port)}")
        if port < 0 or port > 65535:
            raise ValueError(f"port must be between 0 and 65535, got {port}")
    if not self._netloc:
        raise ValueError("port replacement is not allowed for relative URLs")
    host_part = self.host_subcomponent or ""
    new_netloc = make_netloc(self.raw_user, self.raw_password, host_part, port)
    return from_parts(
        self._scheme, new_netloc, self._path, self._query, self._fragment
    )
def with_path(
    self,
    path: str,
    *,
    encoded: bool = False,
    keep_query: bool = False,
    keep_fragment: bool = False,
) -> "URL":
    """Return a new URL with the path replaced.

    Unless *encoded* is true the path is passed through PATH_QUOTER.
    When the URL has a netloc, a path containing "." is normalized and
    a leading slash is added to a non-empty path that lacks one.
    Query and fragment are dropped unless *keep_query* /
    *keep_fragment* are true.
    """
    authority = self._netloc
    new_path = path if encoded else PATH_QUOTER(path)
    if authority:
        if "." in new_path:
            new_path = normalize_path(new_path)
        if new_path and not new_path.startswith("/"):
            new_path = f"/{new_path}"
    if keep_query:
        query = self._query
    else:
        query = ""
    if keep_fragment:
        fragment = self._fragment
    else:
        fragment = ""
    return from_parts(self._scheme, authority, new_path, query, fragment)
@overload
def with_query(self, query: Query) -> "URL": ...

@overload
def with_query(self, **kwargs: QueryVariable) -> "URL": ...

def with_query(self, *args: Any, **kwargs: Any) -> "URL":
    """Return a new URL with the query part replaced.

    Accepts any Mapping (e.g. dict, multidict.MultiDict instances)
    or str; the argument is encoded if needed.

    A sequence of (key, value) pairs is supported as well.

    An arbitrary number of keyword arguments may be given instead.

    Passing None clears the query.
    """
    replacement = get_str_query(*args, **kwargs)
    if not replacement:
        # A falsy result from get_str_query means "no query".
        replacement = ""
    return from_parts_uncached(
        self._scheme, self._netloc, self._path, replacement, self._fragment
    )
@overload
def extend_query(self, query: Query) -> "URL": ...

@overload
def extend_query(self, **kwargs: QueryVariable) -> "URL": ...

def extend_query(self, *args: Any, **kwargs: Any) -> "URL":
    """Return a new URL with the given query appended to the existing one.

    Existing query parameters are never removed. If the supplied
    query turns out empty, this URL itself is returned.

    Example:
    >>> url = URL('http://example.com/?a=1&b=2')
    >>> url.extend_query(a=3, c=4)
    URL('http://example.com/?a=1&b=2&a=3&c=4')
    """
    addition = get_str_query(*args, **kwargs)
    if not addition:
        return self
    existing = self._query
    if not existing:
        combined = addition
    elif existing[-1] == "&":
        # Both strings are already encoded, so plain concatenation is
        # enough; a trailing "&" means no extra separator is needed.
        combined = existing + addition
    else:
        combined = f"{existing}&{addition}"
    return from_parts_uncached(
        self._scheme, self._netloc, self._path, combined, self._fragment
    )
@overload
def update_query(self, query: Query) -> "URL": ...

@overload
def update_query(self, **kwargs: QueryVariable) -> "URL": ...

def update_query(self, *args: Any, **kwargs: Any) -> "URL":
    """Return a new URL with the query part updated.

    Existing query parameters with the same key are overwritten.
    Passing None clears the query; any other falsy argument leaves the
    query unchanged.

    Raises ValueError unless exactly one positional argument or only
    keyword arguments are given, and TypeError for unsupported query
    types (including bytes, bytearray and memoryview).

    Example:
    >>> url = URL('http://example.com/?a=1&b=2')
    >>> url.update_query(a=3, c=4)
    URL('http://example.com/?a=3&b=2&c=4')
    """
    supplied: (
        str
        | Mapping[str, QueryVariable]
        | Sequence[tuple[str | istr, SimpleQuery]]
        | None
    )
    # Either keyword arguments alone or exactly one positional argument.
    if (kwargs and args) or (not kwargs and len(args) != 1):
        raise ValueError("Either kwargs or single query parameter must be present")
    supplied = kwargs if kwargs else args[0]

    if supplied is None:
        merged_query = ""
    elif not supplied:
        merged_query = self._query
    elif isinstance(supplied, Mapping):
        by_mapping: MultiDict[QueryVariable] = MultiDict(self._parsed_query)
        by_mapping.update(supplied)
        merged_query = get_str_query_from_sequence_iterable(by_mapping.items())
    elif isinstance(supplied, str):
        by_string: MultiDict[str] = MultiDict(self._parsed_query)
        by_string.update(query_to_pairs(supplied))
        merged_query = get_str_query_from_iterable(by_string.items())
    elif isinstance(supplied, (bytes, bytearray, memoryview)):
        raise TypeError(
            "Invalid query type: bytes, bytearray and memoryview are forbidden"
        )
    elif isinstance(supplied, Sequence):
        # A list of pairs is not expected to carry sequence values;
        # only mappings such as the builtin `dict`, which cannot map
        # one key to several values, go through the sequence-aware
        # serializer above.
        if TYPE_CHECKING:
            supplied = cast(
                Sequence[tuple[Union[str, istr], SimpleQuery]], supplied
            )
        by_pairs: MultiDict[SimpleQuery] = MultiDict(self._parsed_query)
        by_pairs.update(supplied)
        merged_query = get_str_query_from_iterable(by_pairs.items())
    else:
        raise TypeError(
            "Invalid query type: only str, mapping or "
            "sequence of (key, value) pairs is allowed"
        )
    return from_parts_uncached(
        self._scheme, self._netloc, self._path, merged_query, self._fragment
    )
def without_query_params(self, *query_params: str) -> "URL":
    """Return a new URL with the named keys removed from the query.

    If none of *query_params* is present in the query, this URL itself
    is returned.
    """
    current = self.query
    doomed = set(query_params) & current.keys()
    if not doomed:
        return self
    survivors = []
    for key, value in current.items():
        if key not in doomed:
            survivors.append((key, value))
    return self.with_query(tuple(survivors))
def with_fragment(self, fragment: str | None) -> "URL":
    """Return a new URL with the fragment replaced.

    A str fragment is passed through FRAGMENT_QUOTER; None clears the
    fragment. If the resulting fragment equals the current one, this
    URL itself is returned.

    Raises TypeError if *fragment* is neither str nor None.
    """
    if fragment is not None and not isinstance(fragment, str):
        raise TypeError("Invalid fragment type")
    quoted = "" if fragment is None else FRAGMENT_QUOTER(fragment)
    if quoted == self._fragment:
        # Nothing changes, so no new object is needed.
        return self
    return from_parts(self._scheme, self._netloc, self._path, self._query, quoted)
def with_name(
    self,
    name: str,
    *,
    keep_query: bool = False,
    keep_fragment: bool = False,
) -> "URL":
    """Return a new URL with the name (last path segment) replaced.

    The name is passed through PATH_QUOTER. Query and fragment are
    dropped unless *keep_query* / *keep_fragment* are true.

    Raises TypeError if *name* is not a str, and ValueError if it
    contains a slash or is "." or ".." after quoting.
    """
    if not isinstance(name, str):
        raise TypeError("Invalid name type")
    if "/" in name:
        raise ValueError("Slash in name is not allowed")
    name = PATH_QUOTER(name)
    if name == "." or name == "..":
        raise ValueError(". and .. values are forbidden")

    segments = list(self.raw_parts)
    authority = self._netloc
    if authority and len(segments) == 1:
        # Only the root segment exists: add the name after it.
        segments.append(name)
    else:
        segments[-1] = name
    if authority or segments[0] == "/":
        # Blank out the leading "/" segment so the join below yields a
        # single leading slash.
        segments[0] = ""

    if keep_query:
        query = self._query
    else:
        query = ""
    if keep_fragment:
        fragment = self._fragment
    else:
        fragment = ""
    return from_parts(self._scheme, authority, "/".join(segments), query, fragment)
def with_suffix(
    self,
    suffix: str,
    *,
    keep_query: bool = False,
    keep_fragment: bool = False,
) -> "URL":
    """Return a new URL with the suffix (file extension of name) replaced.

    The suffix is passed through PATH_QUOTER. An empty suffix removes
    the current one. Query and fragment are dropped unless
    *keep_query* / *keep_fragment* are true.

    Raises TypeError if *suffix* is not a str, and ValueError if it is
    non-empty without a leading dot, is just ".", contains a slash,
    if the URL has an empty name, or if the resulting name is "." or
    "..".
    """
    if not isinstance(suffix, str):
        raise TypeError("Invalid suffix type")
    lacks_dot = bool(suffix) and not suffix.startswith(".")
    if lacks_dot or suffix == "." or "/" in suffix:
        raise ValueError(f"Invalid suffix {suffix!r}")
    stem = self.raw_name
    if not stem:
        raise ValueError(f"{self!r} has an empty name")
    current_suffix = self.raw_suffix
    suffix = PATH_QUOTER(suffix)
    if current_suffix:
        # Strip the existing extension before attaching the new one.
        stem = stem[: -len(current_suffix)]
    new_name = stem + suffix
    if new_name == "." or new_name == "..":
        raise ValueError(". and .. values are forbidden")

    segments = list(self.raw_parts)
    authority = self._netloc
    if authority and len(segments) == 1:
        segments.append(new_name)
    else:
        segments[-1] = new_name
    if authority or segments[0] == "/":
        # Blank out the leading "/" segment so the join below yields a
        # single leading slash.
        segments[0] = ""

    if keep_query:
        query = self._query
    else:
        query = ""
    if keep_fragment:
        fragment = self._fragment
    else:
        fragment = ""
    return from_parts(self._scheme, authority, "/".join(segments), query, fragment)
def join(self, url: "URL") -> "URL":
    """Join URLs.

    Build a full ("absolute") URL by combining a "base URL" (self)
    with another URL (url).

    Informally, components of the base URL, in particular the scheme,
    the network location and (part of) the path, fill in the
    components missing from the relative URL.

    Raises TypeError if *url* is not exactly a URL instance.
    """
    if type(url) is not URL:
        raise TypeError("url should be URL")

    base_scheme = self._scheme
    scheme = url._scheme or base_scheme
    # A different scheme, or one without relative references, means the
    # other URL is returned as-is.
    if scheme != base_scheme or scheme not in USES_RELATIVE:
        return url

    # scheme is in uses_authority as uses_authority is a superset of uses_relative
    other_netloc = url._netloc
    if other_netloc and scheme in USES_AUTHORITY:
        return from_parts(scheme, other_netloc, url._path, url._query, url._fragment)

    base_path = self._path
    other_path = url._path
    if not other_path:
        merged_path = base_path
    else:
        if other_path[0] == "/":
            merged_path = other_path
        elif not base_path:
            merged_path = f"/{other_path}"
        elif base_path[-1] == "/":
            merged_path = f"{base_path}{other_path}"
        else:
            # Drop the last segment of the base path and append the
            # other path. parts[0] is "/" for absolute URLs, so the
            # join produces a double leading slash ...
            merged_path = "/".join([*self.parts[:-1], ""]) + other_path
            # ... which has to be removed again.
            if base_path[0] == "/":
                merged_path = merged_path[1:]
        if "." in merged_path:
            merged_path = normalize_path(merged_path)

    # With a path given, query and fragment come from the other URL;
    # otherwise the base values are the fallback for empty ones.
    if other_path:
        query = url._query
        fragment = url._fragment
    else:
        query = url._query or self._query
        fragment = url._fragment or self._fragment
    return from_parts(scheme, self._netloc, merged_path, query, fragment)
def joinpath(self, *other: str, encoded: bool = False) -> "URL":
    """Return a new URL with the elements of *other* appended to the path.

    The work is delegated to ``_make_child``; *encoded* is forwarded
    to it unchanged.
    """
    extra_segments = other
    return self._make_child(extra_segments, encoded=encoded)
def human_repr(self) -> str:
    """Return a decoded, human-readable string representation of the URL.

    Each decoded component is passed through ``human_quote`` with a
    component-specific character set, and a host containing ":" is
    wrapped in square brackets before the pieces are reassembled.
    """
    userinfo_chars = "#/:?@[]"
    query_chars = "#&+;="

    user = human_quote(self.user, userinfo_chars)
    password = human_quote(self.password, userinfo_chars)
    host = self.host
    if host and ":" in host:
        host = f"[{host}]"
    path = human_quote(self.path, "#?")
    if TYPE_CHECKING:
        assert path is not None
    rendered_pairs = [
        f"{human_quote(key, query_chars)}={human_quote(value, query_chars)}"
        for key, value in self.query.items()
    ]
    query_string = "&".join(rendered_pairs)
    fragment = human_quote(self.fragment, "")
    if TYPE_CHECKING:
        assert fragment is not None
    netloc = make_netloc(user, password, host, self.explicit_port)
    return unsplit_result(self._scheme, netloc, path, query_string, fragment)
if HAS_PYDANTIC:  # pragma: no cover
    # Borrowed from https://docs.pydantic.dev/latest/concepts/types/#handling-third-party-types
    @classmethod
    def __get_pydantic_json_schema__(
        cls, core_schema: core_schema.CoreSchema, handler: GetJsonSchemaHandler
    ) -> JsonSchemaValue:
        # The JSON schema is fixed: a string in "uri" format.
        return {"type": "string", "format": "uri"}

    @classmethod
    def __get_pydantic_core_schema__(
        cls, source_type: type[Self] | type[str], handler: GetCoreSchemaHandler
    ) -> core_schema.CoreSchema:
        # Validate as a str first, then hand the value to URL().
        parse_from_str = core_schema.chain_schema(
            [
                core_schema.str_schema(),
                core_schema.no_info_plain_validator_function(URL),
            ]
        )
        # In Python mode, check for an existing URL instance first
        # before doing any further work.
        python_side = core_schema.union_schema(
            [
                core_schema.is_instance_schema(URL),
                parse_from_str,
            ]
        )
        to_str = core_schema.plain_serializer_function_ser_schema(str)
        return core_schema.json_or_python_schema(
            json_schema=parse_from_str,
            python_schema=python_side,
            serialization=to_str,
        )
1527_DEFAULT_IDNA_SIZE = 256
1528_DEFAULT_ENCODE_SIZE = 512
@lru_cache(_DEFAULT_IDNA_SIZE)
def _idna_decode(raw: str) -> str:
    """Decode an ASCII IDNA host name, falling back to the stdlib codec."""
    try:
        decoded = idna.decode(raw.encode("ascii"))
    except UnicodeError:  # e.g. '::1'
        decoded = raw.encode("ascii").decode("idna")
    return decoded
@lru_cache(_DEFAULT_IDNA_SIZE)
def _idna_encode(host: str) -> str:
    """Encode a host name to ASCII IDNA, falling back to the stdlib codec."""
    try:
        encoded = idna.encode(host, uts46=True).decode("ascii")
    except UnicodeError:
        encoded = host.encode("idna").decode("ascii")
    return encoded
@lru_cache(_DEFAULT_ENCODE_SIZE)
def _encode_host(host: str, validate_host: bool) -> str:
    """Encode the host part of a URL.

    IP addresses are returned in compressed form (IPv6 in square
    brackets, any "%zone" suffix kept). ASCII names are lower-cased
    and, when *validate_host* is true, checked against NOT_REG_NAME.
    Everything else is IDNA-encoded.

    Raises ValueError when validation finds a forbidden character.
    """
    # IP parsing is orders of magnitude slower than almost anything
    # else here, so it is attempted only when the host looks like an
    # address: it ends with a digit or contains a colon.
    #
    # IP addresses can look like:
    # https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.2
    # - 127.0.0.1 (last character is a digit)
    # - 2001:db8::ff00:42:8329 (contains a colon)
    # - 2001:db8::ff00:42:8329%eth0 (contains a colon)
    # - [2001:db8::ff00:42:8329] (contains a colon -- brackets should
    #   have been removed before it gets here)
    # Rare IP address formats are not supported per:
    # https://datatracker.ietf.org/doc/html/rfc3986#section-7.4
    looks_like_ip = bool(host) and (host[-1].isdigit() or ":" in host)
    if looks_like_ip:
        candidate, percent, zone = host.partition("%")
        try:
            parsed = ip_address(candidate)
        except ValueError:
            # Not an IP address after all; fall through to name handling.
            pass
        else:
            zone_suffix = f"%{zone}" if percent else ""
            compressed = parsed.compressed
            if parsed.version == 6:
                return f"[{compressed}{zone_suffix}]"
            return f"{compressed}{zone_suffix}"

    # IDNA encoding is slow, so only non-ASCII names go through it.
    if not host.isascii():
        return _idna_encode(host)

    # Invalid characters are checked explicitly for ASCII names;
    # _idna_encode() does this for non-ASCII host names.
    host = host.lower()
    if validate_host:
        invalid = NOT_REG_NAME.search(host)
        if invalid:
            value = invalid.group()
            pos = invalid.start()
            extra = ""
            if value == "@" or (value == ":" and "@" in host[pos:]):
                # this looks like an authority string
                extra = (
                    ", if the value includes a username or password, "
                    "use 'authority' instead of 'host'"
                )
            raise ValueError(
                f"Host {host!r} cannot contain {value!r} (at position {pos}){extra}"
            ) from None
    return host
@rewrite_module
def cache_clear() -> None:
    """Clear all LRU caches."""
    for cached_func in (_idna_encode, _idna_decode, _encode_host):
        cached_func.cache_clear()
@rewrite_module
def cache_info() -> CacheInfo:
    """Report cache statistics.

    The "ip_address" and "host_validate" entries report the same
    statistics as "encode_host": all three come from the
    ``_encode_host`` cache.
    """
    host_stats = _encode_host.cache_info()
    encode_stats = _idna_encode.cache_info()
    decode_stats = _idna_decode.cache_info()
    return {
        "idna_encode": encode_stats,
        "idna_decode": decode_stats,
        "ip_address": host_stats,
        "host_validate": host_stats,
        "encode_host": host_stats,
    }
@rewrite_module
def cache_configure(
    *,
    idna_encode_size: int | None = _DEFAULT_IDNA_SIZE,
    idna_decode_size: int | None = _DEFAULT_IDNA_SIZE,
    ip_address_size: int | None | UndefinedType = UNDEFINED,
    host_validate_size: int | None | UndefinedType = UNDEFINED,
    encode_host_size: int | None | UndefinedType = UNDEFINED,
) -> None:
    """Configure LRU cache sizes.

    Each size is passed to ``lru_cache`` as its maxsize, so None means
    an unbounded cache. The three caches are rebuilt from the wrapped
    functions, which discards their current contents.

    *ip_address_size* and *host_validate_size* are deprecated: passing
    either emits a DeprecationWarning, and their values are folded into
    *encode_host_size*. If any of the three is None the host cache is
    unbounded; otherwise the largest given value is used, or
    ``_DEFAULT_ENCODE_SIZE`` when none was given.
    """
    global _idna_decode, _idna_encode, _encode_host
    # ip_address_size, host_validate_size are no longer
    # used, but are kept for backwards compatibility.
    if ip_address_size is not UNDEFINED or host_validate_size is not UNDEFINED:
        warnings.warn(
            "cache_configure() no longer accepts the "
            "ip_address_size or host_validate_size arguments, "
            "they are used to set the encode_host_size instead "
            "and will be removed in the future",
            DeprecationWarning,
            stacklevel=2,
        )

    if encode_host_size is not None:
        for size in (ip_address_size, host_validate_size):
            if size is None:
                # Unbounded wins over any finite size. Stop here so a
                # later int is never compared against None with max().
                encode_host_size = None
                break
            if size is UNDEFINED:
                continue
            if encode_host_size is UNDEFINED:
                encode_host_size = size
            else:
                if TYPE_CHECKING:
                    assert isinstance(size, int)
                    assert isinstance(encode_host_size, int)
                encode_host_size = max(size, encode_host_size)
        if encode_host_size is UNDEFINED:
            encode_host_size = _DEFAULT_ENCODE_SIZE

    _encode_host = lru_cache(encode_host_size)(_encode_host.__wrapped__)
    _idna_decode = lru_cache(idna_decode_size)(_idna_decode.__wrapped__)
    _idna_encode = lru_cache(idna_encode_size)(_idna_encode.__wrapped__)