Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/yarl/_url.py: 39%
606 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:40 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:40 +0000
1import functools
2import math
3import warnings
4from collections.abc import Mapping, Sequence
5from contextlib import suppress
6from ipaddress import ip_address
7from urllib.parse import SplitResult, parse_qsl, quote, urljoin, urlsplit, urlunsplit
9import idna
10from multidict import MultiDict, MultiDictProxy
12from ._quoting import _Quoter, _Unquoter
14DEFAULT_PORTS = {"http": 80, "https": 443, "ws": 80, "wss": 443}
16sentinel = object()
def rewrite_module(obj: object) -> object:
    """Report *obj* as living in the public ``yarl`` module.

    Overwrites ``obj.__module__`` with ``"yarl"`` and hands the very same
    object back, which makes the function usable as a decorator.
    """
    setattr(obj, "__module__", "yarl")
    return obj
class cached_property:
    """Read-only, caching replacement for ``@property``.

    The decorated method is evaluated on first access; the result is stored
    in the instance's ``_cache`` mapping under the method's name and served
    from there afterwards.  Because ``__set__`` is defined this is a data
    descriptor, and any assignment is rejected.

    """

    def __init__(self, wrapped):
        self.wrapped = wrapped
        try:
            self.__doc__ = wrapped.__doc__
        except AttributeError:  # pragma: no cover
            self.__doc__ = ""
        # Key under which the computed value is kept in ``inst._cache``.
        self.name = wrapped.__name__

    def __get__(self, inst, owner, _sentinel=sentinel):
        # Class-level access returns the descriptor itself.
        if inst is None:
            return self
        key = self.name
        cached = inst._cache.get(key, _sentinel)
        if cached is _sentinel:
            cached = self.wrapped(inst)
            inst._cache[key] = cached
        return cached

    def __set__(self, inst, value):
        raise AttributeError("cached property is read-only")
