Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/yarl/_url.py: 40%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import re
2import sys
3import warnings
4from collections.abc import Mapping, Sequence
5from enum import Enum
6from functools import _CacheInfo, lru_cache
7from ipaddress import ip_address
8from typing import TYPE_CHECKING, Any, NoReturn, TypedDict, TypeVar, Union, overload
9from urllib.parse import SplitResult, uses_relative
11import idna
12from multidict import MultiDict, MultiDictProxy
13from propcache.api import under_cached_property as cached_property
15from ._parse import (
16 USES_AUTHORITY,
17 SplitURLType,
18 make_netloc,
19 query_to_pairs,
20 split_netloc,
21 split_url,
22 unsplit_result,
23)
24from ._path import normalize_path, normalize_path_segments
25from ._query import (
26 Query,
27 QueryVariable,
28 SimpleQuery,
29 get_str_query,
30 get_str_query_from_iterable,
31 get_str_query_from_sequence_iterable,
32)
33from ._quoters import (
34 FRAGMENT_QUOTER,
35 FRAGMENT_REQUOTER,
36 PATH_QUOTER,
37 PATH_REQUOTER,
38 PATH_SAFE_UNQUOTER,
39 PATH_UNQUOTER,
40 QS_UNQUOTER,
41 QUERY_QUOTER,
42 QUERY_REQUOTER,
43 QUOTER,
44 REQUOTER,
45 UNQUOTER,
46 human_quote,
47)
# Default port number for every scheme this module knows a default for.
DEFAULT_PORTS = dict(http=80, https=443, ws=80, wss=443, ftp=21)
# Immutable snapshot of urllib.parse.uses_relative.
USES_RELATIVE = frozenset(uses_relative)

# Special schemes https://url.spec.whatwg.org/#special-scheme
# are not allowed to have an empty host https://url.spec.whatwg.org/#url-representation
SCHEME_REQUIRES_HOST = frozenset({"http", "https", "ws", "wss", "ftp"})
# reg-name: unreserved / pct-encoded / sub-delims
# This pattern matches anything that is *not* in those classes, and is only
# used on lower-cased ASCII values (hence only a-z / a-f in the classes).
# A match therefore means "this text contains something not allowed in a
# reg-name".
NOT_REG_NAME = re.compile(
    r"""
        # any character not in the unreserved or sub-delims sets, plus %
        # (validated with the additional check for pct-encoded sequences below)
        [^a-z0-9\-._~!$&'()*+,;=%]
    |
        # % only allowed if it is part of a pct-encoded
        # sequence of 2 hex digits.
        %(?![0-9a-f]{2})
    """,
    re.VERBOSE,
)
# Generic type variable; used by rewrite_module to return its argument's type.
_T = TypeVar("_T")

# typing.Self only exists on Python 3.11+; older interpreters fall back to Any.
if sys.version_info >= (3, 11):
    from typing import Self
else:
    Self = Any
class UndefinedType(Enum):
    """Singleton type for use with not set sentinel values.

    The enum has exactly one member, so ``UNDEFINED`` can be compared with
    ``is`` and used as a precise type in annotations.
    """

    _singleton = 0


# The sentinel itself: the default for ``URL.__new__``'s ``val`` argument.
UNDEFINED = UndefinedType._singleton
class CacheInfo(TypedDict):
    """Host encoding cache.

    Every value is a ``functools._CacheInfo`` record, one per named cache.
    """

    idna_encode: _CacheInfo
    idna_decode: _CacheInfo
    ip_address: _CacheInfo
    host_validate: _CacheInfo
    encode_host: _CacheInfo
class _InternalURLCache(TypedDict, total=False):
    """Typed shape of the per-instance ``URL._cache`` dict.

    ``total=False`` makes every key optional: entries appear only once the
    matching value has been computed (or pre-seeded by ``encode_url``).
    Key names mirror the cached attributes of ``URL``; ``hash`` is written
    by ``URL.__hash__``.
    """

    _val: SplitURLType
    _origin: "URL"
    absolute: bool
    hash: int
    scheme: str
    raw_authority: str
    authority: str
    raw_user: Union[str, None]
    user: Union[str, None]
    raw_password: Union[str, None]
    password: Union[str, None]
    raw_host: Union[str, None]
    host: Union[str, None]
    host_subcomponent: Union[str, None]
    host_port_subcomponent: Union[str, None]
    port: Union[int, None]
    explicit_port: Union[int, None]
    raw_path: str
    path: str
    # ``URL.path_safe`` is a cached property as well; declare its key so the
    # typed dict covers every cached attribute.
    path_safe: str
    _parsed_query: list[tuple[str, str]]
    query: "MultiDictProxy[str]"
    raw_query_string: str
    query_string: str
    path_qs: str
    raw_path_qs: str
    raw_fragment: str
    fragment: str
    raw_parts: tuple[str, ...]
    parts: tuple[str, ...]
    parent: "URL"
    raw_name: str
    name: str
    raw_suffix: str
    suffix: str
    raw_suffixes: tuple[str, ...]
    suffixes: tuple[str, ...]
def rewrite_module(obj: _T) -> _T:
    """Set ``obj.__module__`` to ``"yarl"`` and return the same object.

    Intended for use as a decorator (it is applied to ``URL``).
    """
    setattr(obj, "__module__", "yarl")
    return obj
@lru_cache
def encode_url(url_str: str) -> "URL":
    """Parse unencoded URL.

    Splits ``url_str`` with ``split_url``, requotes each component and
    returns a ``URL`` created through ``object.__new__`` (bypassing
    ``URL.__new__``).  Values already known while parsing (scheme, raw host,
    explicit port, raw user/password, raw path, raw query string and raw
    fragment) are pre-seeded into the new instance's ``_cache``.

    The function is wrapped in ``lru_cache``, so results are memoized per
    input string.

    Raises:
        ValueError: the netloc has no host and the scheme is listed in
            ``SCHEME_REQUIRES_HOST``.
    """
    cache: _InternalURLCache = {}
    host: Union[str, None]
    scheme, netloc, path, query, fragment = split_url(url_str)
    if not netloc:  # netloc
        host = ""
    else:
        if ":" in netloc or "@" in netloc or "[" in netloc:
            # Complex netloc
            username, password, host, port = split_netloc(netloc)
        else:
            # Plain host: nothing to split.
            username = password = port = None
            host = netloc
        if host is None:
            if scheme in SCHEME_REQUIRES_HOST:
                msg = (
                    "Invalid URL: host is required for "
                    f"absolute urls with the {scheme} scheme"
                )
                raise ValueError(msg)
            else:
                host = ""
        host = _encode_host(host, validate_host=False)
        # Remove brackets as host encoder adds back brackets for IPv6 addresses
        cache["raw_host"] = host[1:-1] if "[" in host else host
        cache["explicit_port"] = port
        if password is None and username is None:
            # Fast path for URLs without user, password
            netloc = host if port is None else f"{host}:{port}"
            cache["raw_user"] = None
            cache["raw_password"] = None
        else:
            # Empty strings are passed through unchanged; only non-empty
            # values are requoted.
            raw_user = REQUOTER(username) if username else username
            raw_password = REQUOTER(password) if password else password
            netloc = make_netloc(raw_user, raw_password, host, port)
            cache["raw_user"] = raw_user
            cache["raw_password"] = raw_password

    if path:
        path = PATH_REQUOTER(path)
        # Dot segments are only normalized when an authority is present.
        if netloc and "." in path:
            path = normalize_path(path)
    if query:
        query = QUERY_REQUOTER(query)
    if fragment:
        fragment = FRAGMENT_REQUOTER(fragment)

    cache["scheme"] = scheme
    # The cached raw path is "/" for an authority with an empty path, while
    # ``_path`` below keeps the empty string.
    cache["raw_path"] = "/" if not path and netloc else path
    cache["raw_query_string"] = query
    cache["raw_fragment"] = fragment

    self = object.__new__(URL)
    self._scheme = scheme
    self._netloc = netloc
    self._path = path
    self._query = query
    self._fragment = fragment
    self._cache = cache
    return self
