Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/yarl/_url.py: 45%
604 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:52 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:52 +0000
1import functools
2import math
3import warnings
4from collections.abc import Mapping, Sequence
5from contextlib import suppress
6from ipaddress import ip_address
7from urllib.parse import SplitResult, parse_qsl, quote, urljoin, urlsplit, urlunsplit
9import idna
10from multidict import MultiDict, MultiDictProxy
12from ._quoting import _Quoter, _Unquoter
14DEFAULT_PORTS = {"http": 80, "https": 443, "ws": 80, "wss": 443}
16sentinel = object()
19def rewrite_module(obj: object) -> object:
20 obj.__module__ = "yarl"
21 return obj
24class cached_property:
25 """Use as a class method decorator. It operates almost exactly like
26 the Python `@property` decorator, but it puts the result of the
27 method it decorates into the instance dict after the first call,
28 effectively replacing the function it decorates with an instance
29 variable. It is, in Python parlance, a data descriptor.
31 """
33 def __init__(self, wrapped):
34 self.wrapped = wrapped
35 try:
36 self.__doc__ = wrapped.__doc__
37 except AttributeError: # pragma: no cover
38 self.__doc__ = ""
39 self.name = wrapped.__name__
41 def __get__(self, inst, owner, _sentinel=sentinel):
42 if inst is None:
43 return self
44 val = inst._cache.get(self.name, _sentinel)
45 if val is not _sentinel:
46 return val
47 val = self.wrapped(inst)
48 inst._cache[self.name] = val
49 return val
51 def __set__(self, inst, value):
52 raise AttributeError("cached property is read-only")
55def _normalize_path_segments(segments):
56 """Drop '.' and '..' from a sequence of str segments"""
58 resolved_path = []
60 for seg in segments:
61 if seg == "..":
62 # ignore any .. segments that would otherwise cause an
63 # IndexError when popped from resolved_path if
64 # resolving for rfc3986
65 with suppress(IndexError):
66 resolved_path.pop()
67 elif seg != ".":
68 resolved_path.append(seg)
70 if segments and segments[-1] in (".", ".."):
71 # do some post-processing here.
72 # if the last segment was a relative dir,
73 # then we need to append the trailing '/'
74 resolved_path.append("")
76 return resolved_path
79@rewrite_module
80class URL:
81 # Don't derive from str
82 # follow pathlib.Path design
83 # probably URL will not suffer from pathlib problems:
84 # it's intended for libraries like aiohttp,
85 # not to be passed into standard library functions like os.open etc.
87 # URL grammar (RFC 3986)
88 # pct-encoded = "%" HEXDIG HEXDIG
89 # reserved = gen-delims / sub-delims
90 # gen-delims = ":" / "/" / "?" / "#" / "[" / "]" / "@"
91 # sub-delims = "!" / "$" / "&" / "'" / "(" / ")"
92 # / "*" / "+" / "," / ";" / "="
93 # unreserved = ALPHA / DIGIT / "-" / "." / "_" / "~"
94 # URI = scheme ":" hier-part [ "?" query ] [ "#" fragment ]
95 # hier-part = "//" authority path-abempty
96 # / path-absolute
97 # / path-rootless
98 # / path-empty
99 # scheme = ALPHA *( ALPHA / DIGIT / "+" / "-" / "." )
100 # authority = [ userinfo "@" ] host [ ":" port ]
101 # userinfo = *( unreserved / pct-encoded / sub-delims / ":" )
102 # host = IP-literal / IPv4address / reg-name
103 # IP-literal = "[" ( IPv6address / IPvFuture ) "]"
104 # IPvFuture = "v" 1*HEXDIG "." 1*( unreserved / sub-delims / ":" )
105 # IPv6address = 6( h16 ":" ) ls32
106 # / "::" 5( h16 ":" ) ls32
107 # / [ h16 ] "::" 4( h16 ":" ) ls32
108 # / [ *1( h16 ":" ) h16 ] "::" 3( h16 ":" ) ls32
109 # / [ *2( h16 ":" ) h16 ] "::" 2( h16 ":" ) ls32
110 # / [ *3( h16 ":" ) h16 ] "::" h16 ":" ls32
111 # / [ *4( h16 ":" ) h16 ] "::" ls32
112 # / [ *5( h16 ":" ) h16 ] "::" h16
113 # / [ *6( h16 ":" ) h16 ] "::"
114 # ls32 = ( h16 ":" h16 ) / IPv4address
115 # ; least-significant 32 bits of address
116 # h16 = 1*4HEXDIG
117 # ; 16 bits of address represented in hexadecimal
118 # IPv4address = dec-octet "." dec-octet "." dec-octet "." dec-octet
119 # dec-octet = DIGIT ; 0-9
120 # / %x31-39 DIGIT ; 10-99
121 # / "1" 2DIGIT ; 100-199
122 # / "2" %x30-34 DIGIT ; 200-249
123 # / "25" %x30-35 ; 250-255
124 # reg-name = *( unreserved / pct-encoded / sub-delims )
125 # port = *DIGIT
126 # path = path-abempty ; begins with "/" or is empty
127 # / path-absolute ; begins with "/" but not "//"
128 # / path-noscheme ; begins with a non-colon segment
129 # / path-rootless ; begins with a segment
130 # / path-empty ; zero characters
131 # path-abempty = *( "/" segment )
132 # path-absolute = "/" [ segment-nz *( "/" segment ) ]
133 # path-noscheme = segment-nz-nc *( "/" segment )
134 # path-rootless = segment-nz *( "/" segment )
135 # path-empty = 0<pchar>
136 # segment = *pchar
137 # segment-nz = 1*pchar
138 # segment-nz-nc = 1*( unreserved / pct-encoded / sub-delims / "@" )
139 # ; non-zero-length segment without any colon ":"
140 # pchar = unreserved / pct-encoded / sub-delims / ":" / "@"
141 # query = *( pchar / "/" / "?" )
142 # fragment = *( pchar / "/" / "?" )
143 # URI-reference = URI / relative-ref
144 # relative-ref = relative-part [ "?" query ] [ "#" fragment ]
145 # relative-part = "//" authority path-abempty
146 # / path-absolute
147 # / path-noscheme
148 # / path-empty
149 # absolute-URI = scheme ":" hier-part [ "?" query ]
150 __slots__ = ("_cache", "_val")
152 _QUOTER = _Quoter(requote=False)
153 _REQUOTER = _Quoter()
154 _PATH_QUOTER = _Quoter(safe="@:", protected="/+", requote=False)
155 _PATH_REQUOTER = _Quoter(safe="@:", protected="/+")
156 _QUERY_QUOTER = _Quoter(safe="?/:@", protected="=+&;", qs=True, requote=False)
157 _QUERY_REQUOTER = _Quoter(safe="?/:@", protected="=+&;", qs=True)
158 _QUERY_PART_QUOTER = _Quoter(safe="?/:@", qs=True, requote=False)
159 _FRAGMENT_QUOTER = _Quoter(safe="?/:@", requote=False)
160 _FRAGMENT_REQUOTER = _Quoter(safe="?/:@")
162 _UNQUOTER = _Unquoter()
163 _PATH_UNQUOTER = _Unquoter(unsafe="+")
164 _QS_UNQUOTER = _Unquoter(qs=True)
166 def __new__(cls, val="", *, encoded=False, strict=None):
167 if strict is not None: # pragma: no cover
168 warnings.warn("strict parameter is ignored")
169 if type(val) is cls:
170 return val
171 if type(val) is str:
172 val = urlsplit(val)
173 elif type(val) is SplitResult:
174 if not encoded:
175 raise ValueError("Cannot apply decoding to SplitResult")
176 elif isinstance(val, str):
177 val = urlsplit(str(val))
178 else:
179 raise TypeError("Constructor parameter should be str")
181 if not encoded:
182 if not val[1]: # netloc
183 netloc = ""
184 host = ""
185 else:
186 host = val.hostname
187 if host is None:
188 raise ValueError("Invalid URL: host is required for absolute urls")
190 try:
191 port = val.port
192 except ValueError as e:
193 raise ValueError(
194 "Invalid URL: port can't be converted to integer"
195 ) from e
197 netloc = cls._make_netloc(
198 val.username, val.password, host, port, encode=True, requote=True
199 )
200 path = cls._PATH_REQUOTER(val[2])
201 if netloc:
202 path = cls._normalize_path(path)
204 cls._validate_authority_uri_abs_path(host=host, path=path)
205 query = cls._QUERY_REQUOTER(val[3])
206 fragment = cls._FRAGMENT_REQUOTER(val[4])
207 val = SplitResult(val[0], netloc, path, query, fragment)
209 self = object.__new__(cls)
210 self._val = val
211 self._cache = {}
212 return self
214 @classmethod
215 def build(
216 cls,
217 *,
218 scheme="",
219 authority="",
220 user=None,
221 password=None,
222 host="",
223 port=None,
224 path="",
225 query=None,
226 query_string="",
227 fragment="",
228 encoded=False,
229 ):
230 """Creates and returns a new URL"""
232 if authority and (user or password or host or port):
233 raise ValueError(
234 'Can\'t mix "authority" with "user", "password", "host" or "port".'
235 )
236 if port and not host:
237 raise ValueError('Can\'t build URL with "port" but without "host".')
238 if query and query_string:
239 raise ValueError('Only one of "query" or "query_string" should be passed')
240 if (
241 scheme is None
242 or authority is None
243 or host is None
244 or path is None
245 or query_string is None
246 or fragment is None
247 ):
248 raise TypeError(
249 'NoneType is illegal for "scheme", "authority", "host", "path", '
250 '"query_string", and "fragment" args, use empty string instead.'
251 )
253 if authority:
254 if encoded:
255 netloc = authority
256 else:
257 tmp = SplitResult("", authority, "", "", "")
258 netloc = cls._make_netloc(
259 tmp.username, tmp.password, tmp.hostname, tmp.port, encode=True
260 )
261 elif not user and not password and not host and not port:
262 netloc = ""
263 else:
264 netloc = cls._make_netloc(
265 user, password, host, port, encode=not encoded, encode_host=not encoded
266 )
267 if not encoded:
268 path = cls._PATH_QUOTER(path)
269 if netloc:
270 path = cls._normalize_path(path)
272 cls._validate_authority_uri_abs_path(host=host, path=path)
273 query_string = cls._QUERY_QUOTER(query_string)
274 fragment = cls._FRAGMENT_QUOTER(fragment)
276 url = cls(
277 SplitResult(scheme, netloc, path, query_string, fragment), encoded=True
278 )
280 if query:
281 return url.with_query(query)
282 else:
283 return url
285 def __init_subclass__(cls):
286 raise TypeError(f"Inheriting a class {cls!r} from URL is forbidden")
288 def __str__(self):
289 val = self._val
290 if not val.path and self.is_absolute() and (val.query or val.fragment):
291 val = val._replace(path="/")
292 return urlunsplit(val)
294 def __repr__(self):
295 return f"{self.__class__.__name__}('{str(self)}')"
297 def __bytes__(self):
298 return str(self).encode("ascii")
300 def __eq__(self, other):
301 if not type(other) is URL:
302 return NotImplemented
304 val1 = self._val
305 if not val1.path and self.is_absolute():
306 val1 = val1._replace(path="/")
308 val2 = other._val
309 if not val2.path and other.is_absolute():
310 val2 = val2._replace(path="/")
312 return val1 == val2
314 def __hash__(self):
315 ret = self._cache.get("hash")
316 if ret is None:
317 val = self._val
318 if not val.path and self.is_absolute():
319 val = val._replace(path="/")
320 ret = self._cache["hash"] = hash(val)
321 return ret
323 def __le__(self, other):
324 if not type(other) is URL:
325 return NotImplemented
326 return self._val <= other._val
328 def __lt__(self, other):
329 if not type(other) is URL:
330 return NotImplemented
331 return self._val < other._val
333 def __ge__(self, other):
334 if not type(other) is URL:
335 return NotImplemented
336 return self._val >= other._val
338 def __gt__(self, other):
339 if not type(other) is URL:
340 return NotImplemented
341 return self._val > other._val
343 def __truediv__(self, name):
344 if not type(name) is str:
345 return NotImplemented
346 return self._make_child((name,))
348 def __mod__(self, query):
349 return self.update_query(query)
351 def __bool__(self) -> bool:
352 return bool(
353 self._val.netloc or self._val.path or self._val.query or self._val.fragment
354 )
356 def __getstate__(self):
357 return (self._val,)
359 def __setstate__(self, state):
360 if state[0] is None and isinstance(state[1], dict):
361 # default style pickle
362 self._val = state[1]["_val"]
363 else:
364 self._val, *unused = state
365 self._cache = {}
367 def is_absolute(self):
368 """A check for absolute URLs.
370 Return True for absolute ones (having scheme or starting
371 with //), False otherwise.
373 """
374 return self.raw_host is not None
376 def is_default_port(self):
377 """A check for default port.
379 Return True if port is default for specified scheme,
380 e.g. 'http://python.org' or 'http://python.org:80', False
381 otherwise.
383 """
384 if self.port is None:
385 return False
386 default = DEFAULT_PORTS.get(self.scheme)
387 if default is None:
388 return False
389 return self.port == default
391 def origin(self):
392 """Return an URL with scheme, host and port parts only.
394 user, password, path, query and fragment are removed.
396 """
397 # TODO: add a keyword-only option for keeping user/pass maybe?
398 if not self.is_absolute():
399 raise ValueError("URL should be absolute")
400 if not self._val.scheme:
401 raise ValueError("URL should have scheme")
402 v = self._val
403 netloc = self._make_netloc(None, None, v.hostname, v.port)
404 val = v._replace(netloc=netloc, path="", query="", fragment="")
405 return URL(val, encoded=True)
407 def relative(self):
408 """Return a relative part of the URL.
410 scheme, user, password, host and port are removed.
412 """
413 if not self.is_absolute():
414 raise ValueError("URL should be absolute")
415 val = self._val._replace(scheme="", netloc="")
416 return URL(val, encoded=True)
418 @property
419 def scheme(self):
420 """Scheme for absolute URLs.
422 Empty string for relative URLs or URLs starting with //
424 """
425 return self._val.scheme
427 @property
428 def raw_authority(self):
429 """Encoded authority part of URL.
431 Empty string for relative URLs.
433 """
434 return self._val.netloc
436 @cached_property
437 def authority(self):
438 """Decoded authority part of URL.
440 Empty string for relative URLs.
442 """
443 return self._make_netloc(
444 self.user, self.password, self.host, self.port, encode_host=False
445 )
447 @property
448 def raw_user(self):
449 """Encoded user part of URL.
451 None if user is missing.
453 """
454 # not .username
455 ret = self._val.username
456 if not ret:
457 return None
458 return ret
460 @cached_property
461 def user(self):
462 """Decoded user part of URL.
464 None if user is missing.
466 """
467 return self._UNQUOTER(self.raw_user)
469 @property
470 def raw_password(self):
471 """Encoded password part of URL.
473 None if password is missing.
475 """
476 return self._val.password
478 @cached_property
479 def password(self):
480 """Decoded password part of URL.
482 None if password is missing.
484 """
485 return self._UNQUOTER(self.raw_password)
487 @property
488 def raw_host(self):
489 """Encoded host part of URL.
491 None for relative URLs.
493 """
494 # Use host instead of hostname for sake of shortness
495 # May add .hostname prop later
496 return self._val.hostname
498 @cached_property
499 def host(self):
500 """Decoded host part of URL.
502 None for relative URLs.
504 """
505 raw = self.raw_host
506 if raw is None:
507 return None
508 if "%" in raw:
509 # Hack for scoped IPv6 addresses like
510 # fe80::2%Проверка
511 # presence of '%' sign means only IPv6 address, so idna is useless.
512 return raw
513 return _idna_decode(raw)
515 @property
516 def port(self):
517 """Port part of URL, with scheme-based fallback.
519 None for relative URLs or URLs without explicit port and
520 scheme without default port substitution.
522 """
523 return self._val.port or DEFAULT_PORTS.get(self._val.scheme)
525 @property
526 def explicit_port(self):
527 """Port part of URL, without scheme-based fallback.
529 None for relative URLs or URLs without explicit port.
531 """
532 return self._val.port
534 @property
535 def raw_path(self):
536 """Encoded path of URL.
538 / for absolute URLs without path part.
540 """
541 ret = self._val.path
542 if not ret and self.is_absolute():
543 ret = "/"
544 return ret
546 @cached_property
547 def path(self):
548 """Decoded path of URL.
550 / for absolute URLs without path part.
552 """
553 return self._PATH_UNQUOTER(self.raw_path)
555 @cached_property
556 def query(self):
557 """A MultiDictProxy representing parsed query parameters in decoded
558 representation.
560 Empty value if URL has no query part.
562 """
563 ret = MultiDict(parse_qsl(self.raw_query_string, keep_blank_values=True))
564 return MultiDictProxy(ret)
566 @property
567 def raw_query_string(self):
568 """Encoded query part of URL.
570 Empty string if query is missing.
572 """
573 return self._val.query
575 @cached_property
576 def query_string(self):
577 """Decoded query part of URL.
579 Empty string if query is missing.
581 """
582 return self._QS_UNQUOTER(self.raw_query_string)
584 @cached_property
585 def path_qs(self):
586 """Decoded path of URL with query."""
587 if not self.query_string:
588 return self.path
589 return f"{self.path}?{self.query_string}"
591 @cached_property
592 def raw_path_qs(self):
593 """Encoded path of URL with query."""
594 if not self.raw_query_string:
595 return self.raw_path
596 return f"{self.raw_path}?{self.raw_query_string}"
598 @property
599 def raw_fragment(self):
600 """Encoded fragment part of URL.
602 Empty string if fragment is missing.
604 """
605 return self._val.fragment
607 @cached_property
608 def fragment(self):
609 """Decoded fragment part of URL.
611 Empty string if fragment is missing.
613 """
614 return self._UNQUOTER(self.raw_fragment)
616 @cached_property
617 def raw_parts(self):
618 """A tuple containing encoded *path* parts.
620 ('/',) for absolute URLs if *path* is missing.
622 """
623 path = self._val.path
624 if self.is_absolute():
625 if not path:
626 parts = ["/"]
627 else:
628 parts = ["/"] + path[1:].split("/")
629 else:
630 if path.startswith("/"):
631 parts = ["/"] + path[1:].split("/")
632 else:
633 parts = path.split("/")
634 return tuple(parts)
636 @cached_property
637 def parts(self):
638 """A tuple containing decoded *path* parts.
640 ('/',) for absolute URLs if *path* is missing.
642 """
643 return tuple(self._UNQUOTER(part) for part in self.raw_parts)
645 @cached_property
646 def parent(self):
647 """A new URL with last part of path removed and cleaned up query and
648 fragment.
650 """
651 path = self.raw_path
652 if not path or path == "/":
653 if self.raw_fragment or self.raw_query_string:
654 return URL(self._val._replace(query="", fragment=""), encoded=True)
655 return self
656 parts = path.split("/")
657 val = self._val._replace(path="/".join(parts[:-1]), query="", fragment="")
658 return URL(val, encoded=True)
660 @cached_property
661 def raw_name(self):
662 """The last part of raw_parts."""
663 parts = self.raw_parts
664 if self.is_absolute():
665 parts = parts[1:]
666 if not parts:
667 return ""
668 else:
669 return parts[-1]
670 else:
671 return parts[-1]
673 @cached_property
674 def name(self):
675 """The last part of parts."""
676 return self._UNQUOTER(self.raw_name)
678 @cached_property
679 def raw_suffix(self):
680 name = self.raw_name
681 i = name.rfind(".")
682 if 0 < i < len(name) - 1:
683 return name[i:]
684 else:
685 return ""
687 @cached_property
688 def suffix(self):
689 return self._UNQUOTER(self.raw_suffix)
691 @cached_property
692 def raw_suffixes(self):
693 name = self.raw_name
694 if name.endswith("."):
695 return ()
696 name = name.lstrip(".")
697 return tuple("." + suffix for suffix in name.split(".")[1:])
699 @cached_property
700 def suffixes(self):
701 return tuple(self._UNQUOTER(suffix) for suffix in self.raw_suffixes)
703 @staticmethod
704 def _validate_authority_uri_abs_path(host, path):
705 """Ensure that path in URL with authority starts with a leading slash.
707 Raise ValueError if not.
708 """
709 if len(host) > 0 and len(path) > 0 and not path.startswith("/"):
710 raise ValueError(
711 "Path in a URL with authority should start with a slash ('/') if set"
712 )
714 def _make_child(self, segments, encoded=False):
715 """add segments to self._val.path, accounting for absolute vs relative paths"""
716 parsed = []
717 for seg in reversed(segments):
718 if not seg:
719 continue
720 if seg[0] == "/":
721 raise ValueError(
722 f"Appending path {seg!r} starting from slash is forbidden"
723 )
724 seg = seg if encoded else self._PATH_QUOTER(seg)
725 if "/" in seg:
726 parsed += (
727 sub for sub in reversed(seg.split("/")) if sub and sub != "."
728 )
729 elif seg != ".":
730 parsed.append(seg)
731 parsed.reverse()
732 old_path = self._val.path
733 if old_path:
734 parsed = [*old_path.rstrip("/").split("/"), *parsed]
735 if self.is_absolute():
736 parsed = _normalize_path_segments(parsed)
737 if parsed and parsed[0] != "":
738 # inject a leading slash when adding a path to an absolute URL
739 # where there was none before
740 parsed = ["", *parsed]
741 new_path = "/".join(parsed)
742 return URL(
743 self._val._replace(path=new_path, query="", fragment=""), encoded=True
744 )
746 @classmethod
747 def _normalize_path(cls, path):
748 # Drop '.' and '..' from str path
750 prefix = ""
751 if path.startswith("/"):
752 # preserve the "/" root element of absolute paths, copying it to the
753 # normalised output as per sections 5.2.4 and 6.2.2.3 of rfc3986.
754 prefix = "/"
755 path = path[1:]
757 segments = path.split("/")
758 return prefix + "/".join(_normalize_path_segments(segments))
760 @classmethod
761 def _encode_host(cls, host, human=False):
762 try:
763 ip, sep, zone = host.partition("%")
764 ip = ip_address(ip)
765 except ValueError:
766 host = host.lower()
767 # IDNA encoding is slow,
768 # skip it for ASCII-only strings
769 # Don't move the check into _idna_encode() helper
770 # to reduce the cache size
771 if human or host.isascii():
772 return host
773 host = _idna_encode(host)
774 else:
775 host = ip.compressed
776 if sep:
777 host += "%" + zone
778 if ip.version == 6:
779 host = "[" + host + "]"
780 return host
782 @classmethod
783 def _make_netloc(
784 cls, user, password, host, port, encode=False, encode_host=True, requote=False
785 ):
786 quoter = cls._REQUOTER if requote else cls._QUOTER
787 if encode_host:
788 ret = cls._encode_host(host)
789 else:
790 ret = host
791 if port is not None:
792 ret = ret + ":" + str(port)
793 if password is not None:
794 if not user:
795 user = ""
796 else:
797 if encode:
798 user = quoter(user)
799 if encode:
800 password = quoter(password)
801 user = user + ":" + password
802 elif user and encode:
803 user = quoter(user)
804 if user:
805 ret = user + "@" + ret
806 return ret
808 def with_scheme(self, scheme):
809 """Return a new URL with scheme replaced."""
810 # N.B. doesn't cleanup query/fragment
811 if not isinstance(scheme, str):
812 raise TypeError("Invalid scheme type")
813 if not self.is_absolute():
814 raise ValueError("scheme replacement is not allowed for relative URLs")
815 return URL(self._val._replace(scheme=scheme.lower()), encoded=True)
817 def with_user(self, user):
818 """Return a new URL with user replaced.
820 Autoencode user if needed.
822 Clear user/password if user is None.
824 """
825 # N.B. doesn't cleanup query/fragment
826 val = self._val
827 if user is None:
828 password = None
829 elif isinstance(user, str):
830 user = self._QUOTER(user)
831 password = val.password
832 else:
833 raise TypeError("Invalid user type")
834 if not self.is_absolute():
835 raise ValueError("user replacement is not allowed for relative URLs")
836 return URL(
837 self._val._replace(
838 netloc=self._make_netloc(user, password, val.hostname, val.port)
839 ),
840 encoded=True,
841 )
843 def with_password(self, password):
844 """Return a new URL with password replaced.
846 Autoencode password if needed.
848 Clear password if argument is None.
850 """
851 # N.B. doesn't cleanup query/fragment
852 if password is None:
853 pass
854 elif isinstance(password, str):
855 password = self._QUOTER(password)
856 else:
857 raise TypeError("Invalid password type")
858 if not self.is_absolute():
859 raise ValueError("password replacement is not allowed for relative URLs")
860 val = self._val
861 return URL(
862 self._val._replace(
863 netloc=self._make_netloc(val.username, password, val.hostname, val.port)
864 ),
865 encoded=True,
866 )
868 def with_host(self, host):
869 """Return a new URL with host replaced.
871 Autoencode host if needed.
873 Changing host for relative URLs is not allowed, use .join()
874 instead.
876 """
877 # N.B. doesn't cleanup query/fragment
878 if not isinstance(host, str):
879 raise TypeError("Invalid host type")
880 if not self.is_absolute():
881 raise ValueError("host replacement is not allowed for relative URLs")
882 if not host:
883 raise ValueError("host removing is not allowed")
884 val = self._val
885 return URL(
886 self._val._replace(
887 netloc=self._make_netloc(val.username, val.password, host, val.port)
888 ),
889 encoded=True,
890 )
892 def with_port(self, port):
893 """Return a new URL with port replaced.
895 Clear port to default if None is passed.
897 """
898 # N.B. doesn't cleanup query/fragment
899 if port is not None:
900 if isinstance(port, bool) or not isinstance(port, int):
901 raise TypeError(f"port should be int or None, got {type(port)}")
902 if port < 0 or port > 65535:
903 raise ValueError(f"port must be between 0 and 65535, got {port}")
904 if not self.is_absolute():
905 raise ValueError("port replacement is not allowed for relative URLs")
906 val = self._val
907 return URL(
908 self._val._replace(
909 netloc=self._make_netloc(val.username, val.password, val.hostname, port)
910 ),
911 encoded=True,
912 )
914 def with_path(self, path, *, encoded=False):
915 """Return a new URL with path replaced."""
916 if not encoded:
917 path = self._PATH_QUOTER(path)
918 if self.is_absolute():
919 path = self._normalize_path(path)
920 if len(path) > 0 and path[0] != "/":
921 path = "/" + path
922 return URL(self._val._replace(path=path, query="", fragment=""), encoded=True)
924 @classmethod
925 def _query_seq_pairs(cls, quoter, pairs):
926 for key, val in pairs:
927 if isinstance(val, (list, tuple)):
928 for v in val:
929 yield quoter(key) + "=" + quoter(cls._query_var(v))
930 else:
931 yield quoter(key) + "=" + quoter(cls._query_var(val))
933 @staticmethod
934 def _query_var(v):
935 cls = type(v)
936 if issubclass(cls, str):
937 return v
938 if issubclass(cls, float):
939 if math.isinf(v):
940 raise ValueError("float('inf') is not supported")
941 if math.isnan(v):
942 raise ValueError("float('nan') is not supported")
943 return str(float(v))
944 if issubclass(cls, int) and cls is not bool:
945 return str(int(v))
946 raise TypeError(
947 "Invalid variable type: value "
948 "should be str, int or float, got {!r} "
949 "of type {}".format(v, cls)
950 )
952 def _get_str_query(self, *args, **kwargs):
953 if kwargs:
954 if len(args) > 0:
955 raise ValueError(
956 "Either kwargs or single query parameter must be present"
957 )
958 query = kwargs
959 elif len(args) == 1:
960 query = args[0]
961 else:
962 raise ValueError("Either kwargs or single query parameter must be present")
964 if query is None:
965 query = None
966 elif isinstance(query, Mapping):
967 quoter = self._QUERY_PART_QUOTER
968 query = "&".join(self._query_seq_pairs(quoter, query.items()))
969 elif isinstance(query, str):
970 query = self._QUERY_QUOTER(query)
971 elif isinstance(query, (bytes, bytearray, memoryview)):
972 raise TypeError(
973 "Invalid query type: bytes, bytearray and memoryview are forbidden"
974 )
975 elif isinstance(query, Sequence):
976 quoter = self._QUERY_PART_QUOTER
977 # We don't expect sequence values if we're given a list of pairs
978 # already; only mappings like builtin `dict` which can't have the
979 # same key pointing to multiple values are allowed to use
980 # `_query_seq_pairs`.
981 query = "&".join(
982 quoter(k) + "=" + quoter(self._query_var(v)) for k, v in query
983 )
984 else:
985 raise TypeError(
986 "Invalid query type: only str, mapping or "
987 "sequence of (key, value) pairs is allowed"
988 )
990 return query
992 def with_query(self, *args, **kwargs):
993 """Return a new URL with query part replaced.
995 Accepts any Mapping (e.g. dict, multidict.MultiDict instances)
996 or str, autoencode the argument if needed.
998 A sequence of (key, value) pairs is supported as well.
1000 It also can take an arbitrary number of keyword arguments.
1002 Clear query if None is passed.
1004 """
1005 # N.B. doesn't cleanup query/fragment
1007 new_query = self._get_str_query(*args, **kwargs) or ""
1008 return URL(
1009 self._val._replace(path=self._val.path, query=new_query), encoded=True
1010 )
1012 def update_query(self, *args, **kwargs):
1013 """Return a new URL with query part updated."""
1014 s = self._get_str_query(*args, **kwargs)
1015 query = None
1016 if s is not None:
1017 new_query = MultiDict(parse_qsl(s, keep_blank_values=True))
1018 query = MultiDict(self.query)
1019 query.update(new_query)
1021 return URL(
1022 self._val._replace(query=self._get_str_query(query) or ""), encoded=True
1023 )
1025 def with_fragment(self, fragment):
1026 """Return a new URL with fragment replaced.
1028 Autoencode fragment if needed.
1030 Clear fragment to default if None is passed.
1032 """
1033 # N.B. doesn't cleanup query/fragment
1034 if fragment is None:
1035 raw_fragment = ""
1036 elif not isinstance(fragment, str):
1037 raise TypeError("Invalid fragment type")
1038 else:
1039 raw_fragment = self._FRAGMENT_QUOTER(fragment)
1040 if self.raw_fragment == raw_fragment:
1041 return self
1042 return URL(self._val._replace(fragment=raw_fragment), encoded=True)
1044 def with_name(self, name):
1045 """Return a new URL with name (last part of path) replaced.
1047 Query and fragment parts are cleaned up.
1049 Name is encoded if needed.
1051 """
1052 # N.B. DOES cleanup query/fragment
1053 if not isinstance(name, str):
1054 raise TypeError("Invalid name type")
1055 if "/" in name:
1056 raise ValueError("Slash in name is not allowed")
1057 name = self._PATH_QUOTER(name)
1058 if name in (".", ".."):
1059 raise ValueError(". and .. values are forbidden")
1060 parts = list(self.raw_parts)
1061 if self.is_absolute():
1062 if len(parts) == 1:
1063 parts.append(name)
1064 else:
1065 parts[-1] = name
1066 parts[0] = "" # replace leading '/'
1067 else:
1068 parts[-1] = name
1069 if parts[0] == "/":
1070 parts[0] = "" # replace leading '/'
1071 return URL(
1072 self._val._replace(path="/".join(parts), query="", fragment=""),
1073 encoded=True,
1074 )
1076 def with_suffix(self, suffix):
1077 """Return a new URL with suffix (file extension of name) replaced.
1079 Query and fragment parts are cleaned up.
1081 suffix is encoded if needed.
1082 """
1083 if not isinstance(suffix, str):
1084 raise TypeError("Invalid suffix type")
1085 if suffix and not suffix.startswith(".") or suffix == ".":
1086 raise ValueError(f"Invalid suffix {suffix!r}")
1087 name = self.raw_name
1088 if not name:
1089 raise ValueError(f"{self!r} has an empty name")
1090 old_suffix = self.raw_suffix
1091 if not old_suffix:
1092 name = name + suffix
1093 else:
1094 name = name[: -len(old_suffix)] + suffix
1095 return self.with_name(name)
1097 def join(self, url):
1098 """Join URLs
1100 Construct a full (“absolute”) URL by combining a “base URL”
1101 (self) with another URL (url).
1103 Informally, this uses components of the base URL, in
1104 particular the addressing scheme, the network location and
1105 (part of) the path, to provide missing components in the
1106 relative URL.
1108 """
1109 # See docs for urllib.parse.urljoin
1110 if not isinstance(url, URL):
1111 raise TypeError("url should be URL")
1112 return URL(urljoin(str(self), str(url)), encoded=True)
1114 def joinpath(self, *other, encoded=False):
1115 """Return a new URL with the elements in other appended to the path."""
1116 return self._make_child(other, encoded=encoded)
1118 def human_repr(self):
1119 """Return decoded human readable string for URL representation."""
1120 user = _human_quote(self.user, "#/:?@")
1121 password = _human_quote(self.password, "#/:?@")
1122 host = self.host
1123 if host:
1124 host = self._encode_host(self.host, human=True)
1125 path = _human_quote(self.path, "#?")
1126 query_string = "&".join(
1127 "{}={}".format(_human_quote(k, "#&+;="), _human_quote(v, "#&+;="))
1128 for k, v in self.query.items()
1129 )
1130 fragment = _human_quote(self.fragment, "")
1131 return urlunsplit(
1132 SplitResult(
1133 self.scheme,
1134 self._make_netloc(
1135 user,
1136 password,
1137 host,
1138 self._val.port,
1139 encode_host=False,
1140 ),
1141 path,
1142 query_string,
1143 fragment,
1144 )
1145 )
1148def _human_quote(s, unsafe):
1149 if not s:
1150 return s
1151 for c in "%" + unsafe:
1152 if c in s:
1153 s = s.replace(c, f"%{ord(c):02X}")
1154 if s.isprintable():
1155 return s
1156 return "".join(c if c.isprintable() else quote(c) for c in s)
1159_MAXCACHE = 256
1162@functools.lru_cache(_MAXCACHE)
1163def _idna_decode(raw):
1164 try:
1165 return idna.decode(raw.encode("ascii"))
1166 except UnicodeError: # e.g. '::1'
1167 return raw.encode("ascii").decode("idna")
1170@functools.lru_cache(_MAXCACHE)
1171def _idna_encode(host):
1172 try:
1173 return idna.encode(host, uts46=True).decode("ascii")
1174 except UnicodeError:
1175 return host.encode("idna").decode("ascii")
1178@rewrite_module
1179def cache_clear():
1180 _idna_decode.cache_clear()
1181 _idna_encode.cache_clear()
1184@rewrite_module
1185def cache_info():
1186 return {
1187 "idna_encode": _idna_encode.cache_info(),
1188 "idna_decode": _idna_decode.cache_info(),
1189 }
1192@rewrite_module
1193def cache_configure(*, idna_encode_size=_MAXCACHE, idna_decode_size=_MAXCACHE):
1194 global _idna_decode, _idna_encode
1196 _idna_encode = functools.lru_cache(idna_encode_size)(_idna_encode.__wrapped__)
1197 _idna_decode = functools.lru_cache(idna_decode_size)(_idna_decode.__wrapped__)