55def _normalize_path_segments(segments):
56 """Drop '.' and '..' from a sequence of str segments"""
58 resolved_path = []
60 for seg in segments:
61 if seg == "..":
62 # ignore any .. segments that would otherwise cause an
63 # IndexError when popped from resolved_path if
64 # resolving for rfc3986
65 with suppress(IndexError):
66 resolved_path.pop()
67 elif seg != ".":
68 resolved_path.append(seg)
70 if segments and segments[-1] in (".", ".."):
71 # do some post-processing here.
72 # if the last segment was a relative dir,
73 # then we need to append the trailing '/'
74 resolved_path.append("")
76 return resolved_path
79@rewrite_module
80class URL:
81 # Don't derive from str
82 # follow pathlib.Path design
83 # probably URL will not suffer from pathlib problems:
84 # it's intended for libraries like aiohttp,
85 # not to be passed into standard library functions like os.open etc.
87 # URL grammar (RFC 3986)
88 # pct-encoded = "%" HEXDIG HEXDIG
89 # reserved = gen-delims / sub-delims
90 # gen-delims = ":" / "/" / "?" / "#" / "[" / "]" / "@"
91 # sub-delims = "!" / "$" / "&" / "'" / "(" / ")"
92 # / "*" / "+" / "," / ";" / "="
93 # unreserved = ALPHA / DIGIT / "-" / "." / "_" / "~"
94 # URI = scheme ":" hier-part [ "?" query ] [ "#" fragment ]
95 # hier-part = "//" authority path-abempty
96 # / path-absolute
97 # / path-rootless
98 # / path-empty
99 # scheme = ALPHA *( ALPHA / DIGIT / "+" / "-" / "." )
100 # authority = [ userinfo "@" ] host [ ":" port ]
101 # userinfo = *( unreserved / pct-encoded / sub-delims / ":" )
102 # host = IP-literal / IPv4address / reg-name
103 # IP-literal = "[" ( IPv6address / IPvFuture ) "]"
104 # IPvFuture = "v" 1*HEXDIG "." 1*( unreserved / sub-delims / ":" )
105 # IPv6address = 6( h16 ":" ) ls32
106 # / "::" 5( h16 ":" ) ls32
107 # / [ h16 ] "::" 4( h16 ":" ) ls32
108 # / [ *1( h16 ":" ) h16 ] "::" 3( h16 ":" ) ls32
109 # / [ *2( h16 ":" ) h16 ] "::" 2( h16 ":" ) ls32
110 # / [ *3( h16 ":" ) h16 ] "::" h16 ":" ls32
111 # / [ *4( h16 ":" ) h16 ] "::" ls32
112 # / [ *5( h16 ":" ) h16 ] "::" h16
113 # / [ *6( h16 ":" ) h16 ] "::"
114 # ls32 = ( h16 ":" h16 ) / IPv4address
115 # ; least-significant 32 bits of address
116 # h16 = 1*4HEXDIG
117 # ; 16 bits of address represented in hexadecimal
118 # IPv4address = dec-octet "." dec-octet "." dec-octet "." dec-octet
119 # dec-octet = DIGIT ; 0-9
120 # / %x31-39 DIGIT ; 10-99
121 # / "1" 2DIGIT ; 100-199
122 # / "2" %x30-34 DIGIT ; 200-249
123 # / "25" %x30-35 ; 250-255
124 # reg-name = *( unreserved / pct-encoded / sub-delims )
125 # port = *DIGIT
126 # path = path-abempty ; begins with "/" or is empty
127 # / path-absolute ; begins with "/" but not "//"
128 # / path-noscheme ; begins with a non-colon segment
129 # / path-rootless ; begins with a segment
130 # / path-empty ; zero characters
131 # path-abempty = *( "/" segment )
132 # path-absolute = "/" [ segment-nz *( "/" segment ) ]
133 # path-noscheme = segment-nz-nc *( "/" segment )
134 # path-rootless = segment-nz *( "/" segment )
135 # path-empty = 0<pchar>
136 # segment = *pchar
137 # segment-nz = 1*pchar
138 # segment-nz-nc = 1*( unreserved / pct-encoded / sub-delims / "@" )
139 # ; non-zero-length segment without any colon ":"
140 # pchar = unreserved / pct-encoded / sub-delims / ":" / "@"
141 # query = *( pchar / "/" / "?" )
142 # fragment = *( pchar / "/" / "?" )
143 # URI-reference = URI / relative-ref
144 # relative-ref = relative-part [ "?" query ] [ "#" fragment ]
145 # relative-part = "//" authority path-abempty
146 # / path-absolute
147 # / path-noscheme
148 # / path-empty
149 # absolute-URI = scheme ":" hier-part [ "?" query ]
150 __slots__ = ("_cache", "_val")
152 _QUOTER = _Quoter(requote=False)
153 _REQUOTER = _Quoter()
154 _PATH_QUOTER = _Quoter(safe="@:", protected="/+", requote=False)
155 _PATH_REQUOTER = _Quoter(safe="@:", protected="/+")
156 _QUERY_QUOTER = _Quoter(safe="?/:@", protected="=+&;", qs=True, requote=False)
157 _QUERY_REQUOTER = _Quoter(safe="?/:@", protected="=+&;", qs=True)
158 _QUERY_PART_QUOTER = _Quoter(safe="?/:@", qs=True, requote=False)
159 _FRAGMENT_QUOTER = _Quoter(safe="?/:@", requote=False)
160 _FRAGMENT_REQUOTER = _Quoter(safe="?/:@")
162 _UNQUOTER = _Unquoter()
163 _PATH_UNQUOTER = _Unquoter(unsafe="+")
164 _QS_UNQUOTER = _Unquoter(qs=True)
def __new__(cls, val="", *, encoded=False, strict=None):
    """Create a URL from a string or a ``SplitResult``.

    An existing instance of exactly this class is returned unchanged.  A
    ``SplitResult`` is accepted only together with ``encoded=True``.  Unless
    *encoded* is true, every component is passed through the matching
    requoter and the path is normalized when a netloc is present.

    Raises TypeError for unsupported argument types and ValueError for a
    netloc without host, a non-integer port, or a ``SplitResult`` passed
    without ``encoded=True``.
    """
    if strict is not None:  # pragma: no cover
        warnings.warn("strict parameter is ignored")

    val_type = type(val)
    if val_type is cls:
        return val
    if val_type is str:
        val = urlsplit(val)
    elif val_type is SplitResult:
        if not encoded:
            raise ValueError("Cannot apply decoding to SplitResult")
    elif isinstance(val, str):
        # str subclass: convert to a plain str before splitting
        val = urlsplit(str(val))
    else:
        raise TypeError("Constructor parameter should be str")

    if not encoded:
        scheme, raw_netloc, raw_path, raw_query, raw_fragment = val
        if raw_netloc:
            host = val.hostname
            if host is None:
                raise ValueError("Invalid URL: host is required for absolute urls")

            try:
                port = val.port
            except ValueError as e:
                raise ValueError(
                    "Invalid URL: port can't be converted to integer"
                ) from e

            netloc = cls._make_netloc(
                val.username, val.password, host, port, encode=True, requote=True
            )
        else:
            netloc = host = ""

        path = cls._PATH_REQUOTER(raw_path)
        if netloc:
            path = cls._normalize_path(path)

        cls._validate_authority_uri_abs_path(host=host, path=path)
        val = SplitResult(
            scheme,
            netloc,
            path,
            cls._QUERY_REQUOTER(raw_query),
            cls._FRAGMENT_REQUOTER(raw_fragment),
        )

    self = object.__new__(cls)
    self._val = val
    self._cache = {}
    return self
@classmethod
def build(
    cls,
    *,
    scheme="",
    authority="",
    user=None,
    password=None,
    host="",
    port=None,
    path="",
    query=None,
    query_string="",
    fragment="",
    encoded=False,
):
    """Create and return a new URL assembled from individual components.

    *authority* is mutually exclusive with *user*, *password*, *host* and
    *port*; *query* is mutually exclusive with *query_string*.  Unless
    *encoded* is true the components are quoted and, when a netloc is
    present, the path is normalized.

    Raises ValueError for conflicting arguments and TypeError for a
    non-int *port* or a ``None`` passed where a string is required.
    """
    if authority and (user or password or host or port):
        raise ValueError(
            'Can\'t mix "authority" with "user", "password", "host" or "port".'
        )
    if port is not None and not isinstance(port, int):
        raise TypeError("The port is required to be int.")
    if port and not host:
        raise ValueError('Can\'t build URL with "port" but without "host".')
    if query and query_string:
        raise ValueError('Only one of "query" or "query_string" should be passed')
    required_strings = (scheme, authority, host, path, query_string, fragment)
    if any(arg is None for arg in required_strings):
        raise TypeError(
            'NoneType is illegal for "scheme", "authority", "host", "path", '
            '"query_string", and "fragment" args, use empty string instead.'
        )

    if authority:
        if encoded:
            netloc = authority
        else:
            # Let SplitResult pick the authority apart for us.
            tmp = SplitResult("", authority, "", "", "")
            netloc = cls._make_netloc(
                tmp.username, tmp.password, tmp.hostname, tmp.port, encode=True
            )
    elif user or password or host or port:
        netloc = cls._make_netloc(
            user, password, host, port, encode=not encoded, encode_host=not encoded
        )
    else:
        netloc = ""

    if not encoded:
        path = cls._PATH_QUOTER(path)
        if netloc:
            path = cls._normalize_path(path)

        cls._validate_authority_uri_abs_path(host=host, path=path)
        query_string = cls._QUERY_QUOTER(query_string)
        fragment = cls._FRAGMENT_QUOTER(fragment)

    url = cls(
        SplitResult(scheme, netloc, path, query_string, fragment), encoded=True
    )

    return url.with_query(query) if query else url