@lru_cache
def pre_encoded_url(url_str: str) -> "URL":
    """Parse pre-encoded URL.

    The five components returned by ``split_url`` are stored as-is, with no
    quoting or normalization, and the instance starts with an empty cache.
    """
    url = object.__new__(URL)
    scheme, netloc, path, query, fragment = split_url(url_str)
    url._scheme = scheme
    url._netloc = netloc
    url._path = path
    url._query = query
    url._fragment = fragment
    url._cache = {}
    return url
@lru_cache
def build_pre_encoded_url(
    scheme: str,
    authority: str,
    user: Union[str, None],
    password: Union[str, None],
    host: str,
    port: Union[int, None],
    path: str,
    query_string: str,
    fragment: str,
) -> "URL":
    """Build a pre-encoded URL from parts.

    No component is quoted.  The netloc is ``authority`` when given,
    otherwise it is assembled from ``user``/``password``/``host``/``port``
    (dropping a port equal to the scheme's default), or left empty when
    there is no host.
    """
    if authority:
        netloc = authority
    elif not host:
        netloc = ""
    else:
        # A port equal to the scheme default is not rendered.
        if port is not None and port == DEFAULT_PORTS.get(scheme):
            port = None
        if user is None and password is None:
            netloc = host if port is None else f"{host}:{port}"
        else:
            netloc = make_netloc(user, password, host, port)

    url = object.__new__(URL)
    url._scheme = scheme
    url._netloc = netloc
    url._path = path
    url._query = query_string
    url._fragment = fragment
    url._cache = {}
    return url
def from_parts_uncached(
    scheme: str, netloc: str, path: str, query: str, fragment: str
) -> "URL":
    """Create a new URL from parts.

    The parts are stored verbatim on a fresh instance with an empty cache.
    """
    url = object.__new__(URL)
    url._cache = {}
    url._scheme, url._netloc = scheme, netloc
    url._path, url._query, url._fragment = path, query, fragment
    return url


# Memoized variant of from_parts_uncached.
from_parts = lru_cache(from_parts_uncached)
268@rewrite_module
269class URL:
270 # Don't derive from str
271 # follow pathlib.Path design
272 # probably URL will not suffer from pathlib problems:
273 # it's intended for libraries like aiohttp,
274 # not to be passed into standard library functions like os.open etc.
276 # URL grammar (RFC 3986)
277 # pct-encoded = "%" HEXDIG HEXDIG
278 # reserved = gen-delims / sub-delims
279 # gen-delims = ":" / "/" / "?" / "#" / "[" / "]" / "@"
280 # sub-delims = "!" / "$" / "&" / "'" / "(" / ")"
281 # / "*" / "+" / "," / ";" / "="
282 # unreserved = ALPHA / DIGIT / "-" / "." / "_" / "~"
283 # URI = scheme ":" hier-part [ "?" query ] [ "#" fragment ]
284 # hier-part = "//" authority path-abempty
285 # / path-absolute
286 # / path-rootless
287 # / path-empty
288 # scheme = ALPHA *( ALPHA / DIGIT / "+" / "-" / "." )
289 # authority = [ userinfo "@" ] host [ ":" port ]
290 # userinfo = *( unreserved / pct-encoded / sub-delims / ":" )
291 # host = IP-literal / IPv4address / reg-name
292 # IP-literal = "[" ( IPv6address / IPvFuture ) "]"
293 # IPvFuture = "v" 1*HEXDIG "." 1*( unreserved / sub-delims / ":" )
294 # IPv6address = 6( h16 ":" ) ls32
295 # / "::" 5( h16 ":" ) ls32
296 # / [ h16 ] "::" 4( h16 ":" ) ls32
297 # / [ *1( h16 ":" ) h16 ] "::" 3( h16 ":" ) ls32
298 # / [ *2( h16 ":" ) h16 ] "::" 2( h16 ":" ) ls32
299 # / [ *3( h16 ":" ) h16 ] "::" h16 ":" ls32
300 # / [ *4( h16 ":" ) h16 ] "::" ls32
301 # / [ *5( h16 ":" ) h16 ] "::" h16
302 # / [ *6( h16 ":" ) h16 ] "::"
303 # ls32 = ( h16 ":" h16 ) / IPv4address
304 # ; least-significant 32 bits of address
305 # h16 = 1*4HEXDIG
306 # ; 16 bits of address represented in hexadecimal
307 # IPv4address = dec-octet "." dec-octet "." dec-octet "." dec-octet
308 # dec-octet = DIGIT ; 0-9
309 # / %x31-39 DIGIT ; 10-99
310 # / "1" 2DIGIT ; 100-199
311 # / "2" %x30-34 DIGIT ; 200-249
312 # / "25" %x30-35 ; 250-255
313 # reg-name = *( unreserved / pct-encoded / sub-delims )
314 # port = *DIGIT
315 # path = path-abempty ; begins with "/" or is empty
316 # / path-absolute ; begins with "/" but not "//"
317 # / path-noscheme ; begins with a non-colon segment
318 # / path-rootless ; begins with a segment
319 # / path-empty ; zero characters
320 # path-abempty = *( "/" segment )
321 # path-absolute = "/" [ segment-nz *( "/" segment ) ]
322 # path-noscheme = segment-nz-nc *( "/" segment )
323 # path-rootless = segment-nz *( "/" segment )
324 # path-empty = 0<pchar>
325 # segment = *pchar
326 # segment-nz = 1*pchar
327 # segment-nz-nc = 1*( unreserved / pct-encoded / sub-delims / "@" )
328 # ; non-zero-length segment without any colon ":"
329 # pchar = unreserved / pct-encoded / sub-delims / ":" / "@"
330 # query = *( pchar / "/" / "?" )
331 # fragment = *( pchar / "/" / "?" )
332 # URI-reference = URI / relative-ref
333 # relative-ref = relative-part [ "?" query ] [ "#" fragment ]
334 # relative-part = "//" authority path-abempty
335 # / path-absolute
336 # / path-noscheme
337 # / path-empty
338 # absolute-URI = scheme ":" hier-part [ "?" query ]
339 __slots__ = ("_cache", "_scheme", "_netloc", "_path", "_query", "_fragment")
341 _cache: _InternalURLCache
342 _scheme: str
343 _netloc: str
344 _path: str
345 _query: str
346 _fragment: str
def __new__(
    cls,
    val: Union[str, SplitResult, "URL", UndefinedType] = UNDEFINED,
    *,
    encoded: bool = False,
    strict: Union[bool, None] = None,
) -> "URL":
    """Return a URL for ``val``.

    ``val`` may be a ``str`` (or ``str`` subclass), an existing ``URL``
    (returned unchanged), a ``SplitResult`` (only with ``encoded=True``) or
    omitted, which yields an empty URL.

    Raises:
        ValueError: ``val`` is a ``SplitResult`` and ``encoded`` is false.
        TypeError: ``val`` is of any other type.
    """
    if strict is not None:  # pragma: no cover
        warnings.warn("strict parameter is ignored")
    # Exact ``str`` is checked first as the fast path; ``str`` subclasses
    # are handled further down.
    if type(val) is str:
        return pre_encoded_url(val) if encoded else encode_url(val)
    if type(val) is cls:
        return val
    if type(val) is SplitResult:
        if not encoded:
            raise ValueError("Cannot apply decoding to SplitResult")
        return from_parts(*val)
    if isinstance(val, str):
        # ``str`` subclass: convert to a plain ``str`` before parsing.
        return pre_encoded_url(str(val)) if encoded else encode_url(str(val))
    if val is UNDEFINED:
        # Special case for UNDEFINED since it might be unpickling and we do
        # not want to cache as the `__setstate__` call would mutate the URL
        # object in the `pre_encoded_url` or `encode_url` caches.
        self = object.__new__(URL)
        self._scheme = self._netloc = self._path = self._query = self._fragment = ""
        self._cache = {}
        return self
    raise TypeError("Constructor parameter should be str")
