Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/yarl/_url.py: 45%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import math
2import sys
3import warnings
4from collections.abc import Mapping, Sequence
5from contextlib import suppress
6from functools import _CacheInfo, lru_cache
7from ipaddress import ip_address
8from typing import (
9 TYPE_CHECKING,
10 Any,
11 Callable,
12 Iterable,
13 Iterator,
14 List,
15 Tuple,
16 TypedDict,
17 TypeVar,
18 Union,
19 overload,
20)
21from urllib.parse import (
22 SplitResult,
23 parse_qsl,
24 quote,
25 urlsplit,
26 urlunsplit,
27 uses_netloc,
28 uses_relative,
29)
31import idna
32from multidict import MultiDict, MultiDictProxy
34from ._helpers import cached_property
35from ._quoting import _Quoter, _Unquoter
37DEFAULT_PORTS = {"http": 80, "https": 443, "ws": 80, "wss": 443}
38USES_AUTHORITY = frozenset(uses_netloc)
39USES_RELATIVE = frozenset(uses_relative)
41sentinel = object()
43SimpleQuery = Union[str, int, float]
44QueryVariable = Union[SimpleQuery, "Sequence[SimpleQuery]"]
45Query = Union[
46 None, str, "Mapping[str, QueryVariable]", "Sequence[Tuple[str, QueryVariable]]"
47]
48_T = TypeVar("_T")
50if sys.version_info >= (3, 11):
51 from typing import Self
52else:
53 Self = Any
56class CacheInfo(TypedDict):
57 """Host encoding cache."""
59 idna_encode: _CacheInfo
60 idna_decode: _CacheInfo
61 ip_address: _CacheInfo
64class _SplitResultDict(TypedDict, total=False):
66 scheme: str
67 netloc: str
68 path: str
69 query: str
70 fragment: str
73class _InternalURLCache(TypedDict, total=False):
75 absolute: bool
76 scheme: str
77 raw_authority: str
78 _default_port: Union[int, None]
79 _port_not_default: Union[int, None]
80 authority: str
81 raw_user: Union[str, None]
82 user: Union[str, None]
83 raw_password: Union[str, None]
84 password: Union[str, None]
85 raw_host: Union[str, None]
86 host: Union[str, None]
87 port: Union[int, None]
88 explicit_port: Union[int, None]
89 raw_path: str
90 path: str
91 query: "MultiDictProxy[str]"
92 raw_query_string: str
93 query_string: str
94 path_qs: str
95 raw_path_qs: str
96 raw_fragment: str
97 fragment: str
98 raw_parts: Tuple[str, ...]
99 parts: Tuple[str, ...]
100 parent: "URL"
101 raw_name: str
102 name: str
103 raw_suffix: str
104 suffix: str
105 raw_suffixes: Tuple[str, ...]
106 suffixes: Tuple[str, ...]
109def rewrite_module(obj: _T) -> _T:
110 obj.__module__ = "yarl"
111 return obj
114def _normalize_path_segments(segments: "Sequence[str]") -> List[str]:
115 """Drop '.' and '..' from a sequence of str segments"""
117 resolved_path: List[str] = []
119 for seg in segments:
120 if seg == "..":
121 # ignore any .. segments that would otherwise cause an
122 # IndexError when popped from resolved_path if
123 # resolving for rfc3986
124 with suppress(IndexError):
125 resolved_path.pop()
126 elif seg != ".":
127 resolved_path.append(seg)
129 if segments and segments[-1] in (".", ".."):
130 # do some post-processing here.
131 # if the last segment was a relative dir,
132 # then we need to append the trailing '/'
133 resolved_path.append("")
135 return resolved_path
138@rewrite_module
139class URL:
140 # Don't derive from str
141 # follow pathlib.Path design
142 # probably URL will not suffer from pathlib problems:
143 # it's intended for libraries like aiohttp,
144 # not to be passed into standard library functions like os.open etc.
146 # URL grammar (RFC 3986)
147 # pct-encoded = "%" HEXDIG HEXDIG
148 # reserved = gen-delims / sub-delims
149 # gen-delims = ":" / "/" / "?" / "#" / "[" / "]" / "@"
150 # sub-delims = "!" / "$" / "&" / "'" / "(" / ")"
151 # / "*" / "+" / "," / ";" / "="
152 # unreserved = ALPHA / DIGIT / "-" / "." / "_" / "~"
153 # URI = scheme ":" hier-part [ "?" query ] [ "#" fragment ]
154 # hier-part = "//" authority path-abempty
155 # / path-absolute
156 # / path-rootless
157 # / path-empty
158 # scheme = ALPHA *( ALPHA / DIGIT / "+" / "-" / "." )
159 # authority = [ userinfo "@" ] host [ ":" port ]
160 # userinfo = *( unreserved / pct-encoded / sub-delims / ":" )
161 # host = IP-literal / IPv4address / reg-name
162 # IP-literal = "[" ( IPv6address / IPvFuture ) "]"
163 # IPvFuture = "v" 1*HEXDIG "." 1*( unreserved / sub-delims / ":" )
164 # IPv6address = 6( h16 ":" ) ls32
165 # / "::" 5( h16 ":" ) ls32
166 # / [ h16 ] "::" 4( h16 ":" ) ls32
167 # / [ *1( h16 ":" ) h16 ] "::" 3( h16 ":" ) ls32
168 # / [ *2( h16 ":" ) h16 ] "::" 2( h16 ":" ) ls32
169 # / [ *3( h16 ":" ) h16 ] "::" h16 ":" ls32
170 # / [ *4( h16 ":" ) h16 ] "::" ls32
171 # / [ *5( h16 ":" ) h16 ] "::" h16
172 # / [ *6( h16 ":" ) h16 ] "::"
173 # ls32 = ( h16 ":" h16 ) / IPv4address
174 # ; least-significant 32 bits of address
175 # h16 = 1*4HEXDIG
176 # ; 16 bits of address represented in hexadecimal
177 # IPv4address = dec-octet "." dec-octet "." dec-octet "." dec-octet
178 # dec-octet = DIGIT ; 0-9
179 # / %x31-39 DIGIT ; 10-99
180 # / "1" 2DIGIT ; 100-199
181 # / "2" %x30-34 DIGIT ; 200-249
182 # / "25" %x30-35 ; 250-255
183 # reg-name = *( unreserved / pct-encoded / sub-delims )
184 # port = *DIGIT
185 # path = path-abempty ; begins with "/" or is empty
186 # / path-absolute ; begins with "/" but not "//"
187 # / path-noscheme ; begins with a non-colon segment
188 # / path-rootless ; begins with a segment
189 # / path-empty ; zero characters
190 # path-abempty = *( "/" segment )
191 # path-absolute = "/" [ segment-nz *( "/" segment ) ]
192 # path-noscheme = segment-nz-nc *( "/" segment )
193 # path-rootless = segment-nz *( "/" segment )
194 # path-empty = 0<pchar>
195 # segment = *pchar
196 # segment-nz = 1*pchar
197 # segment-nz-nc = 1*( unreserved / pct-encoded / sub-delims / "@" )
198 # ; non-zero-length segment without any colon ":"
199 # pchar = unreserved / pct-encoded / sub-delims / ":" / "@"
200 # query = *( pchar / "/" / "?" )
201 # fragment = *( pchar / "/" / "?" )
202 # URI-reference = URI / relative-ref
203 # relative-ref = relative-part [ "?" query ] [ "#" fragment ]
204 # relative-part = "//" authority path-abempty
205 # / path-absolute
206 # / path-noscheme
207 # / path-empty
208 # absolute-URI = scheme ":" hier-part [ "?" query ]
209 __slots__ = ("_cache", "_val")
211 _QUOTER = _Quoter(requote=False)
212 _REQUOTER = _Quoter()
213 _PATH_QUOTER = _Quoter(safe="@:", protected="/+", requote=False)
214 _PATH_REQUOTER = _Quoter(safe="@:", protected="/+")
215 _QUERY_QUOTER = _Quoter(safe="?/:@", protected="=+&;", qs=True, requote=False)
216 _QUERY_REQUOTER = _Quoter(safe="?/:@", protected="=+&;", qs=True)
217 _QUERY_PART_QUOTER = _Quoter(safe="?/:@", qs=True, requote=False)
218 _FRAGMENT_QUOTER = _Quoter(safe="?/:@", requote=False)
219 _FRAGMENT_REQUOTER = _Quoter(safe="?/:@")
221 _UNQUOTER = _Unquoter()
222 _PATH_UNQUOTER = _Unquoter(ignore="/", unsafe="+")
223 _QS_UNQUOTER = _Unquoter(qs=True)
225 _val: SplitResult
227 def __new__(
228 cls,
229 val: Union[str, SplitResult, "URL"] = "",
230 *,
231 encoded: bool = False,
232 strict: Union[bool, None] = None,
233 ) -> Self:
234 if strict is not None: # pragma: no cover
235 warnings.warn("strict parameter is ignored")
236 if type(val) is cls:
237 return val
238 if type(val) is str:
239 val = urlsplit(val)
240 elif type(val) is SplitResult:
241 if not encoded:
242 raise ValueError("Cannot apply decoding to SplitResult")
243 elif isinstance(val, str):
244 val = urlsplit(str(val))
245 else:
246 raise TypeError("Constructor parameter should be str")
248 cache: _InternalURLCache = {}
249 if not encoded:
250 host: Union[str, None]
251 scheme, netloc, path, query, fragment = val
252 if not netloc: # netloc
253 host = ""
254 else:
255 username, password, host, port = cls._split_netloc(val[1])
256 if host is None:
257 raise ValueError("Invalid URL: host is required for absolute urls")
258 host = cls._encode_host(host)
259 raw_user = None if username is None else cls._REQUOTER(username)
260 raw_password = None if password is None else cls._REQUOTER(password)
261 netloc = cls._make_netloc(
262 raw_user, raw_password, host, port, encode_host=False
263 )
264 if "[" in host:
265 # Our host encoder adds back brackets for IPv6 addresses
266 # so we need to remove them here to get the raw host
267 _, _, bracketed = host.partition("[")
268 raw_host, _, _ = bracketed.partition("]")
269 else:
270 raw_host = host
271 cache["raw_host"] = raw_host
272 cache["raw_user"] = raw_user
273 cache["raw_password"] = raw_password
274 cache["explicit_port"] = port
276 if path:
277 path = cls._PATH_REQUOTER(path)
278 if netloc:
279 path = cls._normalize_path(path)
281 cls._validate_authority_uri_abs_path(host=host, path=path)
282 query = cls._QUERY_REQUOTER(query) if query else query
283 fragment = cls._FRAGMENT_REQUOTER(fragment) if fragment else fragment
284 cache["scheme"] = scheme
285 cache["raw_query_string"] = query
286 cache["raw_fragment"] = fragment
287 val = SplitResult(scheme, netloc, path, query, fragment)
289 self = object.__new__(cls)
290 self._val = val
291 self._cache = cache
292 return self
294 @classmethod
295 def build(
296 cls,
297 *,
298 scheme: str = "",
299 authority: str = "",
300 user: Union[str, None] = None,
301 password: Union[str, None] = None,
302 host: str = "",
303 port: Union[int, None] = None,
304 path: str = "",
305 query: Union[Query, None] = None,
306 query_string: str = "",
307 fragment: str = "",
308 encoded: bool = False,
309 ) -> "URL":
310 """Creates and returns a new URL"""
312 if authority and (user or password or host or port):
313 raise ValueError(
314 'Can\'t mix "authority" with "user", "password", "host" or "port".'
315 )
316 if not isinstance(port, (int, type(None))):
317 raise TypeError("The port is required to be int.")
318 if port and not host:
319 raise ValueError('Can\'t build URL with "port" but without "host".')
320 if query and query_string:
321 raise ValueError('Only one of "query" or "query_string" should be passed')
322 if (
323 scheme is None
324 or authority is None
325 or host is None
326 or path is None
327 or query_string is None
328 or fragment is None
329 ):
330 raise TypeError(
331 'NoneType is illegal for "scheme", "authority", "host", "path", '
332 '"query_string", and "fragment" args, use empty string instead.'
333 )
335 if authority:
336 if encoded:
337 netloc = authority
338 else:
339 tmp = SplitResult("", authority, "", "", "")
340 port = None if tmp.port == DEFAULT_PORTS.get(scheme) else tmp.port
341 netloc = cls._make_netloc(
342 tmp.username, tmp.password, tmp.hostname, port, encode=True
343 )
344 elif not user and not password and not host and not port:
345 netloc = ""
346 else:
347 port = None if port == DEFAULT_PORTS.get(scheme) else port
348 netloc = cls._make_netloc(
349 user, password, host, port, encode=not encoded, encode_host=not encoded
350 )
351 if not encoded:
352 path = cls._PATH_QUOTER(path) if path else path
353 if path and netloc:
354 path = cls._normalize_path(path)
356 cls._validate_authority_uri_abs_path(host=host, path=path)
357 query_string = (
358 cls._QUERY_QUOTER(query_string) if query_string else query_string
359 )
360 fragment = cls._FRAGMENT_QUOTER(fragment) if fragment else fragment
362 url = cls(
363 SplitResult(scheme, netloc, path, query_string, fragment), encoded=True
364 )
366 if query:
367 return url.with_query(query)
368 return url
370 def __init_subclass__(cls):
371 raise TypeError(f"Inheriting a class {cls!r} from URL is forbidden")
373 def __str__(self) -> str:
374 val = self._val
375 if not val.path and self.absolute and (val.query or val.fragment):
376 val = val._replace(path="/")
377 if (port := self._port_not_default) is None:
378 # port normalization - using None for default ports to remove from rendering
379 # https://datatracker.ietf.org/doc/html/rfc3986.html#section-6.2.3
380 val = val._replace(
381 netloc=self._make_netloc(
382 self.raw_user,
383 self.raw_password,
384 self.raw_host,
385 port,
386 encode_host=False,
387 )
388 )
389 return urlunsplit(val)
391 def __repr__(self) -> str:
392 return f"{self.__class__.__name__}('{str(self)}')"
394 def __bytes__(self) -> bytes:
395 return str(self).encode("ascii")
397 def __eq__(self, other: object) -> bool:
398 if type(other) is not URL:
399 return NotImplemented
401 val1 = self._val
402 if not val1.path and self.absolute:
403 val1 = val1._replace(path="/")
405 val2 = other._val
406 if not val2.path and other.absolute:
407 val2 = val2._replace(path="/")
409 return val1 == val2
411 def __hash__(self) -> int:
412 ret = self._cache.get("hash")
413 if ret is None:
414 val = self._val
415 if not val.path and self.absolute:
416 val = val._replace(path="/")
417 ret = self._cache["hash"] = hash(val)
418 return ret
420 def __le__(self, other: object) -> bool:
421 if type(other) is not URL:
422 return NotImplemented
423 return self._val <= other._val
425 def __lt__(self, other: object) -> bool:
426 if type(other) is not URL:
427 return NotImplemented
428 return self._val < other._val
430 def __ge__(self, other: object) -> bool:
431 if type(other) is not URL:
432 return NotImplemented
433 return self._val >= other._val
435 def __gt__(self, other: object) -> bool:
436 if type(other) is not URL:
437 return NotImplemented
438 return self._val > other._val
440 def __truediv__(self, name: str) -> "URL":
441 if not isinstance(name, str):
442 return NotImplemented
443 return self._make_child((str(name),))
445 def __mod__(self, query: Query) -> "URL":
446 return self.update_query(query)
448 def __bool__(self) -> bool:
449 return bool(
450 self._val.netloc or self._val.path or self._val.query or self._val.fragment
451 )
453 def __getstate__(self) -> Tuple[SplitResult]:
454 return (self._val,)
456 def __setstate__(self, state):
457 if state[0] is None and isinstance(state[1], dict):
458 # default style pickle
459 self._val = state[1]["_val"]
460 else:
461 self._val, *unused = state
462 self._cache = {}
464 def _cache_netloc(self) -> None:
465 """Cache the netloc parts of the URL."""
466 cache = self._cache
467 (
468 cache["raw_user"],
469 cache["raw_password"],
470 cache["raw_host"],
471 cache["explicit_port"],
472 ) = self._split_netloc(self._val.netloc)
474 def is_absolute(self) -> bool:
475 """A check for absolute URLs.
477 Return True for absolute ones (having scheme or starting
478 with //), False otherwise.
480 Is is preferred to call the .absolute property instead
481 as it is cached.
482 """
483 return self.absolute
485 def is_default_port(self) -> bool:
486 """A check for default port.
488 Return True if port is default for specified scheme,
489 e.g. 'http://python.org' or 'http://python.org:80', False
490 otherwise.
492 Return False for relative URLs.
494 """
495 default = self._default_port
496 explicit = self.explicit_port
497 if explicit is None:
498 # A relative URL does not have an implicit port / default port
499 return default is not None
500 return explicit == default
502 def origin(self) -> "URL":
503 """Return an URL with scheme, host and port parts only.
505 user, password, path, query and fragment are removed.
507 """
508 # TODO: add a keyword-only option for keeping user/pass maybe?
509 if not self.absolute:
510 raise ValueError("URL should be absolute")
511 if not self._val.scheme:
512 raise ValueError("URL should have scheme")
513 v = self._val
514 netloc = self._make_netloc(None, None, v.hostname, v.port)
515 val = v._replace(netloc=netloc, path="", query="", fragment="")
516 return URL(val, encoded=True)
518 def relative(self) -> "URL":
519 """Return a relative part of the URL.
521 scheme, user, password, host and port are removed.
523 """
524 if not self.absolute:
525 raise ValueError("URL should be absolute")
526 val = self._val._replace(scheme="", netloc="")
527 return URL(val, encoded=True)
529 @cached_property
530 def absolute(self) -> bool:
531 """A check for absolute URLs.
533 Return True for absolute ones (having scheme or starting
534 with //), False otherwise.
536 """
537 # `netloc`` is an empty string for relative URLs
538 # Checking `netloc` is faster than checking `hostname`
539 # because `hostname` is a property that does some extra work
540 # to parse the host from the `netloc`
541 return self._val.netloc != ""
543 @cached_property
544 def scheme(self) -> str:
545 """Scheme for absolute URLs.
547 Empty string for relative URLs or URLs starting with //
549 """
550 return self._val.scheme
552 @cached_property
553 def raw_authority(self) -> str:
554 """Encoded authority part of URL.
556 Empty string for relative URLs.
558 """
559 return self._val.netloc
561 @cached_property
562 def _default_port(self) -> Union[int, None]:
563 """Default port for the scheme or None if not known."""
564 return DEFAULT_PORTS.get(self.scheme)
566 @cached_property
567 def _port_not_default(self) -> Union[int, None]:
568 """The port part of URL normalized to None if its the default port."""
569 port = self.port
570 if self._default_port == port:
571 return None
572 return port
574 @cached_property
575 def authority(self) -> str:
576 """Decoded authority part of URL.
578 Empty string for relative URLs.
580 """
581 return self._make_netloc(
582 self.user, self.password, self.host, self.port, encode_host=False
583 )
585 @cached_property
586 def raw_user(self) -> Union[str, None]:
587 """Encoded user part of URL.
589 None if user is missing.
591 """
592 # not .username
593 self._cache_netloc()
594 return self._cache["raw_user"]
596 @cached_property
597 def user(self) -> Union[str, None]:
598 """Decoded user part of URL.
600 None if user is missing.
602 """
603 raw_user = self.raw_user
604 if raw_user is None:
605 return None
606 return self._UNQUOTER(raw_user)
608 @cached_property
609 def raw_password(self) -> Union[str, None]:
610 """Encoded password part of URL.
612 None if password is missing.
614 """
615 self._cache_netloc()
616 return self._cache["raw_password"]
618 @cached_property
619 def password(self) -> Union[str, None]:
620 """Decoded password part of URL.
622 None if password is missing.
624 """
625 raw_password = self.raw_password
626 if raw_password is None:
627 return None
628 return self._UNQUOTER(raw_password)
630 @cached_property
631 def raw_host(self) -> Union[str, None]:
632 """Encoded host part of URL.
634 None for relative URLs.
636 """
637 # Use host instead of hostname for sake of shortness
638 # May add .hostname prop later
639 self._cache_netloc()
640 return self._cache["raw_host"]
642 @cached_property
643 def host(self) -> Union[str, None]:
644 """Decoded host part of URL.
646 None for relative URLs.
648 """
649 raw = self.raw_host
650 if raw is None:
651 return None
652 if "%" in raw:
653 # Hack for scoped IPv6 addresses like
654 # fe80::2%Перевірка
655 # presence of '%' sign means only IPv6 address, so idna is useless.
656 return raw
657 return _idna_decode(raw)
659 @cached_property
660 def port(self) -> Union[int, None]:
661 """Port part of URL, with scheme-based fallback.
663 None for relative URLs or URLs without explicit port and
664 scheme without default port substitution.
666 """
667 return self.explicit_port or self._default_port
669 @cached_property
670 def explicit_port(self) -> Union[int, None]:
671 """Port part of URL, without scheme-based fallback.
673 None for relative URLs or URLs without explicit port.
675 """
676 self._cache_netloc()
677 return self._cache["explicit_port"]
679 @cached_property
680 def raw_path(self) -> str:
681 """Encoded path of URL.
683 / for absolute URLs without path part.
685 """
686 ret = self._val.path
687 if not ret and self.absolute:
688 ret = "/"
689 return ret
691 @cached_property
692 def path(self) -> str:
693 """Decoded path of URL.
695 / for absolute URLs without path part.
697 """
698 return self._PATH_UNQUOTER(self.raw_path)
700 @cached_property
701 def query(self) -> "MultiDictProxy[str]":
702 """A MultiDictProxy representing parsed query parameters in decoded
703 representation.
705 Empty value if URL has no query part.
707 """
708 ret = MultiDict(parse_qsl(self.raw_query_string, keep_blank_values=True))
709 return MultiDictProxy(ret)
711 @cached_property
712 def raw_query_string(self) -> str:
713 """Encoded query part of URL.
715 Empty string if query is missing.
717 """
718 return self._val.query
720 @cached_property
721 def query_string(self) -> str:
722 """Decoded query part of URL.
724 Empty string if query is missing.
726 """
727 return self._QS_UNQUOTER(self.raw_query_string)
729 @cached_property
730 def path_qs(self) -> str:
731 """Decoded path of URL with query."""
732 if not self.query_string:
733 return self.path
734 return f"{self.path}?{self.query_string}"
736 @cached_property
737 def raw_path_qs(self) -> str:
738 """Encoded path of URL with query."""
739 if not self.raw_query_string:
740 return self.raw_path
741 return f"{self.raw_path}?{self.raw_query_string}"
743 @cached_property
744 def raw_fragment(self) -> str:
745 """Encoded fragment part of URL.
747 Empty string if fragment is missing.
749 """
750 return self._val.fragment
752 @cached_property
753 def fragment(self) -> str:
754 """Decoded fragment part of URL.
756 Empty string if fragment is missing.
758 """
759 return self._UNQUOTER(self.raw_fragment)
761 @cached_property
762 def raw_parts(self) -> Tuple[str, ...]:
763 """A tuple containing encoded *path* parts.
765 ('/',) for absolute URLs if *path* is missing.
767 """
768 path = self._val.path
769 if self.absolute:
770 if not path:
771 parts = ["/"]
772 else:
773 parts = ["/"] + path[1:].split("/")
774 else:
775 if path.startswith("/"):
776 parts = ["/"] + path[1:].split("/")
777 else:
778 parts = path.split("/")
779 return tuple(parts)
781 @cached_property
782 def parts(self) -> Tuple[str, ...]:
783 """A tuple containing decoded *path* parts.
785 ('/',) for absolute URLs if *path* is missing.
787 """
788 return tuple(self._UNQUOTER(part) for part in self.raw_parts)
790 @cached_property
791 def parent(self) -> "URL":
792 """A new URL with last part of path removed and cleaned up query and
793 fragment.
795 """
796 path = self.raw_path
797 if not path or path == "/":
798 if self.raw_fragment or self.raw_query_string:
799 return URL(self._val._replace(query="", fragment=""), encoded=True)
800 return self
801 parts = path.split("/")
802 val = self._val._replace(path="/".join(parts[:-1]), query="", fragment="")
803 return URL(val, encoded=True)
805 @cached_property
806 def raw_name(self) -> str:
807 """The last part of raw_parts."""
808 parts = self.raw_parts
809 if self.absolute:
810 parts = parts[1:]
811 if not parts:
812 return ""
813 else:
814 return parts[-1]
815 else:
816 return parts[-1]
818 @cached_property
819 def name(self) -> str:
820 """The last part of parts."""
821 return self._UNQUOTER(self.raw_name)
823 @cached_property
824 def raw_suffix(self) -> str:
825 name = self.raw_name
826 i = name.rfind(".")
827 if 0 < i < len(name) - 1:
828 return name[i:]
829 else:
830 return ""
832 @cached_property
833 def suffix(self) -> str:
834 return self._UNQUOTER(self.raw_suffix)
836 @cached_property
837 def raw_suffixes(self) -> Tuple[str, ...]:
838 name = self.raw_name
839 if name.endswith("."):
840 return ()
841 name = name.lstrip(".")
842 return tuple("." + suffix for suffix in name.split(".")[1:])
844 @cached_property
845 def suffixes(self) -> Tuple[str, ...]:
846 return tuple(self._UNQUOTER(suffix) for suffix in self.raw_suffixes)
848 @staticmethod
849 def _validate_authority_uri_abs_path(host: str, path: str) -> None:
850 """Ensure that path in URL with authority starts with a leading slash.
852 Raise ValueError if not.
853 """
854 if host and path and not path.startswith("/"):
855 raise ValueError(
856 "Path in a URL with authority should start with a slash ('/') if set"
857 )
859 def _make_child(self, paths: "Sequence[str]", encoded: bool = False) -> "URL":
860 """
861 add paths to self._val.path, accounting for absolute vs relative paths,
862 keep existing, but do not create new, empty segments
863 """
864 parsed = []
865 for idx, path in enumerate(reversed(paths)):
866 # empty segment of last is not removed
867 last = idx == 0
868 if path and path[0] == "/":
869 raise ValueError(
870 f"Appending path {path!r} starting from slash is forbidden"
871 )
872 path = path if encoded else self._PATH_QUOTER(path)
873 segments = list(reversed(path.split("/")))
874 # remove trailing empty segment for all but the last path
875 segment_slice_start = int(not last and segments[0] == "")
876 parsed += segments[segment_slice_start:]
877 parsed.reverse()
879 if self._val.path and (old_path_segments := self._val.path.split("/")):
880 old_path_cutoff = -1 if old_path_segments[-1] == "" else None
881 parsed = [*old_path_segments[:old_path_cutoff], *parsed]
883 if self.absolute:
884 parsed = _normalize_path_segments(parsed)
885 if parsed and parsed[0] != "":
886 # inject a leading slash when adding a path to an absolute URL
887 # where there was none before
888 parsed = ["", *parsed]
889 new_path = "/".join(parsed)
890 return URL(
891 self._val._replace(path=new_path, query="", fragment=""), encoded=True
892 )
894 @classmethod
895 def _normalize_path(cls, path: str) -> str:
896 # Drop '.' and '..' from str path
898 prefix = ""
899 if path.startswith("/"):
900 # preserve the "/" root element of absolute paths, copying it to the
901 # normalised output as per sections 5.2.4 and 6.2.2.3 of rfc3986.
902 prefix = "/"
903 path = path[1:]
905 segments = path.split("/")
906 return prefix + "/".join(_normalize_path_segments(segments))
908 @classmethod
909 def _encode_host(cls, host: str, human: bool = False) -> str:
910 if "%" in host:
911 raw_ip, sep, zone = host.partition("%")
912 else:
913 raw_ip = host
914 sep = zone = ""
916 if raw_ip and raw_ip[-1].isdigit() or ":" in raw_ip:
917 # Might be an IP address, check it
918 #
919 # IP Addresses can look like:
920 # https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.2
921 # - 127.0.0.1 (last character is a digit)
922 # - 2001:db8::ff00:42:8329 (contains a colon)
923 # - 2001:db8::ff00:42:8329%eth0 (contains a colon)
924 # - [2001:db8::ff00:42:8329] (contains a colon)
925 # Rare IP Address formats are not supported per:
926 # https://datatracker.ietf.org/doc/html/rfc3986#section-7.4
927 #
928 # We try to avoid parsing IP addresses as much as possible
929 # since its orders of magnitude slower than almost any other operation
930 # this library does.
931 #
932 # IP parsing is slow, so its wrapped in an LRU
933 try:
934 ip_compressed_version = _ip_compressed_version(raw_ip)
935 except ValueError:
936 pass
937 else:
938 # These checks should not happen in the
939 # LRU to keep the cache size small
940 host, version = ip_compressed_version
941 if sep:
942 host += "%" + zone
943 if version == 6:
944 return f"[{host}]"
945 return host
947 host = host.lower()
948 # IDNA encoding is slow,
949 # skip it for ASCII-only strings
950 # Don't move the check into _idna_encode() helper
951 # to reduce the cache size
952 if human or host.isascii():
953 return host
954 return _idna_encode(host)
956 @classmethod
957 def _make_netloc(
958 cls,
959 user: Union[str, None],
960 password: Union[str, None],
961 host: Union[str, None],
962 port: Union[int, None],
963 encode: bool = False,
964 encode_host: bool = True,
965 requote: bool = False,
966 ) -> str:
967 if host is None:
968 return ""
969 quoter = cls._REQUOTER if requote else cls._QUOTER
970 if encode_host:
971 ret = cls._encode_host(host)
972 else:
973 ret = host
974 if port is not None:
975 ret = f"{ret}:{port}"
976 if password is not None:
977 if not user:
978 user = ""
979 else:
980 if encode:
981 user = quoter(user)
982 if encode:
983 password = quoter(password)
984 user = user + ":" + password
985 elif user and encode:
986 user = quoter(user)
987 if user:
988 ret = user + "@" + ret
989 return ret
991 @classmethod
992 @lru_cache # match the same size as urlsplit
993 def _split_netloc(
994 cls,
995 netloc: str,
996 ) -> Tuple[Union[str, None], Union[str, None], Union[str, None], Union[int, None]]:
997 """Split netloc into username, password, host and port."""
998 if "@" not in netloc:
999 username: Union[str, None] = None
1000 password: Union[str, None] = None
1001 hostinfo = netloc
1002 else:
1003 userinfo, _, hostinfo = netloc.rpartition("@")
1004 username, have_password, password = userinfo.partition(":")
1005 if not have_password:
1006 password = None
1008 if "[" in hostinfo:
1009 _, _, bracketed = hostinfo.partition("[")
1010 hostname, _, port_str = bracketed.partition("]")
1011 _, _, port_str = port_str.partition(":")
1012 else:
1013 hostname, _, port_str = hostinfo.partition(":")
1015 if not port_str:
1016 port: Union[int, None] = None
1017 else:
1018 try:
1019 port = int(port_str)
1020 except ValueError:
1021 raise ValueError("Invalid URL: port can't be converted to integer")
1022 if not (0 <= port <= 65535):
1023 raise ValueError("Port out of range 0-65535")
1025 return username or None, password, hostname or None, port
1027 def with_scheme(self, scheme: str) -> "URL":
1028 """Return a new URL with scheme replaced."""
1029 # N.B. doesn't cleanup query/fragment
1030 if not isinstance(scheme, str):
1031 raise TypeError("Invalid scheme type")
1032 if not self.absolute:
1033 raise ValueError("scheme replacement is not allowed for relative URLs")
1034 return URL(self._val._replace(scheme=scheme.lower()), encoded=True)
1036 def with_user(self, user: Union[str, None]) -> "URL":
1037 """Return a new URL with user replaced.
1039 Autoencode user if needed.
1041 Clear user/password if user is None.
1043 """
1044 # N.B. doesn't cleanup query/fragment
1045 val = self._val
1046 if user is None:
1047 password = None
1048 elif isinstance(user, str):
1049 user = self._QUOTER(user)
1050 password = val.password
1051 else:
1052 raise TypeError("Invalid user type")
1053 if not self.absolute:
1054 raise ValueError("user replacement is not allowed for relative URLs")
1055 return URL(
1056 self._val._replace(
1057 netloc=self._make_netloc(user, password, val.hostname, val.port)
1058 ),
1059 encoded=True,
1060 )
1062 def with_password(self, password: Union[str, None]) -> "URL":
1063 """Return a new URL with password replaced.
1065 Autoencode password if needed.
1067 Clear password if argument is None.
1069 """
1070 # N.B. doesn't cleanup query/fragment
1071 if password is None:
1072 pass
1073 elif isinstance(password, str):
1074 password = self._QUOTER(password)
1075 else:
1076 raise TypeError("Invalid password type")
1077 if not self.absolute:
1078 raise ValueError("password replacement is not allowed for relative URLs")
1079 val = self._val
1080 return URL(
1081 self._val._replace(
1082 netloc=self._make_netloc(val.username, password, val.hostname, val.port)
1083 ),
1084 encoded=True,
1085 )
1087 def with_host(self, host: str) -> "URL":
1088 """Return a new URL with host replaced.
1090 Autoencode host if needed.
1092 Changing host for relative URLs is not allowed, use .join()
1093 instead.
1095 """
1096 # N.B. doesn't cleanup query/fragment
1097 if not isinstance(host, str):
1098 raise TypeError("Invalid host type")
1099 if not self.absolute:
1100 raise ValueError("host replacement is not allowed for relative URLs")
1101 if not host:
1102 raise ValueError("host removing is not allowed")
1103 val = self._val
1104 return URL(
1105 self._val._replace(
1106 netloc=self._make_netloc(val.username, val.password, host, val.port)
1107 ),
1108 encoded=True,
1109 )
1111 def with_port(self, port: Union[int, None]) -> "URL":
1112 """Return a new URL with port replaced.
1114 Clear port to default if None is passed.
1116 """
1117 # N.B. doesn't cleanup query/fragment
1118 if port is not None:
1119 if isinstance(port, bool) or not isinstance(port, int):
1120 raise TypeError(f"port should be int or None, got {type(port)}")
1121 if port < 0 or port > 65535:
1122 raise ValueError(f"port must be between 0 and 65535, got {port}")
1123 if not self.absolute:
1124 raise ValueError("port replacement is not allowed for relative URLs")
1125 val = self._val
1126 return URL(
1127 self._val._replace(
1128 netloc=self._make_netloc(val.username, val.password, val.hostname, port)
1129 ),
1130 encoded=True,
1131 )
1133 def with_path(self, path: str, *, encoded: bool = False) -> "URL":
1134 """Return a new URL with path replaced."""
1135 if not encoded:
1136 path = self._PATH_QUOTER(path)
1137 if self.absolute:
1138 path = self._normalize_path(path)
1139 if len(path) > 0 and path[0] != "/":
1140 path = "/" + path
1141 return URL(self._val._replace(path=path, query="", fragment=""), encoded=True)
1143 @classmethod
1144 def _query_seq_pairs(
1145 cls, quoter: Callable[[str], str], pairs: Iterable[Tuple[str, QueryVariable]]
1146 ) -> Iterator[str]:
1147 for key, val in pairs:
1148 if isinstance(val, (list, tuple)):
1149 for v in val:
1150 yield quoter(key) + "=" + quoter(cls._query_var(v))
1151 else:
1152 yield quoter(key) + "=" + quoter(cls._query_var(val))
1154 @staticmethod
1155 def _query_var(v: QueryVariable) -> str:
1156 cls = type(v)
1157 if issubclass(cls, str):
1158 if TYPE_CHECKING:
1159 assert isinstance(v, str)
1160 return v
1161 if issubclass(cls, float):
1162 if TYPE_CHECKING:
1163 assert isinstance(v, float)
1164 if math.isinf(v):
1165 raise ValueError("float('inf') is not supported")
1166 if math.isnan(v):
1167 raise ValueError("float('nan') is not supported")
1168 return str(float(v))
1169 if issubclass(cls, int) and cls is not bool:
1170 if TYPE_CHECKING:
1171 assert isinstance(v, int)
1172 return str(int(v))
1173 raise TypeError(
1174 "Invalid variable type: value "
1175 "should be str, int or float, got {!r} "
1176 "of type {}".format(v, cls)
1177 )
1179 def _get_str_query(self, *args: Any, **kwargs: Any) -> Union[str, None]:
1180 query: Union[str, Mapping[str, QueryVariable], None]
1181 if kwargs:
1182 if len(args) > 0:
1183 raise ValueError(
1184 "Either kwargs or single query parameter must be present"
1185 )
1186 query = kwargs
1187 elif len(args) == 1:
1188 query = args[0]
1189 else:
1190 raise ValueError("Either kwargs or single query parameter must be present")
1192 if query is None:
1193 query = None
1194 elif isinstance(query, Mapping):
1195 quoter = self._QUERY_PART_QUOTER
1196 query = "&".join(self._query_seq_pairs(quoter, query.items()))
1197 elif isinstance(query, str):
1198 query = self._QUERY_QUOTER(query)
1199 elif isinstance(query, (bytes, bytearray, memoryview)):
1200 raise TypeError(
1201 "Invalid query type: bytes, bytearray and memoryview are forbidden"
1202 )
1203 elif isinstance(query, Sequence):
1204 quoter = self._QUERY_PART_QUOTER
1205 # We don't expect sequence values if we're given a list of pairs
1206 # already; only mappings like builtin `dict` which can't have the
1207 # same key pointing to multiple values are allowed to use
1208 # `_query_seq_pairs`.
1209 query = "&".join(
1210 quoter(k) + "=" + quoter(self._query_var(v)) for k, v in query
1211 )
1212 else:
1213 raise TypeError(
1214 "Invalid query type: only str, mapping or "
1215 "sequence of (key, value) pairs is allowed"
1216 )
1218 return query
1220 @overload
1221 def with_query(self, query: Query) -> "URL": ...
1223 @overload
1224 def with_query(self, **kwargs: QueryVariable) -> "URL": ...
1226 def with_query(self, *args: Any, **kwargs: Any) -> "URL":
1227 """Return a new URL with query part replaced.
1229 Accepts any Mapping (e.g. dict, multidict.MultiDict instances)
1230 or str, autoencode the argument if needed.
1232 A sequence of (key, value) pairs is supported as well.
1234 It also can take an arbitrary number of keyword arguments.
1236 Clear query if None is passed.
1238 """
1239 # N.B. doesn't cleanup query/fragment
1241 new_query = self._get_str_query(*args, **kwargs) or ""
1242 return URL(
1243 self._val._replace(path=self._val.path, query=new_query), encoded=True
1244 )
1246 @overload
1247 def update_query(self, query: Query) -> "URL": ...
1249 @overload
1250 def update_query(self, **kwargs: QueryVariable) -> "URL": ...
1252 def update_query(self, *args: Any, **kwargs: Any) -> "URL":
1253 """Return a new URL with query part updated."""
1254 s = self._get_str_query(*args, **kwargs)
1255 query = None
1256 if s is not None:
1257 new_query = MultiDict(parse_qsl(s, keep_blank_values=True))
1258 query = MultiDict(self.query)
1259 query.update(new_query)
1261 return URL(
1262 self._val._replace(query=self._get_str_query(query) or ""), encoded=True
1263 )
1265 def without_query_params(self, *query_params: str) -> "URL":
1266 """Remove some keys from query part and return new URL."""
1267 params_to_remove = set(query_params) & self.query.keys()
1268 if not params_to_remove:
1269 return self
1270 return self.with_query(
1271 tuple(
1272 (name, value)
1273 for name, value in self.query.items()
1274 if name not in params_to_remove
1275 )
1276 )
1278 def with_fragment(self, fragment: Union[str, None]) -> "URL":
1279 """Return a new URL with fragment replaced.
1281 Autoencode fragment if needed.
1283 Clear fragment to default if None is passed.
1285 """
1286 # N.B. doesn't cleanup query/fragment
1287 if fragment is None:
1288 raw_fragment = ""
1289 elif not isinstance(fragment, str):
1290 raise TypeError("Invalid fragment type")
1291 else:
1292 raw_fragment = self._FRAGMENT_QUOTER(fragment)
1293 if self.raw_fragment == raw_fragment:
1294 return self
1295 return URL(self._val._replace(fragment=raw_fragment), encoded=True)
1297 def with_name(self, name: str) -> "URL":
1298 """Return a new URL with name (last part of path) replaced.
1300 Query and fragment parts are cleaned up.
1302 Name is encoded if needed.
1304 """
1305 # N.B. DOES cleanup query/fragment
1306 if not isinstance(name, str):
1307 raise TypeError("Invalid name type")
1308 if "/" in name:
1309 raise ValueError("Slash in name is not allowed")
1310 name = self._PATH_QUOTER(name)
1311 if name in (".", ".."):
1312 raise ValueError(". and .. values are forbidden")
1313 parts = list(self.raw_parts)
1314 if self.absolute:
1315 if len(parts) == 1:
1316 parts.append(name)
1317 else:
1318 parts[-1] = name
1319 parts[0] = "" # replace leading '/'
1320 else:
1321 parts[-1] = name
1322 if parts[0] == "/":
1323 parts[0] = "" # replace leading '/'
1324 return URL(
1325 self._val._replace(path="/".join(parts), query="", fragment=""),
1326 encoded=True,
1327 )
1329 def with_suffix(self, suffix: str) -> "URL":
1330 """Return a new URL with suffix (file extension of name) replaced.
1332 Query and fragment parts are cleaned up.
1334 suffix is encoded if needed.
1335 """
1336 if not isinstance(suffix, str):
1337 raise TypeError("Invalid suffix type")
1338 if suffix and not suffix.startswith(".") or suffix == ".":
1339 raise ValueError(f"Invalid suffix {suffix!r}")
1340 name = self.raw_name
1341 if not name:
1342 raise ValueError(f"{self!r} has an empty name")
1343 old_suffix = self.raw_suffix
1344 if not old_suffix:
1345 name = name + suffix
1346 else:
1347 name = name[: -len(old_suffix)] + suffix
1348 return self.with_name(name)
1350 def join(self, url: "URL") -> "URL":
1351 """Join URLs
1353 Construct a full (“absolute”) URL by combining a “base URL”
1354 (self) with another URL (url).
1356 Informally, this uses components of the base URL, in
1357 particular the addressing scheme, the network location and
1358 (part of) the path, to provide missing components in the
1359 relative URL.
1361 """
1362 if type(url) is not URL:
1363 raise TypeError("url should be URL")
1364 val = self._val
1365 other_val = url._val
1366 scheme = other_val.scheme or val.scheme
1368 if scheme != val.scheme or scheme not in USES_RELATIVE:
1369 return url
1371 # scheme is in uses_authority as uses_authority is a superset of uses_relative
1372 if other_val.netloc and scheme in USES_AUTHORITY:
1373 return URL(other_val._replace(scheme=scheme), encoded=True)
1375 parts: _SplitResultDict = {"scheme": scheme}
1376 if other_val.path or other_val.fragment:
1377 parts["fragment"] = other_val.fragment
1378 if other_val.path or other_val.query:
1379 parts["query"] = other_val.query
1381 if not other_val.path:
1382 return URL(val._replace(**parts), encoded=True)
1384 if other_val.path[0] == "/":
1385 path = other_val.path
1386 elif not val.path:
1387 path = f"/{other_val.path}"
1388 elif val.path[-1] == "/":
1389 path = f"{val.path}{other_val.path}"
1390 else:
1391 # …
1392 # and relativizing ".."
1393 # parts[0] is / for absolute urls, this join will add a double slash there
1394 path = "/".join([*self.parts[:-1], ""])
1395 path += other_val.path
1396 # which has to be removed
1397 if val.path[0] == "/":
1398 path = path[1:]
1400 parts["path"] = self._normalize_path(path)
1401 return URL(val._replace(**parts), encoded=True)
1403 def joinpath(self, *other: str, encoded: bool = False) -> "URL":
1404 """Return a new URL with the elements in other appended to the path."""
1405 return self._make_child(other, encoded=encoded)
1407 def human_repr(self) -> str:
1408 """Return decoded human readable string for URL representation."""
1409 user = _human_quote(self.user, "#/:?@[]")
1410 password = _human_quote(self.password, "#/:?@[]")
1411 host = self.host
1412 if host:
1413 host = self._encode_host(host, human=True)
1414 path = _human_quote(self.path, "#?")
1415 if TYPE_CHECKING:
1416 assert path is not None
1417 query_string = "&".join(
1418 "{}={}".format(_human_quote(k, "#&+;="), _human_quote(v, "#&+;="))
1419 for k, v in self.query.items()
1420 )
1421 fragment = _human_quote(self.fragment, "")
1422 if TYPE_CHECKING:
1423 assert fragment is not None
1424 netloc = self._make_netloc(
1425 user, password, host, self.explicit_port, encode_host=False
1426 )
1427 val = SplitResult(self.scheme, netloc, path, query_string, fragment)
1428 return urlunsplit(val)
1431def _human_quote(s: Union[str, None], unsafe: str) -> Union[str, None]:
1432 if not s:
1433 return s
1434 for c in "%" + unsafe:
1435 if c in s:
1436 s = s.replace(c, f"%{ord(c):02X}")
1437 if s.isprintable():
1438 return s
1439 return "".join(c if c.isprintable() else quote(c) for c in s)
1442_MAXCACHE = 256
1445@lru_cache(_MAXCACHE)
1446def _idna_decode(raw: str) -> str:
1447 try:
1448 return idna.decode(raw.encode("ascii"))
1449 except UnicodeError: # e.g. '::1'
1450 return raw.encode("ascii").decode("idna")
1453@lru_cache(_MAXCACHE)
1454def _idna_encode(host: str) -> str:
1455 try:
1456 return idna.encode(host, uts46=True).decode("ascii")
1457 except UnicodeError:
1458 return host.encode("idna").decode("ascii")
1461@lru_cache(_MAXCACHE)
1462def _ip_compressed_version(raw_ip: str) -> Tuple[str, int]:
1463 """Return compressed version of IP address and its version."""
1464 ip = ip_address(raw_ip)
1465 return ip.compressed, ip.version
1468@rewrite_module
1469def cache_clear() -> None:
1470 """Clear all LRU caches."""
1471 _idna_decode.cache_clear()
1472 _idna_encode.cache_clear()
1473 _ip_compressed_version.cache_clear()
1476@rewrite_module
1477def cache_info() -> CacheInfo:
1478 """Report cache statistics."""
1479 return {
1480 "idna_encode": _idna_encode.cache_info(),
1481 "idna_decode": _idna_decode.cache_info(),
1482 "ip_address": _ip_compressed_version.cache_info(),
1483 }
1486@rewrite_module
1487def cache_configure(
1488 *,
1489 idna_encode_size: Union[int, None] = _MAXCACHE,
1490 idna_decode_size: Union[int, None] = _MAXCACHE,
1491 ip_address_size: Union[int, None] = _MAXCACHE,
1492) -> None:
1493 """Configure LRU cache sizes."""
1494 global _idna_decode, _idna_encode, _ip_compressed_version
1496 _idna_encode = lru_cache(idna_encode_size)(_idna_encode.__wrapped__)
1497 _idna_decode = lru_cache(idna_decode_size)(_idna_decode.__wrapped__)
1498 _ip_compressed_version = lru_cache(ip_address_size)(
1499 _ip_compressed_version.__wrapped__
1500 )