def __init_subclass__(cls):
    """Reject every attempt to subclass URL by raising TypeError."""
    message = f"Inheriting a class {cls!r} from URL is forbidden"
    raise TypeError(message)
def __str__(self):
    """Return the encoded URL as a string.

    An absolute URL with an empty path but with a query or fragment is
    rendered with "/" as its path.
    """
    split = self._val
    needs_root = (
        not split.path and self.is_absolute() and (split.query or split.fragment)
    )
    if needs_root:
        split = split._replace(path="/")
    return urlunsplit(split)
def __repr__(self):
    """Return ``ClassName('<str(self)>')``."""
    class_name = self.__class__.__name__
    return "{}('{}')".format(class_name, str(self))
def __bytes__(self):
    """Return ``str(self)`` encoded as ASCII bytes."""
    text = str(self)
    return text.encode("ascii")
def __eq__(self, other):
    """Compare two URLs component-wise.

    Only exact URL instances are comparable.  For absolute URLs an empty
    path is treated as "/" on both sides before comparing.
    """
    if type(other) is not URL:
        return NotImplemented

    left = self._val
    right = other._val
    if not left.path and self.is_absolute():
        left = left._replace(path="/")
    if not right.path and other.is_absolute():
        right = right._replace(path="/")

    return left == right
def __hash__(self):
    """Return a hash consistent with ``__eq__``; cached under "hash"."""
    cached = self._cache.get("hash")
    if cached is not None:
        return cached
    split = self._val
    if not split.path and self.is_absolute():
        # same empty-path -> "/" substitution that __eq__ applies
        split = split._replace(path="/")
    result = hash(split)
    self._cache["hash"] = result
    return result
def __le__(self, other):
    """Order by the underlying split tuples; only URL operands allowed."""
    if type(other) is not URL:
        return NotImplemented
    mine, theirs = self._val, other._val
    return mine <= theirs
def __lt__(self, other):
    """Order by the underlying split tuples; only URL operands allowed."""
    if type(other) is not URL:
        return NotImplemented
    mine, theirs = self._val, other._val
    return mine < theirs
def __ge__(self, other):
    """Order by the underlying split tuples; only URL operands allowed."""
    if type(other) is not URL:
        return NotImplemented
    mine, theirs = self._val, other._val
    return mine >= theirs
def __gt__(self, other):
    """Order by the underlying split tuples; only URL operands allowed."""
    if type(other) is not URL:
        return NotImplemented
    mine, theirs = self._val, other._val
    return mine > theirs
def __truediv__(self, name):
    """Implement ``url / "segment"`` by appending *name* to the path."""
    if isinstance(name, str):
        # str() turns str subclasses into plain strings
        return self._make_child((str(name),))
    return NotImplemented
def __mod__(self, query):
    """Implement ``url % query`` as a shortcut for ``update_query(query)``."""
    updated = self.update_query(query)
    return updated
def __bool__(self) -> bool:
    """Return True if netloc, path, query or fragment is non-empty."""
    split = self._val
    return bool(split.netloc or split.path or split.query or split.fragment)
def __getstate__(self):
    """Return the pickle state: a 1-tuple holding the split URL value."""
    state = (self._val,)
    return state
def __setstate__(self, state):
    """Restore the instance from pickle state and reset the cache.

    Accepts both the tuple produced by ``__getstate__`` and the
    ``(None, {...})`` form of a default-style pickle.
    """
    default_style = state[0] is None and isinstance(state[1], dict)
    if default_style:
        # default style pickle
        self._val = state[1]["_val"]
    else:
        self._val, *_ = state
    self._cache = {}
def is_absolute(self):
    """A check for absolute URLs.

    Return True when the URL has a host component (``raw_host`` is not
    None), False otherwise.

    """
    host = self.raw_host
    return host is not None
def is_default_port(self):
    """A check for default port.

    Return True if the effective port equals the default port registered
    for the URL's scheme in DEFAULT_PORTS, e.g. 'http://python.org' or
    'http://python.org:80'.  Return False when there is no port or the
    scheme has no registered default.

    """
    port = self.port
    if port is None:
        return False
    default = DEFAULT_PORTS.get(self.scheme)
    return default is not None and port == default