@classmethod
def build(
    cls,
    *,
    scheme: str = "",
    authority: str = "",
    user: Union[str, None] = None,
    password: Union[str, None] = None,
    host: str = "",
    port: Union[int, None] = None,
    path: str = "",
    query: Union[Query, None] = None,
    query_string: str = "",
    fragment: str = "",
    encoded: bool = False,
) -> "URL":
    """Creates and returns a new URL

    All arguments are keyword-only.  With ``encoded=True`` the parts are
    handed to ``build_pre_encoded_url`` unchanged; otherwise host, path,
    query string and fragment are quoted here.

    Raises:
        ValueError: ``authority`` is combined with user/password/host/port;
            ``port`` is given without ``host``; both ``query`` and
            ``query_string`` are given; or a non-empty path that does not
            start with "/" is used together with an authority.
        TypeError: ``port`` is not an int, or one of the string arguments
            is ``None``.
    """

    if authority and (user or password or host or port):
        raise ValueError(
            'Can\'t mix "authority" with "user", "password", "host" or "port".'
        )
    if port is not None and not isinstance(port, int):
        raise TypeError(f"The port is required to be int, got {type(port)!r}.")
    if port and not host:
        raise ValueError('Can\'t build URL with "port" but without "host".')
    if query and query_string:
        raise ValueError('Only one of "query" or "query_string" should be passed')
    # Runtime guard: the annotations say ``str`` but callers may pass None.
    if (
        scheme is None  # type: ignore[redundant-expr]
        or authority is None  # type: ignore[redundant-expr]
        or host is None  # type: ignore[redundant-expr]
        or path is None  # type: ignore[redundant-expr]
        or query_string is None  # type: ignore[redundant-expr]
        or fragment is None
    ):
        raise TypeError(
            'NoneType is illegal for "scheme", "authority", "host", "path", '
            '"query_string", and "fragment" args, use empty string instead.'
        )

    if query:
        query_string = get_str_query(query) or ""

    if encoded:
        return build_pre_encoded_url(
            scheme,
            authority,
            user,
            password,
            host,
            port,
            path,
            query_string,
            fragment,
        )

    self = object.__new__(URL)
    self._scheme = scheme
    # ``_host`` stays None only when neither authority nor host was given.
    _host: Union[str, None] = None
    if authority:
        user, password, _host, port = split_netloc(authority)
        _host = _encode_host(_host, validate_host=False) if _host else ""
    elif host:
        _host = _encode_host(host, validate_host=True)
    else:
        self._netloc = ""

    if _host is not None:
        if port is not None:
            # A port equal to the scheme default is dropped from the netloc.
            port = None if port == DEFAULT_PORTS.get(scheme) else port
        if user is None and password is None:
            self._netloc = _host if port is None else f"{_host}:{port}"
        else:
            # NOTE(review): the trailing ``True`` is presumably make_netloc's
            # "encode user/password" flag - confirm against ._parse.
            self._netloc = make_netloc(user, password, _host, port, True)

    path = PATH_QUOTER(path) if path else path
    if path and self._netloc:
        if "." in path:
            path = normalize_path(path)
        if path[0] != "/":
            msg = (
                "Path in a URL with authority should "
                "start with a slash ('/') if set"
            )
            raise ValueError(msg)

    self._path = path
    # A string produced from ``query`` above is stored as-is; only a
    # caller-supplied ``query_string`` goes through the quoter.
    if not query and query_string:
        query_string = QUERY_QUOTER(query_string)
    self._query = query_string
    self._fragment = FRAGMENT_QUOTER(fragment) if fragment else fragment
    self._cache = {}
    return self
def __init_subclass__(cls) -> NoReturn:
    """Reject any attempt to subclass URL by raising TypeError."""
    message = f"Inheriting a class {cls!r} from URL is forbidden"
    raise TypeError(message)
def __str__(self) -> str:
    """Render the URL as a string.

    An empty path becomes "/" when there is an authority followed by a
    query or fragment, and an explicit port equal to the scheme's default
    is left out of the rendered netloc.
    """
    path = self._path
    if not path and self._netloc and (self._query or self._fragment):
        path = "/"

    port = self.explicit_port
    if port is None or port != DEFAULT_PORTS.get(self._scheme):
        netloc = self._netloc
    else:
        # port normalization - using None for default ports to remove from rendering
        # https://datatracker.ietf.org/doc/html/rfc3986.html#section-6.2.3
        host = self.host_subcomponent
        netloc = make_netloc(self.raw_user, self.raw_password, host, None)
    return unsplit_result(self._scheme, netloc, path, self._query, self._fragment)
def __repr__(self) -> str:
    """Return ``ClassName('<str(self)>')``."""
    class_name = self.__class__.__name__
    return f"{class_name}('{self!s}')"
def __bytes__(self) -> bytes:
    """Return the string form of the URL encoded as ASCII bytes."""
    text = str(self)
    return text.encode("ascii")
def __eq__(self, other: object) -> bool:
    """Compare two URLs component by component.

    Only exact ``URL`` instances are comparable.  An empty path next to a
    non-empty netloc is treated as "/" on both sides.
    """
    if type(other) is not URL:
        return NotImplemented

    own_path = self._path or ("/" if self._netloc else "")
    other_path = other._path or ("/" if other._netloc else "")
    mine = (self._scheme, self._netloc, own_path, self._query, self._fragment)
    theirs = (
        other._scheme,
        other._netloc,
        other_path,
        other._query,
        other._fragment,
    )
    return mine == theirs
def __hash__(self) -> int:
    """Hash of the five components, stored in ``_cache["hash"]`` after first use.

    Uses the same empty-path-as-"/" rule as ``__eq__``.
    """
    cached = self._cache.get("hash")
    if cached is not None:
        return cached
    path = self._path or ("/" if self._netloc else "")
    value = hash((self._scheme, self._netloc, path, self._query, self._fragment))
    self._cache["hash"] = value
    return value
def __le__(self, other: object) -> bool:
    """``self <= other``, comparing the ``_val`` tuples; URL instances only."""
    if type(other) is URL:
        return self._val <= other._val
    return NotImplemented
def __lt__(self, other: object) -> bool:
    """``self < other``, comparing the ``_val`` tuples; URL instances only."""
    if type(other) is URL:
        return self._val < other._val
    return NotImplemented
def __ge__(self, other: object) -> bool:
    """``self >= other``, comparing the ``_val`` tuples; URL instances only."""
    if type(other) is URL:
        return self._val >= other._val
    return NotImplemented
def __gt__(self, other: object) -> bool:
    """``self > other``, comparing the ``_val`` tuples; URL instances only."""
    if type(other) is URL:
        return self._val > other._val
    return NotImplemented
def __truediv__(self, name: str) -> "URL":
    """``url / name``: append ``name`` to the path through ``_make_child``.

    Returns ``NotImplemented`` for non-string operands.
    """
    if isinstance(name, str):
        # str() turns a str subclass into a plain str.
        return self._make_child((str(name),))
    return NotImplemented  # type: ignore[unreachable]
def __mod__(self, query: Query) -> "URL":
    """``url % query``: shorthand for ``self.update_query(query)``."""
    return self.update_query(query)
def __bool__(self) -> bool:
    """True when netloc, path, query or fragment is non-empty.

    The scheme alone does not make a URL truthy.
    """
    return any((self._netloc, self._path, self._query, self._fragment))
def __getstate__(self) -> tuple[SplitResult]:
    """Return the pickle state: a 1-tuple holding a ``SplitResult``.

    The ``SplitResult`` is created with ``tuple.__new__`` directly from
    the five-part ``_val`` tuple.
    """
    return (tuple.__new__(SplitResult, self._val),)