def origin(self):
    """Return an URL with scheme, host and port parts only.

    user, password, path, query and fragment are removed.

    Raises ValueError if the URL is not absolute or has no scheme.

    """
    # TODO: add a keyword-only option for keeping user/pass maybe?
    if not self.is_absolute():
        raise ValueError("URL should be absolute")
    split = self._val
    if not split.scheme:
        raise ValueError("URL should have scheme")
    bare_netloc = self._make_netloc(None, None, split.hostname, split.port)
    stripped = split._replace(netloc=bare_netloc, path="", query="", fragment="")
    return URL(stripped, encoded=True)
def relative(self):
    """Return a relative part of the URL.

    scheme, user, password, host and port are removed.

    Raises ValueError if the URL is not absolute.

    """
    if not self.is_absolute():
        raise ValueError("URL should be absolute")
    stripped = self._val._replace(scheme="", netloc="")
    return URL(stripped, encoded=True)
@property
def scheme(self):
    """Scheme for absolute URLs.

    Empty string for relative URLs or URLs starting with //

    """
    split = self._val
    return split.scheme
@property
def raw_authority(self):
    """Encoded authority part of URL (the stored netloc).

    Empty string for relative URLs.

    """
    split = self._val
    return split.netloc
@cached_property
def authority(self):
    """Decoded authority part of URL.

    Built from the decoded user, password and host plus the effective
    port, without re-encoding the host.

    """
    user, password = self.user, self.password
    host, port = self.host, self.port
    return self._make_netloc(user, password, host, port, encode_host=False)
@property
def raw_user(self):
    """Encoded user part of URL.

    None if user is missing or empty.

    """
    # not .username
    username = self._val.username
    return username if username else None
@cached_property
def user(self):
    """Decoded user part of URL.

    The encoded user passed through the generic unquoter.

    """
    encoded_user = self.raw_user
    return self._UNQUOTER(encoded_user)
@property
def raw_password(self):
    """Encoded password part of URL.

    None if password is missing.

    """
    split = self._val
    return split.password
@cached_property
def password(self):
    """Decoded password part of URL.

    The encoded password passed through the generic unquoter.

    """
    encoded_password = self.raw_password
    return self._UNQUOTER(encoded_password)
@property
def raw_host(self):
    """Encoded host part of URL.

    None for relative URLs.

    """
    # Use host instead of hostname for sake of shortness
    # May add .hostname prop later
    split = self._val
    return split.hostname
@cached_property
def host(self):
    """Decoded host part of URL.

    None for relative URLs.

    """
    raw = self.raw_host
    # Hack for scoped IPv6 addresses like
    # fe80::2%Перевірка (a non-ASCII zone id):
    # presence of '%' sign means only IPv6 address, so idna is useless.
    if raw is None or "%" in raw:
        return raw
    return _idna_decode(raw)
@property
def port(self):
    """Port part of URL, with scheme-based fallback.

    Returns the explicit port when the URL has one.  Otherwise falls back
    to the default registered for the scheme in DEFAULT_PORTS, which is
    None for relative URLs and for schemes without a registered default.

    """
    explicit = self._val.port
    # Compare against None instead of relying on truthiness: an explicit
    # port of 0 is falsy and would otherwise be silently replaced by the
    # scheme default.
    if explicit is not None:
        return explicit
    return DEFAULT_PORTS.get(self._val.scheme)
@property
def explicit_port(self):
    """Port part of URL, without scheme-based fallback.

    None for relative URLs or URLs without explicit port.

    """
    split = self._val
    return split.port
@property
def raw_path(self):
    """Encoded path of URL.

    / for absolute URLs without path part.

    """
    stored = self._val.path
    if stored:
        return stored
    return "/" if self.is_absolute() else stored
@cached_property
def path(self):
    """Decoded path of URL.

    / for absolute URLs without path part.

    """
    encoded_path = self.raw_path
    return self._PATH_UNQUOTER(encoded_path)
@cached_property
def query(self):
    """A MultiDictProxy representing parsed query parameters in decoded
    representation.

    Parameters with blank values are kept.  Empty value if URL has no
    query part.

    """
    pairs = parse_qsl(self.raw_query_string, keep_blank_values=True)
    return MultiDictProxy(MultiDict(pairs))
@property
def raw_query_string(self):
    """Encoded query part of URL.

    Empty string if query is missing.

    """
    split = self._val
    return split.query
@cached_property
def query_string(self):
    """Decoded query part of URL.

    Empty string if query is missing.

    """
    encoded_query = self.raw_query_string
    return self._QS_UNQUOTER(encoded_query)
@cached_property
def path_qs(self):
    """Decoded path of URL with query (joined by "?" when present)."""
    query_string = self.query_string
    if query_string:
        return f"{self.path}?{query_string}"
    return self.path
@cached_property
def raw_path_qs(self):
    """Encoded path of URL with query (joined by "?" when present)."""
    raw_query = self.raw_query_string
    if raw_query:
        return f"{self.raw_path}?{raw_query}"
    return self.raw_path
@property
def raw_fragment(self):
    """Encoded fragment part of URL.

    Empty string if fragment is missing.

    """
    split = self._val
    return split.fragment
@cached_property
def fragment(self):
    """Decoded fragment part of URL.

    Empty string if fragment is missing.

    """
    encoded_fragment = self.raw_fragment
    return self._UNQUOTER(encoded_fragment)
@cached_property
def raw_parts(self):
    """A tuple containing encoded *path* parts.

    ('/',) for absolute URLs if *path* is missing.  When the path is
    rooted, the first element is '/' and the remaining elements are the
    segments after the leading slash.

    """
    path = self._val.path
    if self.is_absolute():
        if not path:
            return ("/",)
        rooted = True
    else:
        rooted = path.startswith("/")

    if rooted:
        return ("/", *path[1:].split("/"))
    return tuple(path.split("/"))