def __setstate__(
    self, state: Union[tuple[SplitURLType], tuple[None, _InternalURLCache]]
) -> None:
    """Restore the five URL parts from pickle state and reset the cache.

    Two layouts are accepted: ``(None, {"_val": parts, ...})`` (default
    style pickle) and a tuple whose first item is the parts.
    """
    if state[0] is None and isinstance(state[1], dict):
        # default style pickle
        parts = state[1]["_val"]
    else:
        parts, *_rest = state
    scheme, netloc, path, query, fragment = parts
    self._scheme = scheme
    self._netloc = netloc
    self._path = path
    self._query = query
    self._fragment = fragment
    self._cache = {}
def _cache_netloc(self) -> None:
    """Cache the netloc parts of the URL.

    Splits ``_netloc`` once and stores the user, password, host and port
    under ``raw_user``, ``raw_password``, ``raw_host`` and ``explicit_port``.
    """
    cache = self._cache
    user, password, host, port = split_netloc(self._netloc)
    cache["raw_user"] = user
    cache["raw_password"] = password
    cache["raw_host"] = host
    cache["explicit_port"] = port
def is_absolute(self) -> bool:
    """A check for absolute URLs.

    Return True for absolute ones (having scheme or starting
    with //), False otherwise.

    It is preferred to use the .absolute property instead
    as it is cached.
    """
    is_abs = self.absolute
    return is_abs
def is_default_port(self) -> bool:
    """A check for default port.

    Return True if port is default for specified scheme,
    e.g. 'http://python.org' or 'http://python.org:80', False
    otherwise.

    Return False for relative URLs.
    """
    explicit = self.explicit_port
    if explicit is not None:
        return explicit == DEFAULT_PORTS.get(self._scheme)
    # No explicit port: the default is in use, unless this is a relative
    # URL, which has no implicit / default port at all.
    return self._netloc != ""
def origin(self) -> "URL":
    """Return an URL with scheme, host and port parts only.

    user, password, path, query and fragment are removed.
    Delegates to the cached ``_origin`` attribute.
    """
    # TODO: add a keyword-only option for keeping user/pass maybe?
    result = self._origin
    return result
@cached_property
def _val(self) -> SplitURLType:
    """The five stored parts as a ``(scheme, netloc, path, query, fragment)`` tuple."""
    return (self._scheme, self._netloc, self._path, self._query, self._fragment)
@cached_property
def _origin(self) -> "URL":
    """Return an URL with scheme, host and port parts only.

    user, password, path, query and fragment are removed.  ``self`` is
    returned when it already has none of them.

    Raises:
        ValueError: the URL has no netloc or no scheme.
    """
    netloc = self._netloc
    if not netloc:
        raise ValueError("URL should be absolute")
    scheme = self._scheme
    if not scheme:
        raise ValueError("URL should have scheme")
    if "@" in netloc:
        # Rebuild the netloc without the userinfo part.
        encoded_host = self.host_subcomponent
        netloc = make_netloc(None, None, encoded_host, self.explicit_port)
    elif not (self._path or self._query or self._fragment):
        return self
    return from_parts(scheme, netloc, "", "", "")
def relative(self) -> "URL":
    """Return a relative part of the URL.

    scheme, user, password, host and port are removed.

    Raises:
        ValueError: the URL has no netloc.
    """
    if self._netloc:
        return from_parts("", "", self._path, self._query, self._fragment)
    raise ValueError("URL should be absolute")
@cached_property
def absolute(self) -> bool:
    """A check for absolute URLs.

    Return True for absolute ones (having scheme or starting
    with //), False otherwise.

    """
    # `netloc` is an empty string for relative URLs
    # Checking `netloc` is faster than checking `hostname`
    # because `hostname` is a property that does some extra work
    # to parse the host from the `netloc`
    return self._netloc != ""
@cached_property
def scheme(self) -> str:
    """Scheme for absolute URLs.

    Empty string for relative URLs or URLs starting with //

    The stored scheme is returned unchanged.
    """
    return self._scheme
@cached_property
def raw_authority(self) -> str:
    """Encoded authority part of URL.

    Empty string for relative URLs.

    This is the stored netloc, returned unchanged.
    """
    return self._netloc
@cached_property
def authority(self) -> str:
    """Decoded authority part of URL.

    Empty string for relative URLs.

    Assembled by ``make_netloc`` from the decoded ``user``, ``password``
    and ``host`` plus ``port`` (which falls back to the scheme default).
    """
    return make_netloc(self.user, self.password, self.host, self.port)
@cached_property
def raw_user(self) -> Union[str, None]:
    """Encoded user part of URL.

    None if user is missing.

    """
    # not .username
    # _cache_netloc() fills all four netloc-derived cache entries at once.
    self._cache_netloc()
    return self._cache["raw_user"]
@cached_property
def user(self) -> Union[str, None]:
    """Decoded user part of URL.

    None if user is missing.
    """
    encoded = self.raw_user
    return None if encoded is None else UNQUOTER(encoded)
@cached_property
def raw_password(self) -> Union[str, None]:
    """Encoded password part of URL.

    None if password is missing.

    """
    # _cache_netloc() fills all four netloc-derived cache entries at once.
    self._cache_netloc()
    return self._cache["raw_password"]
@cached_property
def password(self) -> Union[str, None]:
    """Decoded password part of URL.

    None if password is missing.
    """
    encoded = self.raw_password
    return None if encoded is None else UNQUOTER(encoded)
@cached_property
def raw_host(self) -> Union[str, None]:
    """Encoded host part of URL.

    None for relative URLs.

    When working with IPv6 addresses, use the `host_subcomponent` property instead
    as it will return the host subcomponent with brackets.
    """
    # Use host instead of hostname for sake of shortness
    # May add .hostname prop later
    # _cache_netloc() fills all four netloc-derived cache entries at once.
    self._cache_netloc()
    return self._cache["raw_host"]
@cached_property
def host(self) -> Union[str, None]:
    """Decoded host part of URL.

    None for relative URLs.
    """
    raw = self.raw_host
    if raw is None:
        return None
    # A trailing digit or a colon marks the host as an IP address, and
    # IP addresses are never IDNA encoded.
    looks_like_ip = (raw and raw[-1].isdigit()) or ":" in raw
    if looks_like_ip:
        return raw
    return _idna_decode(raw)
@cached_property
def host_subcomponent(self) -> Union[str, None]:
    """Return the host subcomponent part of URL.

    None for relative URLs.

    https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.2

    `IP-literal = "[" ( IPv6address / IPvFuture ) "]"`

    Examples:
    - `http://example.com:8080` -> `example.com`
    - `http://example.com:80` -> `example.com`
    - `https://127.0.0.1:8443` -> `127.0.0.1`
    - `https://[::1]:8443` -> `[::1]`
    - `http://[::1]` -> `[::1]`

    """
    raw = self.raw_host
    if raw is None:
        return None
    # A colon in the raw host means it must be wrapped in brackets.
    if ":" in raw:
        return f"[{raw}]"
    return raw
@cached_property
def host_port_subcomponent(self) -> Union[str, None]:
    """Return the host and port subcomponent part of URL.

    Trailing dots are removed from the host part.

    This value is suitable for use in the Host header of an HTTP request.

    None for relative URLs.

    https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.2
    `IP-literal = "[" ( IPv6address / IPvFuture ) "]"`
    https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.3
    port = *DIGIT

    Examples:
    - `http://example.com:8080` -> `example.com:8080`
    - `http://example.com:80` -> `example.com`
    - `http://example.com.:80` -> `example.com`
    - `https://127.0.0.1:8443` -> `127.0.0.1:8443`
    - `https://[::1]:8443` -> `[::1]:8443`
    - `http://[::1]` -> `[::1]`

    """
    if (raw := self.raw_host) is None:
        return None
    # ``endswith`` rather than ``raw[-1]``: the raw host can be an empty
    # string (``encode_url`` stores "" when a netloc carries no host for a
    # scheme that does not require one), and indexing "" raises IndexError.
    if raw.endswith("."):
        # Remove all trailing dots from the netloc as while
        # they are valid FQDNs in DNS, TLS validation fails.
        # See https://github.com/aio-libs/aiohttp/issues/3636.
        # To avoid string manipulation we only call rstrip if
        # the last character is a dot.
        raw = raw.rstrip(".")
    port = self.explicit_port
    if port is None or port == DEFAULT_PORTS.get(self._scheme):
        # No port, or the scheme default: render the host alone.
        return f"[{raw}]" if ":" in raw else raw
    return f"[{raw}]:{port}" if ":" in raw else f"{raw}:{port}"
@cached_property
def port(self) -> Union[int, None]:
    """Port part of URL, with scheme-based fallback.

    None for relative URLs or URLs without explicit port and
    scheme without default port substitution.
    """
    explicit = self.explicit_port
    if explicit is None:
        return DEFAULT_PORTS.get(self._scheme)
    return explicit
@cached_property
def explicit_port(self) -> Union[int, None]:
    """Port part of URL, without scheme-based fallback.

    None for relative URLs or URLs without explicit port.

    """
    # _cache_netloc() fills all four netloc-derived cache entries at once.
    self._cache_netloc()
    return self._cache["explicit_port"]
@cached_property
def raw_path(self) -> str:
    """Encoded path of URL.

    / for absolute URLs without path part.
    """
    if self._path or not self._netloc:
        return self._path
    return "/"
@cached_property
def path(self) -> str:
    """Decoded path of URL.

    / for absolute URLs without path part.
    """
    if self._path:
        return PATH_UNQUOTER(self._path)
    if self._netloc:
        return "/"
    return ""
@cached_property
def path_safe(self) -> str:
    """Decoded path of URL.

    / for absolute URLs without path part.

    / (%2F) and % (%25) are not decoded
    """
    raw = self._path
    if not raw:
        return "/" if self._netloc else ""
    return PATH_SAFE_UNQUOTER(raw)
@cached_property
def _parsed_query(self) -> list[tuple[str, str]]:
    """Parse query part of URL.

    Returns the pairs that ``query_to_pairs`` produces for the stored
    query string.
    """
    return query_to_pairs(self._query)
@cached_property
def query(self) -> "MultiDictProxy[str]":
    """A MultiDictProxy representing parsed query parameters in decoded
    representation.

    Empty value if URL has no query part.

    """
    # A new MultiDict is built from the parsed pairs and wrapped in a proxy.
    return MultiDictProxy(MultiDict(self._parsed_query))
@cached_property
def raw_query_string(self) -> str:
    """Encoded query part of URL.

    Empty string if query is missing.

    The stored query string is returned unchanged.
    """
    return self._query
@cached_property
def query_string(self) -> str:
    """Decoded query part of URL.

    Empty string if query is missing.
    """
    raw = self._query
    if not raw:
        return ""
    return QS_UNQUOTER(raw)
@cached_property
def path_qs(self) -> str:
    """Decoded path of URL with query."""
    decoded_query = self.query_string
    if not decoded_query:
        return self.path
    return f"{self.path}?{decoded_query}"
@cached_property
def raw_path_qs(self) -> str:
    """Encoded path of URL with query."""
    # An empty path next to an authority is rendered as "/".
    path = self._path if self._path or not self._netloc else "/"
    query = self._query
    return f"{path}?{query}" if query else path
@cached_property
def raw_fragment(self) -> str:
    """Encoded fragment part of URL.

    Empty string if fragment is missing.

    The stored fragment is returned unchanged.
    """
    return self._fragment
@cached_property
def fragment(self) -> str:
    """Decoded fragment part of URL.

    Empty string if fragment is missing.
    """
    raw = self._fragment
    if not raw:
        return ""
    return UNQUOTER(raw)
@cached_property
def raw_parts(self) -> tuple[str, ...]:
    """A tuple containing encoded *path* parts.

    ('/',) for absolute URLs if *path* is missing.
    """
    path = self._path
    if self._netloc:
        if not path:
            return ("/",)
        return ("/", *path[1:].split("/"))
    if path.startswith("/"):
        return ("/", *path[1:].split("/"))
    return tuple(path.split("/"))
@cached_property
def parts(self) -> tuple[str, ...]:
    """A tuple containing decoded *path* parts.

    ('/',) for absolute URLs if *path* is missing.
    """
    return tuple(map(UNQUOTER, self.raw_parts))
@cached_property
def parent(self) -> "URL":
    """A new URL with last part of path removed and cleaned up query and
    fragment.

    When the path is empty or "/", ``self`` is returned unless it carries
    a query or fragment to strip.
    """
    path = self._path
    if path and path != "/":
        # Everything before the last "/" (empty when there is none).
        head, _, _ = path.rpartition("/")
        return from_parts(self._scheme, self._netloc, head, "", "")
    if self._fragment or self._query:
        return from_parts(self._scheme, self._netloc, path, "", "")
    return self
@cached_property
def raw_name(self) -> str:
    """The last part of raw_parts."""
    segments = self.raw_parts
    if self._netloc:
        # Skip the leading "/" entry; what remains may be empty.
        segments = segments[1:]
        return segments[-1] if segments else ""
    return segments[-1]
@cached_property
def name(self) -> str:
    """The last part of parts.

    Computed by unquoting ``raw_name``.
    """
    return UNQUOTER(self.raw_name)
@cached_property
def raw_suffix(self) -> str:
    """Encoded suffix of ``raw_name``: the text from its last dot onwards.

    Empty string when the name has no dot, or when its last dot is the
    first or last character.
    """
    name = self.raw_name
    dot = name.rfind(".")
    if dot <= 0 or dot >= len(name) - 1:
        return ""
    return name[dot:]
@cached_property
def suffix(self) -> str:
    """Decoded form of ``raw_suffix``."""
    return UNQUOTER(self.raw_suffix)
@cached_property
def raw_suffixes(self) -> tuple[str, ...]:
    """All encoded suffixes of ``raw_name``, each with its leading dot.

    Empty tuple when the name ends with a dot.  Leading dots of the name
    are ignored.
    """
    name = self.raw_name
    if name.endswith("."):
        return ()
    _stem, *extensions = name.lstrip(".").split(".")
    return tuple(f".{ext}" for ext in extensions)
@cached_property
def suffixes(self) -> tuple[str, ...]:
    """Decoded form of every entry in ``raw_suffixes``."""
    return tuple(map(UNQUOTER, self.raw_suffixes))
def _make_child(self, paths: "Sequence[str]", encoded: bool = False) -> "URL":
    """
    add paths to self._path, accounting for absolute vs relative paths,
    keep existing, but do not create new, empty segments

    ``paths`` are quoted with ``PATH_QUOTER`` unless ``encoded`` is true.
    The returned URL has empty query and fragment.

    Raises:
        ValueError: one of ``paths`` starts with "/".
    """
    # Segments are collected in *reverse* order (last path first, and each
    # path's own segments reversed) and flipped once at the end.
    parsed: list[str] = []
    needs_normalize: bool = False
    for idx, path in enumerate(reversed(paths)):
        # empty segment of last is not removed
        last = idx == 0
        if path and path[0] == "/":
            raise ValueError(
                f"Appending path {path!r} starting from slash is forbidden"
            )
        # We need to quote the path if it is not already encoded
        # This cannot be done at the end because the existing
        # path is already quoted and we do not want to double quote
        # the existing path.
        path = path if encoded else PATH_QUOTER(path)
        # Any "." may be part of a dot segment that needs normalizing.
        needs_normalize |= "." in path
        segments = path.split("/")
        segments.reverse()
        # remove trailing empty segment for all but the last path
        parsed += segments[1:] if not last and segments[0] == "" else segments

    if (path := self._path) and (old_segments := path.split("/")):
        # If the old path ends with a slash, the last segment is an empty string
        # and should be removed before adding the new path segments.
        old = old_segments[:-1] if old_segments[-1] == "" else old_segments
        old.reverse()
        parsed += old

    # If the netloc is present, inject a leading slash when adding a
    # path to an absolute URL where there was none before.
    if (netloc := self._netloc) and parsed and parsed[-1] != "":
        parsed.append("")

    parsed.reverse()
    # Normalization is only applied when there is an authority.
    if not netloc or not needs_normalize:
        return from_parts(self._scheme, netloc, "/".join(parsed), "", "")

    path = "/".join(normalize_path_segments(parsed))
    # If normalizing the path segments removed the leading slash, add it back.
    if path and path[0] != "/":
        path = f"/{path}"
    return from_parts(self._scheme, netloc, path, "", "")