@cached_property
def parts(self):
    """A tuple containing decoded *path* parts.

    Each element of ``raw_parts`` passed through the generic unquoter.

    """
    decoded = [self._UNQUOTER(part) for part in self.raw_parts]
    return tuple(decoded)
@cached_property
def parent(self):
    """A new URL with last part of path removed and cleaned up query and
    fragment.

    For an empty or root path the URL itself is returned, unless it has a
    query or fragment, in which case a copy without them is returned.

    """
    path = self.raw_path
    if path and path != "/":
        # everything before the last "/" ("" when there is none)
        parent_path = path.rpartition("/")[0]
        trimmed = self._val._replace(path=parent_path, query="", fragment="")
        return URL(trimmed, encoded=True)
    if self.raw_fragment or self.raw_query_string:
        return URL(self._val._replace(query="", fragment=""), encoded=True)
    return self
@cached_property
def raw_name(self):
    """The last part of raw_parts.

    For absolute URLs the leading '/' element is not counted; the result
    is an empty string when nothing else remains.
    """
    parts = self.raw_parts
    if self.is_absolute():
        parts = parts[1:]
        if not parts:
            return ""
    return parts[-1]
@cached_property
def name(self):
    """The last part of parts (``raw_name`` unquoted)."""
    encoded_name = self.raw_name
    return self._UNQUOTER(encoded_name)
@cached_property
def raw_suffix(self):
    """Encoded final suffix of the name, including the leading dot.

    Empty string when the name has no dot, or its only dot is the first
    or the last character.
    """
    name = self.raw_name
    dot = name.rfind(".")
    if dot <= 0 or dot >= len(name) - 1:
        return ""
    return name[dot:]
@cached_property
def suffix(self):
    """Decoded final suffix of the name (``raw_suffix`` unquoted)."""
    encoded_suffix = self.raw_suffix
    return self._UNQUOTER(encoded_suffix)
@cached_property
def raw_suffixes(self):
    """Tuple of all encoded suffixes of the name, each with its dot.

    Empty tuple when the name ends with a dot; leading dots of the name
    are ignored.
    """
    name = self.raw_name
    if name.endswith("."):
        return ()
    pieces = name.lstrip(".").split(".")
    return tuple(f".{piece}" for piece in pieces[1:])
@cached_property
def suffixes(self):
    """Tuple of decoded suffixes (each ``raw_suffixes`` item unquoted)."""
    decoded = [self._UNQUOTER(item) for item in self.raw_suffixes]
    return tuple(decoded)
@staticmethod
def _validate_authority_uri_abs_path(host, path):
    """Ensure that path in URL with authority starts with a leading slash.

    Nothing is checked when either *host* or *path* is empty.

    Raise ValueError if not.
    """
    if len(host) == 0 or len(path) == 0:
        return
    if path.startswith("/"):
        return
    raise ValueError(
        "Path in a URL with authority should start with a slash ('/') if set"
    )
def _make_child(self, segments, encoded=False):
    """Return a new URL with *segments* appended to the current path.

    Segments are quoted unless *encoded* is true.  Empty segments and '.'
    are skipped, and a segment starting with '/' raises ValueError.  For
    absolute URLs the combined path is normalized and gets a leading
    slash.  Query and fragment are dropped from the result.
    """
    # keep the trailing slash if the last segment ends with /
    tail = []
    if segments and segments[-1][-1:] == "/":
        tail.append("")

    # Collected back to front, then reversed once at the end.
    for seg in reversed(segments):
        if not seg:
            continue
        if seg[0] == "/":
            raise ValueError(
                f"Appending path {seg!r} starting from slash is forbidden"
            )
        if not encoded:
            seg = self._PATH_QUOTER(seg)
        if "/" in seg:
            tail.extend(
                sub for sub in reversed(seg.split("/")) if sub and sub != "."
            )
        elif seg != ".":
            tail.append(seg)
    tail.reverse()

    current = self._val.path
    if current:
        pieces = [*current.rstrip("/").split("/"), *tail]
    else:
        pieces = tail

    if self.is_absolute():
        pieces = _normalize_path_segments(pieces)
        if pieces and pieces[0] != "":
            # inject a leading slash when adding a path to an absolute URL
            # where there was none before
            pieces = ["", *pieces]

    joined = "/".join(pieces)
    return URL(self._val._replace(path=joined, query="", fragment=""), encoded=True)
@classmethod
def _normalize_path(cls, path):
    """Drop '.' and '..' from a str path, keeping a leading '/'."""
    # preserve the "/" root element of absolute paths, copying it to the
    # normalised output as per sections 5.2.4 and 6.2.2.3 of rfc3986.
    root = "/" if path.startswith("/") else ""
    segments = path[len(root):].split("/")
    return root + "/".join(_normalize_path_segments(segments))
@classmethod
def _encode_host(cls, host, human=False):
    """Return the form of *host* that is placed into a netloc.

    IP addresses are compressed, keep an optional ``%zone`` suffix and
    are wrapped in brackets for IPv6.  Any other host is lowercased and,
    unless *human* is true or the name is ASCII-only, IDNA-encoded.
    """
    address, sep, zone = host.partition("%")
    try:
        ip = ip_address(address)
    except ValueError:
        lowered = host.lower()
        # IDNA encoding is slow,
        # skip it for ASCII-only strings
        # Don't move the check into _idna_encode() helper
        # to reduce the cache size
        if human or lowered.isascii():
            return lowered
        return _idna_encode(lowered)

    literal = ip.compressed
    if sep:
        literal += "%" + zone
    if ip.version == 6:
        literal = "[" + literal + "]"
    return literal
@classmethod
def _make_netloc(
    cls, user, password, host, port, encode=False, encode_host=True, requote=False
):
    """Assemble ``[user[:password]@]host[:port]``.

    *encode* quotes user and password (with the requoting quoter when
    *requote* is true); *encode_host* runs the host through
    ``_encode_host``.  A password without a user yields ``:password@``.
    """
    quoter = cls._REQUOTER if requote else cls._QUOTER

    netloc = cls._encode_host(host) if encode_host else host
    if port is not None:
        netloc = netloc + ":" + str(port)

    if password is not None:
        if user:
            if encode:
                user = quoter(user)
        else:
            user = ""
        if encode:
            password = quoter(password)
        user = user + ":" + password
    elif user and encode:
        user = quoter(user)

    if user:
        netloc = user + "@" + netloc
    return netloc
def with_scheme(self, scheme):
    """Return a new URL with scheme replaced (lowercased).

    Raises TypeError if *scheme* is not a str and ValueError for relative
    URLs.
    """
    # N.B. doesn't cleanup query/fragment
    if not isinstance(scheme, str):
        raise TypeError("Invalid scheme type")
    if not self.is_absolute():
        raise ValueError("scheme replacement is not allowed for relative URLs")
    replaced = self._val._replace(scheme=scheme.lower())
    return URL(replaced, encoded=True)
def with_user(self, user):
    """Return a new URL with user replaced.

    Autoencode user if needed.

    Clear user/password if user is None.

    Raises TypeError for a non-str user and ValueError for relative URLs.

    """
    # N.B. doesn't cleanup query/fragment
    split = self._val
    if user is None:
        password = None
    elif isinstance(user, str):
        user = self._QUOTER(user)
        password = split.password
    else:
        raise TypeError("Invalid user type")
    if not self.is_absolute():
        raise ValueError("user replacement is not allowed for relative URLs")
    netloc = self._make_netloc(user, password, split.hostname, split.port)
    return URL(split._replace(netloc=netloc), encoded=True)
def with_password(self, password):
    """Return a new URL with password replaced.

    Autoencode password if needed.

    Clear password if argument is None.

    Raises TypeError for a non-str password and ValueError for relative
    URLs.

    """
    # N.B. doesn't cleanup query/fragment
    if password is not None:
        if not isinstance(password, str):
            raise TypeError("Invalid password type")
        password = self._QUOTER(password)
    if not self.is_absolute():
        raise ValueError("password replacement is not allowed for relative URLs")
    split = self._val
    netloc = self._make_netloc(
        split.username, password, split.hostname, split.port
    )
    return URL(split._replace(netloc=netloc), encoded=True)
def with_host(self, host):
    """Return a new URL with host replaced.

    Autoencode host if needed.

    Changing host for relative URLs is not allowed, use .join()
    instead.

    Raises TypeError for a non-str host and ValueError for relative URLs
    or an empty host.

    """
    # N.B. doesn't cleanup query/fragment
    if not isinstance(host, str):
        raise TypeError("Invalid host type")
    if not self.is_absolute():
        raise ValueError("host replacement is not allowed for relative URLs")
    if not host:
        raise ValueError("host removing is not allowed")
    split = self._val
    netloc = self._make_netloc(split.username, split.password, host, split.port)
    return URL(split._replace(netloc=netloc), encoded=True)
def with_port(self, port):
    """Return a new URL with port replaced.

    Clear port to default if None is passed.

    Raises TypeError if *port* is a bool or not an int, and ValueError if
    it is outside 0..65535 or the URL is relative.

    """
    # N.B. doesn't cleanup query/fragment
    if port is not None:
        # bool is a subclass of int, so it has to be excluded explicitly
        if isinstance(port, bool) or not isinstance(port, int):
            raise TypeError(f"port should be int or None, got {type(port)}")
        if not 0 <= port <= 65535:
            raise ValueError(f"port must be between 0 and 65535, got {port}")
    if not self.is_absolute():
        raise ValueError("port replacement is not allowed for relative URLs")
    split = self._val
    netloc = self._make_netloc(
        split.username, split.password, split.hostname, port
    )
    return URL(split._replace(netloc=netloc), encoded=True)
def with_path(self, path, *, encoded=False):
    """Return a new URL with path replaced.

    Unless *encoded* is true the path is quoted and, for absolute URLs,
    normalized.  A non-empty path gets a leading slash when it has none.
    Query and fragment are dropped.
    """
    if not encoded:
        path = self._PATH_QUOTER(path)
        if self.is_absolute():
            path = self._normalize_path(path)
    if len(path) > 0 and not path.startswith("/"):
        path = "/" + path
    replaced = self._val._replace(path=path, query="", fragment="")
    return URL(replaced, encoded=True)
@classmethod
def _query_seq_pairs(cls, quoter, pairs):
    """Yield quoted ``key=value`` strings for (key, value) *pairs*.

    A list or tuple value produces one string per contained item.
    """
    for key, val in pairs:
        items = val if isinstance(val, (list, tuple)) else (val,)
        for item in items:
            yield quoter(key) + "=" + quoter(cls._query_var(item))