def with_scheme(self, scheme: str) -> "URL":
    """Return a new URL with scheme replaced.

    The scheme is lower-cased before use.

    Raises:
        TypeError: ``scheme`` is not a str.
        ValueError: the URL has no netloc and the new scheme is one of
            ``SCHEME_REQUIRES_HOST``.
    """
    # N.B. doesn't cleanup query/fragment
    if not isinstance(scheme, str):
        raise TypeError("Invalid scheme type")
    lower_scheme = scheme.lower()
    netloc = self._netloc
    if lower_scheme in SCHEME_REQUIRES_HOST and not netloc:
        raise ValueError(
            "scheme replacement is not allowed for "
            f"relative URLs for the {lower_scheme} scheme"
        )
    return from_parts(lower_scheme, netloc, self._path, self._query, self._fragment)
def with_user(self, user: Union[str, None]) -> "URL":
    """Return a new URL with user replaced.

    Autoencode user if needed.

    Clear user/password if user is None.

    Raises:
        TypeError: ``user`` is neither str nor None.
        ValueError: the URL has no netloc.
    """
    # N.B. doesn't cleanup query/fragment
    if user is not None and not isinstance(user, str):
        raise TypeError("Invalid user type")
    if user is None:
        # Dropping the user drops the password too.
        password = None
    else:
        user = QUOTER(user)
        password = self.raw_password
    if not self._netloc:
        raise ValueError("user replacement is not allowed for relative URLs")
    encoded_host = self.host_subcomponent or ""
    new_netloc = make_netloc(user, password, encoded_host, self.explicit_port)
    return from_parts(
        self._scheme, new_netloc, self._path, self._query, self._fragment
    )
def with_password(self, password: Union[str, None]) -> "URL":
    """Return a new URL with password replaced.

    Autoencode password if needed.

    Clear password if argument is None.

    Raises TypeError if password is neither a str nor None, and
    ValueError if this URL has no netloc.

    """
    # N.B. doesn't cleanup query/fragment
    if password is not None:
        if not isinstance(password, str):
            raise TypeError("Invalid password type")
        password = QUOTER(password)
    if not self._netloc:
        raise ValueError("password replacement is not allowed for relative URLs")
    host_part = self.host_subcomponent or ""
    explicit_port = self.explicit_port
    new_netloc = make_netloc(self.raw_user, password, host_part, explicit_port)
    return from_parts(
        self._scheme, new_netloc, self._path, self._query, self._fragment
    )
def with_host(self, host: str) -> "URL":
    """Return a new URL with host replaced.

    Autoencode host if needed.

    Changing host for relative URLs is not allowed, use .join()
    instead.

    Raises TypeError if host is not a str, and ValueError if this URL
    has no netloc or if host is empty.

    """
    # N.B. doesn't cleanup query/fragment
    if not isinstance(host, str):
        raise TypeError("Invalid host type")
    if not self._netloc:
        raise ValueError("host replacement is not allowed for relative URLs")
    if not host:
        raise ValueError("host removing is not allowed")
    # host is guaranteed non-empty at this point, so it is always encoded
    # (and validated); no empty-host fallback is needed.
    encoded_host = _encode_host(host, validate_host=True)
    port = self.explicit_port
    netloc = make_netloc(self.raw_user, self.raw_password, encoded_host, port)
    return from_parts(self._scheme, netloc, self._path, self._query, self._fragment)
def with_port(self, port: Union[int, None]) -> "URL":
    """Return a new URL with port replaced.

    Clear port to default if None is passed.

    Raises TypeError if port is a bool or not an int (and not None),
    and ValueError if port is outside 0..65535 or this URL has no netloc.

    """
    # N.B. doesn't cleanup query/fragment
    if port is not None:
        # bool is a subclass of int, so it has to be rejected explicitly.
        if isinstance(port, bool) or not isinstance(port, int):
            raise TypeError(f"port should be int or None, got {type(port)}")
        if port < 0 or port > 65535:
            raise ValueError(f"port must be between 0 and 65535, got {port}")
    if not self._netloc:
        raise ValueError("port replacement is not allowed for relative URLs")
    host_part = self.host_subcomponent or ""
    new_netloc = make_netloc(self.raw_user, self.raw_password, host_part, port)
    return from_parts(
        self._scheme, new_netloc, self._path, self._query, self._fragment
    )
def with_path(
    self,
    path: str,
    *,
    encoded: bool = False,
    keep_query: bool = False,
    keep_fragment: bool = False,
) -> "URL":
    """Return a new URL with path replaced.

    Unless encoded is true the path is quoted, and additionally
    normalized when this URL has a netloc and the path contains a dot.
    A non-empty path always gets a leading slash.

    Query and fragment are dropped unless keep_query / keep_fragment
    are set.

    """
    netloc = self._netloc
    if not encoded:
        path = PATH_QUOTER(path)
        # Only paths containing a dot can hold "." / ".." segments.
        if netloc and "." in path:
            path = normalize_path(path)
    if path and not path.startswith("/"):
        path = f"/{path}"
    new_query = self._query if keep_query else ""
    new_fragment = self._fragment if keep_fragment else ""
    return from_parts(self._scheme, netloc, path, new_query, new_fragment)
@overload
def with_query(self, query: Query) -> "URL": ...

@overload
def with_query(self, **kwargs: QueryVariable) -> "URL": ...

def with_query(self, *args: Any, **kwargs: Any) -> "URL":
    """Return a new URL with query part replaced.

    Accepts any Mapping (e.g. dict, multidict.MultiDict instances)
    or str, autoencode the argument if needed.

    A sequence of (key, value) pairs is supported as well.

    It also can take an arbitrary number of keyword arguments.

    Clear query if None is passed.

    """
    # N.B. doesn't cleanup query/fragment
    encoded_query = get_str_query(*args, **kwargs)
    if not encoded_query:
        # A falsy result from get_str_query is stored as an empty query.
        encoded_query = ""
    return from_parts_uncached(
        self._scheme, self._netloc, self._path, encoded_query, self._fragment
    )
@overload
def extend_query(self, query: Query) -> "URL": ...

@overload
def extend_query(self, **kwargs: QueryVariable) -> "URL": ...

def extend_query(self, *args: Any, **kwargs: Any) -> "URL":
    """Return a new URL with query part combined with the existing.

    This method will not remove existing query parameters.  If the
    arguments produce no query string, self is returned unchanged.

    Example:
    >>> url = URL('http://example.com/?a=1&b=2')
    >>> url.extend_query(a=3, c=4)
    URL('http://example.com/?a=1&b=2&a=3&c=4')
    """
    addition = get_str_query(*args, **kwargs)
    if not addition:
        return self
    existing = self._query
    if not existing:
        combined = addition
    elif existing.endswith("&"):
        # The existing query already ends with a separator; both strings
        # are already encoded, so plain concatenation is enough.
        combined = existing + addition
    else:
        combined = f"{existing}&{addition}"
    return from_parts_uncached(
        self._scheme, self._netloc, self._path, combined, self._fragment
    )