@staticmethod
def _query_var(v):
    """Convert a query value to str.

    Accepts str (returned unchanged), float (finite values only) and int
    (excluding bool).  Raises ValueError for inf/nan and TypeError for any
    other type.
    """
    kind = type(v)
    if issubclass(kind, str):
        return v
    if issubclass(kind, float):
        if math.isinf(v):
            raise ValueError("float('inf') is not supported")
        if math.isnan(v):
            raise ValueError("float('nan') is not supported")
        return str(float(v))
    if kind is not bool and issubclass(kind, int):
        return str(int(v))
    raise TypeError(
        f"Invalid variable type: value should be str, int or float, "
        f"got {v!r} of type {kind}"
    )
def _get_str_query(self, *args, **kwargs):
    """Turn the query argument(s) into an encoded query string.

    Accepts either keyword arguments or exactly one positional argument:
    None (returned as None), a Mapping, a str, or a sequence of
    (key, value) pairs.  Raises ValueError for a wrong argument count and
    TypeError for unsupported query types.
    """
    if kwargs:
        if args:
            raise ValueError(
                "Either kwargs or single query parameter must be present"
            )
        query = kwargs
    elif len(args) == 1:
        query = args[0]
    else:
        raise ValueError("Either kwargs or single query parameter must be present")

    if query is None:
        return None
    if isinstance(query, Mapping):
        quoter = self._QUERY_PART_QUOTER
        return "&".join(self._query_seq_pairs(quoter, query.items()))
    if isinstance(query, str):
        return self._QUERY_QUOTER(query)
    if isinstance(query, (bytes, bytearray, memoryview)):
        raise TypeError(
            "Invalid query type: bytes, bytearray and memoryview are forbidden"
        )
    if isinstance(query, Sequence):
        quoter = self._QUERY_PART_QUOTER
        # We don't expect sequence values if we're given a list of pairs
        # already; only mappings like builtin `dict` which can't have the
        # same key pointing to multiple values are allowed to use
        # `_query_seq_pairs`.
        return "&".join(
            quoter(key) + "=" + quoter(self._query_var(value))
            for key, value in query
        )
    raise TypeError(
        "Invalid query type: only str, mapping or "
        "sequence of (key, value) pairs is allowed"
    )
def with_query(self, *args, **kwargs):
    """Return a new URL with query part replaced.

    Accepts any Mapping (e.g. dict, multidict.MultiDict instances)
    or str, autoencode the argument if needed.

    A sequence of (key, value) pairs is supported as well.

    It also can take an arbitrary number of keyword arguments.

    Clear query if None is passed.

    """
    # N.B. doesn't cleanup query/fragment

    encoded_query = self._get_str_query(*args, **kwargs)
    if not encoded_query:
        encoded_query = ""
    return URL(self._val._replace(query=encoded_query), encoded=True)
def update_query(self, *args, **kwargs):
    """Return a new URL with query part updated.

    The given parameters are merged into a copy of the current query via
    ``MultiDict.update``.  Passing None clears the query.
    """
    addition = self._get_str_query(*args, **kwargs)
    merged = None
    if addition is not None:
        incoming = MultiDict(parse_qsl(addition, keep_blank_values=True))
        merged = MultiDict(self.query)
        merged.update(incoming)

    encoded_query = self._get_str_query(merged) or ""
    return URL(self._val._replace(query=encoded_query), encoded=True)
def with_fragment(self, fragment):
    """Return a new URL with fragment replaced.

    Autoencode fragment if needed.

    Clear fragment to default if None is passed.

    Returns self when the encoded fragment is unchanged; raises TypeError
    for a non-str fragment.

    """
    # N.B. doesn't cleanup query/fragment
    if fragment is None:
        encoded_fragment = ""
    elif isinstance(fragment, str):
        encoded_fragment = self._FRAGMENT_QUOTER(fragment)
    else:
        raise TypeError("Invalid fragment type")
    if encoded_fragment == self.raw_fragment:
        return self
    return URL(self._val._replace(fragment=encoded_fragment), encoded=True)
def with_name(self, name):
    """Return a new URL with name (last part of path) replaced.

    Query and fragment parts are cleaned up.

    Name is encoded if needed.

    Raises TypeError for a non-str name and ValueError when the name
    contains a slash or is '.' or '..'.

    """
    # N.B. DOES cleanup query/fragment
    if not isinstance(name, str):
        raise TypeError("Invalid name type")
    if "/" in name:
        raise ValueError("Slash in name is not allowed")
    name = self._PATH_QUOTER(name)
    if name in (".", ".."):
        raise ValueError(". and .. values are forbidden")

    parts = list(self.raw_parts)
    absolute = self.is_absolute()
    if absolute and len(parts) == 1:
        # only the root element is present: add the name after it
        parts.append(name)
    else:
        parts[-1] = name
    if absolute or parts[0] == "/":
        parts[0] = ""  # replace leading '/'

    new_path = "/".join(parts)
    return URL(
        self._val._replace(path=new_path, query="", fragment=""),
        encoded=True,
    )
def with_suffix(self, suffix):
    """Return a new URL with suffix (file extension of name) replaced.

    Query and fragment parts are cleaned up.

    suffix is encoded if needed.

    Raises TypeError for a non-str suffix and ValueError when the suffix
    is invalid (not empty and not starting with '.', equal to '.', or
    containing '/'), when the URL has an empty name, or when the
    resulting name would be '.' or '..'.
    """
    if not isinstance(suffix, str):
        raise TypeError("Invalid suffix type")
    if suffix and not suffix.startswith(".") or suffix == ".":
        raise ValueError(f"Invalid suffix {suffix!r}")
    name = self.raw_name
    if not name:
        raise ValueError(f"{self!r} has an empty name")
    if "/" in suffix:
        raise ValueError("Slash in name is not allowed")

    old_suffix = self.raw_suffix
    stem = name[: -len(old_suffix)] if old_suffix else name
    # The stem comes from raw_name and is therefore already encoded.  Only
    # the new suffix is quoted here; sending the whole name through the
    # non-requoting path quoter again (as with_name() does) would
    # presumably escape existing percent-escapes a second time.
    name = stem + self._PATH_QUOTER(suffix)
    if name in (".", ".."):
        raise ValueError(". and .. values are forbidden")

    # A non-empty raw_name guarantees there is a last part to replace.
    parts = list(self.raw_parts)
    parts[-1] = name
    if self.is_absolute() or parts[0] == "/":
        parts[0] = ""  # replace leading '/'
    return URL(
        self._val._replace(path="/".join(parts), query="", fragment=""),
        encoded=True,
    )
def join(self, url):
    """Join URLs

    Construct a full ("absolute") URL by combining a "base URL"
    (self) with another URL (url).

    Informally, this uses components of the base URL, in
    particular the addressing scheme, the network location and
    (part of) the path, to provide missing components in the
    relative URL.

    Raises TypeError if *url* is not a URL instance.

    """
    # See docs for urllib.parse.urljoin
    if not isinstance(url, URL):
        raise TypeError("url should be URL")
    base, reference = str(self), str(url)
    return URL(urljoin(base, reference), encoded=True)
def joinpath(self, *other, encoded=False):
    """Return a new URL with the elements in other appended to the path."""
    child = self._make_child(other, encoded=encoded)
    return child
def human_repr(self):
    """Return decoded human readable string for URL representation.

    Each decoded component is passed through ``_human_quote`` with the
    characters that have to stay escaped in that component.
    """
    user = _human_quote(self.user, "#/:?@[]")
    password = _human_quote(self.password, "#/:?@[]")

    host = self.host
    if host:
        host = self._encode_host(self.host, human=True)

    path = _human_quote(self.path, "#?")

    query_unsafe = "#&+;="
    query_string = "&".join(
        f"{_human_quote(key, query_unsafe)}={_human_quote(value, query_unsafe)}"
        for key, value in self.query.items()
    )
    fragment = _human_quote(self.fragment, "")

    netloc = self._make_netloc(
        user,
        password,
        host,
        self._val.port,
        encode_host=False,
    )
    return urlunsplit(
        SplitResult(self.scheme, netloc, path, query_string, fragment)
    )
1151def _human_quote(s, unsafe):
1152 if not s:
1153 return s
1154 for c in "%" + unsafe:
1155 if c in s:
1156 s = s.replace(c, f"%{ord(c):02X}")
1157 if s.isprintable():
1158 return s
1159 return "".join(c if c.isprintable() else quote(c) for c in s)
1162_MAXCACHE = 256
@functools.lru_cache(_MAXCACHE)
def _idna_decode(raw):
    """Decode an ASCII host name with ``idna``; results are LRU-cached.

    Falls back to the built-in "idna" codec when ``idna.decode`` raises
    UnicodeError.
    """
    ascii_host = raw.encode("ascii")
    try:
        return idna.decode(ascii_host)
    except UnicodeError:  # e.g. '::1'
        return ascii_host.decode("idna")
@functools.lru_cache(_MAXCACHE)
def _idna_encode(host):
    """Encode a host name to ASCII with ``idna``; results are LRU-cached.

    Uses ``idna.encode(..., uts46=True)`` and falls back to the built-in
    "idna" codec when that raises UnicodeError.
    """
    try:
        encoded = idna.encode(host, uts46=True)
    except UnicodeError:
        encoded = host.encode("idna")
    return encoded.decode("ascii")
@rewrite_module
def cache_clear():
    """Empty the LRU caches of both IDNA helper functions."""
    for cached_func in (_idna_decode, _idna_encode):
        cached_func.cache_clear()
@rewrite_module
def cache_info():
    """Return LRU cache statistics of the IDNA helpers.

    The result is a dict with the keys "idna_encode" and "idna_decode".
    """
    encode_stats = _idna_encode.cache_info()
    decode_stats = _idna_decode.cache_info()
    return {
        "idna_encode": encode_stats,
        "idna_decode": decode_stats,
    }
@rewrite_module
def cache_configure(*, idna_encode_size=_MAXCACHE, idna_decode_size=_MAXCACHE):
    """Rebuild the IDNA helper caches with new maximum sizes.

    Each helper's undecorated function is wrapped in a fresh
    ``functools.lru_cache`` of the requested size, replacing the
    module-level name.
    """
    global _idna_decode, _idna_encode

    plain_encode = _idna_encode.__wrapped__
    plain_decode = _idna_decode.__wrapped__
    _idna_encode = functools.lru_cache(idna_encode_size)(plain_encode)
    _idna_decode = functools.lru_cache(idna_decode_size)(plain_decode)