@overload
def update_query(self, query: Query) -> "URL": ...

@overload
def update_query(self, **kwargs: QueryVariable) -> "URL": ...

def update_query(self, *args: Any, **kwargs: Any) -> "URL":
    """Return a new URL with query part updated.

    This method will overwrite existing query parameters.

    Exactly one of a single positional query or keyword arguments must
    be given, otherwise ValueError is raised.  None clears the query;
    any other falsy value keeps the current query.  TypeError is raised
    for bytes-like values and for unsupported types.

    Example:
    >>> url = URL('http://example.com/?a=1&b=2')
    >>> url.update_query(a=3, c=4)
    URL('http://example.com/?a=3&b=2&c=4')
    """
    if (args and kwargs) or (not kwargs and len(args) != 1):
        raise ValueError("Either kwargs or single query parameter must be present")
    in_query: Union[str, Mapping[str, QueryVariable], None]
    in_query = kwargs if kwargs else args[0]

    if in_query is None:
        query = ""
    elif not in_query:
        query = self._query
    elif isinstance(in_query, Mapping):
        # Mapping values may themselves be sequences of values.
        from_mapping: MultiDict[QueryVariable] = MultiDict(self._parsed_query)
        from_mapping.update(in_query)
        query = get_str_query_from_sequence_iterable(from_mapping.items())
    elif isinstance(in_query, str):
        from_str: MultiDict[str] = MultiDict(self._parsed_query)
        from_str.update(query_to_pairs(in_query))
        query = get_str_query_from_iterable(from_str.items())
    elif isinstance(in_query, (bytes, bytearray, memoryview)):  # type: ignore[unreachable]
        raise TypeError(
            "Invalid query type: bytes, bytearray and memoryview are forbidden"
        )
    elif isinstance(in_query, Sequence):
        # A sequence is taken to be a list of (key, value) pairs already,
        # so its values are serialized as simple values, not as sequences.
        from_pairs: MultiDict[SimpleQuery] = MultiDict(self._parsed_query)
        from_pairs.update(in_query)
        query = get_str_query_from_iterable(from_pairs.items())
    else:
        raise TypeError(
            "Invalid query type: only str, mapping or "
            "sequence of (key, value) pairs is allowed"
        )
    return from_parts_uncached(
        self._scheme, self._netloc, self._path, query, self._fragment
    )
def without_query_params(self, *query_params: str) -> "URL":
    """Remove some keys from query part and return new URL.

    Returns self unchanged when none of the given names occur in the
    current query.
    """
    current = self.query
    doomed = set(query_params) & current.keys()
    if not doomed:
        return self
    kept = tuple(
        [(name, value) for name, value in current.items() if name not in doomed]
    )
    return self.with_query(kept)
def with_fragment(self, fragment: Union[str, None]) -> "URL":
    """Return a new URL with fragment replaced.

    Autoencode fragment if needed.

    Clear fragment to default if None is passed.

    Returns self when the resulting fragment equals the current one.
    Raises TypeError if fragment is neither a str nor None.

    """
    # N.B. doesn't cleanup query/fragment
    if fragment is not None and not isinstance(fragment, str):
        raise TypeError("Invalid fragment type")
    quoted = "" if fragment is None else FRAGMENT_QUOTER(fragment)
    if quoted == self._fragment:
        # Nothing would change, so no new URL is built.
        return self
    return from_parts(self._scheme, self._netloc, self._path, self._query, quoted)
def with_name(
    self,
    name: str,
    *,
    keep_query: bool = False,
    keep_fragment: bool = False,
) -> "URL":
    """Return a new URL with name (last part of path) replaced.

    Query and fragment parts are cleaned up unless keep_query /
    keep_fragment are set.

    Name is encoded if needed.

    Raises TypeError if name is not a str, and ValueError if it
    contains a slash or is "." or ".." after quoting.

    """
    # N.B. DOES cleanup query/fragment
    if not isinstance(name, str):
        raise TypeError("Invalid name type")
    if "/" in name:
        raise ValueError("Slash in name is not allowed")
    name = PATH_QUOTER(name)
    if name == "." or name == "..":
        raise ValueError(". and .. values are forbidden")
    segments = list(self.raw_parts)
    netloc = self._netloc
    if netloc and len(segments) == 1:
        # Only the leading part is present: the name becomes a new segment.
        segments.append(name)
    else:
        segments[-1] = name
    if netloc or segments[0] == "/":
        segments[0] = ""  # replace leading '/'

    new_query = self._query if keep_query else ""
    new_fragment = self._fragment if keep_fragment else ""
    return from_parts(
        self._scheme, netloc, "/".join(segments), new_query, new_fragment
    )
def with_suffix(
    self,
    suffix: str,
    *,
    keep_query: bool = False,
    keep_fragment: bool = False,
) -> "URL":
    """Return a new URL with suffix (file extension of name) replaced.

    Query and fragment parts are cleaned up unless keep_query /
    keep_fragment are set.

    suffix is encoded if needed.

    Raises TypeError if suffix is not a str, and ValueError if the
    suffix is malformed, the URL has an empty name, or the resulting
    name is "." or "..".
    """
    if not isinstance(suffix, str):
        raise TypeError("Invalid suffix type")
    # A non-empty suffix must start with a dot, be more than just the
    # dot, and must not contain a slash.
    if (suffix and suffix[0] != ".") or suffix == "." or "/" in suffix:
        raise ValueError(f"Invalid suffix {suffix!r}")
    name = self.raw_name
    if not name:
        raise ValueError(f"{self!r} has an empty name")
    old_suffix = self.raw_suffix
    suffix = PATH_QUOTER(suffix)
    stem = name[: -len(old_suffix)] if old_suffix else name
    name = stem + suffix
    if name == "." or name == "..":
        raise ValueError(". and .. values are forbidden")
    segments = list(self.raw_parts)
    netloc = self._netloc
    if netloc and len(segments) == 1:
        segments.append(name)
    else:
        segments[-1] = name
    if netloc or segments[0] == "/":
        segments[0] = ""  # replace leading '/'

    new_query = self._query if keep_query else ""
    new_fragment = self._fragment if keep_fragment else ""
    return from_parts(
        self._scheme, netloc, "/".join(segments), new_query, new_fragment
    )
def join(self, url: "URL") -> "URL":
    """Join URLs

    Construct a full ("absolute") URL by combining a "base URL"
    (self) with another URL (url).

    Informally, this uses components of the base URL, in
    particular the addressing scheme, the network location and
    (part of) the path, to provide missing components in the
    relative URL.

    Raises TypeError if url is not exactly a URL instance.

    """
    if type(url) is not URL:
        raise TypeError("url should be URL")

    base_scheme = self._scheme
    scheme = url._scheme or base_scheme
    # A different scheme, or one without relative references, means the
    # other URL is returned as is.
    if scheme != base_scheme or scheme not in USES_RELATIVE:
        return url

    # scheme is in uses_authority as uses_authority is a superset of uses_relative
    join_netloc = url._netloc
    if join_netloc and scheme in USES_AUTHORITY:
        return from_parts(scheme, join_netloc, url._path, url._query, url._fragment)

    orig_path = self._path
    join_path = url._path
    if not join_path:
        path = orig_path
    else:
        if join_path[0] == "/":
            path = join_path
        elif not orig_path:
            path = f"/{join_path}"
        elif orig_path[-1] == "/":
            path = f"{orig_path}{join_path}"
        else:
            # Replace the last segment of the base path with the joined
            # path.  parts[0] is / for absolute urls, so this join will
            # add a double slash there ...
            path = "/".join([*self.parts[:-1], ""]) + join_path
            # ... which has to be removed
            if orig_path[0] == "/":
                path = path[1:]
        if "." in path:
            path = normalize_path(path)

    # The base query/fragment survive only when the other URL supplies
    # neither a path nor its own query/fragment.
    query = url._query if join_path or url._query else self._query
    fragment = url._fragment if join_path or url._fragment else self._fragment
    return from_parts(scheme, self._netloc, path, query, fragment)
def joinpath(self, *other: str, encoded: bool = False) -> "URL":
    """Return a new URL with the elements in other appended to the path.

    The work is delegated to _make_child, with the encoded flag passed
    through unchanged.
    """
    child = self._make_child(other, encoded=encoded)
    return child
def human_repr(self) -> str:
    """Return decoded human readable string for URL representation."""
    unsafe_userinfo = "#/:?@[]"
    unsafe_query = "#&+;="
    user = human_quote(self.user, unsafe_userinfo)
    password = human_quote(self.password, unsafe_userinfo)
    host = self.host
    if host and ":" in host:
        # A host containing a colon is wrapped in brackets.
        host = f"[{host}]"
    path = human_quote(self.path, "#?")
    if TYPE_CHECKING:
        assert path is not None
    pairs = (
        f"{human_quote(k, unsafe_query)}={human_quote(v, unsafe_query)}"
        for k, v in self.query.items()
    )
    query_string = "&".join(pairs)
    fragment = human_quote(self.fragment, "")
    if TYPE_CHECKING:
        assert fragment is not None
    netloc = make_netloc(user, password, host, self.explicit_port)
    return unsplit_result(self._scheme, netloc, path, query_string, fragment)
# Default maxsize of the _idna_decode and _idna_encode LRU caches.
_DEFAULT_IDNA_SIZE = 256
# Default maxsize of the _encode_host LRU cache.
_DEFAULT_ENCODE_SIZE = 512
@lru_cache(_DEFAULT_IDNA_SIZE)
def _idna_decode(raw: str) -> str:
    """Decode an ASCII host with the idna package.

    Falls back to the built-in "idna" codec when the idna package
    raises UnicodeError.  Results are cached.
    """
    try:
        decoded = idna.decode(raw.encode("ascii"))
    except UnicodeError:  # e.g. '::1'
        decoded = raw.encode("ascii").decode("idna")
    return decoded
@lru_cache(_DEFAULT_IDNA_SIZE)
def _idna_encode(host: str) -> str:
    """Encode a host to ASCII with the idna package (uts46 enabled).

    Falls back to the built-in "idna" codec when the idna package
    raises UnicodeError.  Results are cached.
    """
    try:
        encoded = idna.encode(host, uts46=True).decode("ascii")
    except UnicodeError:
        encoded = host.encode("idna").decode("ascii")
    return encoded
@lru_cache(_DEFAULT_ENCODE_SIZE)
def _encode_host(host: str, validate_host: bool) -> str:
    """Encode host part of URL.

    IP addresses are returned in compressed form (IPv6 in brackets, any
    zone after "%" preserved).  Other ASCII hosts are lower-cased and,
    when validate_host is true, checked against NOT_REG_NAME, raising
    ValueError on a forbidden character.  Non-ASCII hosts are passed to
    _idna_encode().  Results are cached.
    """
    # IP parsing is orders of magnitude slower than almost anything else
    # done here, so it is only attempted when the host looks like an IP:
    #
    # https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.2
    # - 127.0.0.1 (last character is a digit)
    # - 2001:db8::ff00:42:8329 (contains a colon)
    # - 2001:db8::ff00:42:8329%eth0 (contains a colon)
    # - [2001:db8::ff00:42:8329] (contains a colon -- brackets should
    #   have been removed before it gets here)
    # Rare IP Address formats are not supported per:
    # https://datatracker.ietf.org/doc/html/rfc3986#section-7.4
    if host and (host[-1].isdigit() or ":" in host):
        candidate, has_zone, zone = host.partition("%")
        try:
            parsed_ip = ip_address(candidate)
        except ValueError:
            # Not an IP address after all; fall through to name handling.
            pass
        else:
            compressed = parsed_ip.compressed
            if has_zone:
                compressed = f"{compressed}%{zone}"
            return f"[{compressed}]" if parsed_ip.version == 6 else compressed

    # IDNA encoding is slow, so it is used for non-ASCII hosts only.
    if not host.isascii():
        return _idna_encode(host)

    # Check for invalid characters explicitly; _idna_encode() does this
    # for non-ascii host names.
    host = host.lower()
    if not validate_host:
        return host
    invalid = NOT_REG_NAME.search(host)
    if invalid is None:
        return host

    value = invalid.group()
    pos = invalid.start()
    extra = ""
    if value == "@" or (value == ":" and "@" in host[pos:]):
        # this looks like an authority string
        extra = (
            ", if the value includes a username or password, "
            "use 'authority' instead of 'host'"
        )
    raise ValueError(
        f"Host {host!r} cannot contain {value!r} (at position {pos}){extra}"
    ) from None
@rewrite_module
def cache_clear() -> None:
    """Clear all LRU caches."""
    for cached in (_idna_encode, _idna_decode, _encode_host):
        cached.cache_clear()
@rewrite_module
def cache_info() -> CacheInfo:
    """Report cache statistics.

    The "ip_address", "host_validate" and "encode_host" entries all
    report the statistics of the _encode_host cache.
    """
    encode_host_stats = _encode_host.cache_info()
    return {
        "idna_encode": _idna_encode.cache_info(),
        "idna_decode": _idna_decode.cache_info(),
        "ip_address": encode_host_stats,
        "host_validate": encode_host_stats,
        "encode_host": encode_host_stats,
    }
@rewrite_module
def cache_configure(
    *,
    idna_encode_size: Union[int, None] = _DEFAULT_IDNA_SIZE,
    idna_decode_size: Union[int, None] = _DEFAULT_IDNA_SIZE,
    ip_address_size: Union[int, None, UndefinedType] = UNDEFINED,
    host_validate_size: Union[int, None, UndefinedType] = UNDEFINED,
    encode_host_size: Union[int, None, UndefinedType] = UNDEFINED,
) -> None:
    """Configure LRU cache sizes.

    Each size is handed to functools.lru_cache as its maxsize, and the
    three module-level caches are rebuilt (dropping their contents).

    ip_address_size and host_validate_size are deprecated; passing
    either emits a DeprecationWarning.  They only influence the
    _encode_host cache size:

    - an explicit encode_host_size of None is used as is;
    - otherwise None in either deprecated argument selects None;
    - otherwise the largest of the given integer sizes is used;
    - if nothing was given, _DEFAULT_ENCODE_SIZE is used.
    """
    global _idna_decode, _idna_encode, _encode_host
    # ip_address_size, host_validate_size are no longer
    # used, but are kept for backwards compatibility.
    if ip_address_size is not UNDEFINED or host_validate_size is not UNDEFINED:
        warnings.warn(
            "cache_configure() no longer accepts the "
            "ip_address_size or host_validate_size arguments, "
            "they are used to set the encode_host_size instead "
            "and will be removed in the future",
            DeprecationWarning,
            stacklevel=2,
        )

    if encode_host_size is not None:
        for size in (ip_address_size, host_validate_size):
            if size is None:
                # None wins over any integer size.  Stop here so that a
                # later integer is never compared against None by max().
                encode_host_size = None
                break
            if encode_host_size is UNDEFINED:
                if size is not UNDEFINED:
                    encode_host_size = size
            elif size is not UNDEFINED:
                if TYPE_CHECKING:
                    assert isinstance(size, int)
                    assert isinstance(encode_host_size, int)
                encode_host_size = max(size, encode_host_size)
        if encode_host_size is UNDEFINED:
            encode_host_size = _DEFAULT_ENCODE_SIZE

    # Re-wrap the undecorated functions so the new sizes take effect.
    _encode_host = lru_cache(encode_host_size)(_encode_host.__wrapped__)
    _idna_decode = lru_cache(idna_decode_size)(_idna_decode.__wrapped__)
    _idna_encode = lru_cache(idna_encode_size)(_idna_encode.__wrapped__)