1# Copyright (c) 2015 Ian Stapleton Cordasco
2# Licensed under the Apache License, Version 2.0 (the "License");
3# you may not use this file except in compliance with the License.
4# You may obtain a copy of the License at
5#
6# http://www.apache.org/licenses/LICENSE-2.0
7#
8# Unless required by applicable law or agreed to in writing, software
9# distributed under the License is distributed on an "AS IS" BASIS,
10# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
11# implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Module containing the urlparse compatibility logic."""
15import typing as t
16from collections import namedtuple
17
18from . import compat
19from . import exceptions
20from . import misc
21from . import normalizers
22from . import uri
23from ._typing_compat import Self as _Self
24
25__all__ = ("ParseResult", "ParseResultBytes")
26
# Field names, in positional order, shared by the ParseResult and
# ParseResultBytes namedtuples defined in this module.
PARSED_COMPONENTS = (
    "scheme",
    # The authority is stored as its three sub-components.
    "userinfo", "host", "port",
    "path", "query", "fragment",
)
36
37
class ParseResultMixin(t.Generic[t.AnyStr]):
    """Behaviour shared by the text and bytes parse-result classes.

    Concrete subclasses are expected to provide the ``userinfo``, ``host``,
    ``port``, ``query`` and ``encoding`` attributes, an ``authority``
    property and an ``unsplit()`` method. This mixin adds the standard
    library style aliases and the authority regeneration helper.
    """

    if t.TYPE_CHECKING:
        # Declarations for type checkers only; nothing here exists at
        # runtime, the concrete subclasses supply the real members.
        userinfo: t.Optional[t.AnyStr]
        host: t.Optional[t.AnyStr]
        port: t.Optional[int]
        query: t.Optional[t.AnyStr]
        encoding: str

        @property
        def authority(self) -> t.Optional[t.AnyStr]: ...

    def _generate_authority(
        self,
        attributes: t.Dict[str, t.Optional[t.AnyStr]],
    ) -> t.Optional[str]:
        """Return the authority text that matches ``attributes``.

        :param attributes: Mapping holding at least the ``userinfo``,
            ``host`` and ``port`` values wanted for the new instance.
        :returns: A freshly normalized authority when any of the three
            values differs from this instance's own, otherwise this
            instance's current authority (decoded to ``str`` when it is
            ``bytes``). May be ``None`` when the current authority is.
        """
        # I swear I did not align the comparisons below. That's just how they
        # happened to align based on pep8 and attribute lengths.
        userinfo, host, port = (
            attributes[p] for p in ("userinfo", "host", "port")
        )
        if self.userinfo != userinfo or self.host != host or self.port != port:
            if port:
                if isinstance(port, bytes):
                    # Formatting bytes with an f-string would embed the
                    # repr (``b'8080'``) in the authority, so decode it.
                    port = port.decode(self.encoding)
                else:
                    port = f"{port}"
            return normalizers.normalize_authority(
                (
                    compat.to_str(userinfo, self.encoding),
                    compat.to_str(host, self.encoding),
                    port,
                )
            )
        authority = self.authority
        if isinstance(authority, bytes):
            # Decode with the instance encoding (not a hard-coded UTF-8) so
            # the value round-trips with how the bytes were produced.
            return authority.decode(self.encoding)
        return authority

    def geturl(self) -> t.AnyStr:
        """Shim to match the standard library method."""
        return self.unsplit()

    @property
    def hostname(self) -> t.Optional[t.AnyStr]:
        """Shim to match the standard library."""
        return self.host

    @property
    def netloc(self) -> t.Optional[t.AnyStr]:
        """Shim to match the standard library."""
        return self.authority

    @property
    def params(self) -> t.Optional[t.AnyStr]:
        """Shim to match the standard library."""
        return self.query
90
91
class ParseResult(
    namedtuple("ParseResult", PARSED_COMPONENTS), ParseResultMixin[str]
):
    """Text flavour of the urlparse compatibility result.

    The URI components are kept as tuple fields, while the ``URIReference``
    they were derived from is carried along on ``reference`` and used to
    rebuild the URI string.
    """

    scheme: t.Optional[str]
    userinfo: t.Optional[str]
    host: t.Optional[str]
    port: t.Optional[int]
    path: t.Optional[str]
    query: t.Optional[str]
    fragment: t.Optional[str]
    encoding: str
    reference: "uri.URIReference"

    def __new__(
        cls,
        scheme: t.Optional[str],
        userinfo: t.Optional[str],
        host: t.Optional[str],
        port: t.Optional[int],
        path: t.Optional[str],
        query: t.Optional[str],
        fragment: t.Optional[str],
        uri_ref: "uri.URIReference",
        encoding: str = "utf-8",
    ) -> _Self:
        """Build the tuple and attach the encoding and backing reference."""
        # Falsy scheme, userinfo, port and path collapse to None; host,
        # query and fragment are stored exactly as given.
        components = (
            scheme or None,
            userinfo or None,
            host,
            port or None,
            path or None,
            query,
            fragment,
        )
        instance = super().__new__(cls, *components)
        instance.encoding = encoding
        instance.reference = uri_ref
        return instance

    @classmethod
    def from_parts(
        cls,
        scheme: t.Optional[str] = None,
        userinfo: t.Optional[str] = None,
        host: t.Optional[str] = None,
        port: t.Optional[t.Union[int, str]] = None,
        path: t.Optional[str] = None,
        query: t.Optional[str] = None,
        fragment: t.Optional[str] = None,
        encoding: str = "utf-8",
    ) -> _Self:
        """Assemble a ParseResult from individual components.

        The parts are combined into a normalized ``URIReference`` and the
        stored components are read back from that normalized reference.
        """
        pieces = []
        if userinfo is not None:
            pieces.append(userinfo + "@")
        if host is not None:
            pieces.append(host)
        if port is not None:
            pieces.append(f":{port}")
        normalized = uri.URIReference(
            scheme=scheme,
            authority="".join(pieces),
            path=path,
            query=query,
            fragment=fragment,
            encoding=encoding,
        ).normalize()
        parsed_userinfo, parsed_host, parsed_port = authority_from(
            normalized, strict=True
        )
        return cls(
            scheme=normalized.scheme,
            userinfo=parsed_userinfo,
            host=parsed_host,
            port=parsed_port,
            path=normalized.path,
            query=normalized.query,
            fragment=normalized.fragment,
            uri_ref=normalized,
            encoding=encoding,
        )

    @classmethod
    def from_string(
        cls,
        uri_string: t.Union[str, bytes],
        encoding: str = "utf-8",
        strict: bool = True,
        lazy_normalize: bool = True,
    ) -> _Self:
        """Parse a URI string into a ParseResult.

        :param uri_string: URI to be parsed into a reference.
        :param str encoding: The encoding of the string provided
        :param bool strict: Parse strictly according to :rfc:`3986` if True.
            If False, parse similarly to the standard library's urlparse
            function.
        :param bool lazy_normalize: When False, the parsed reference is
            normalized before its components are extracted.
        :returns: :class:`ParseResult` or subclass thereof
        """
        parsed = uri.URIReference.from_string(uri_string, encoding)
        source = parsed if lazy_normalize else parsed.normalize()
        parsed_userinfo, parsed_host, parsed_port = authority_from(
            source, strict
        )
        return cls(
            scheme=source.scheme,
            userinfo=parsed_userinfo,
            host=parsed_host,
            port=parsed_port,
            path=source.path,
            query=source.query,
            fragment=source.fragment,
            uri_ref=source,
            encoding=encoding,
        )

    @property
    def authority(self) -> t.Optional[str]:
        """Return the authority of the backing reference."""
        return self.reference.authority

    def copy_with(
        self,
        scheme: t.Optional[str] = misc.UseExisting,
        userinfo: t.Optional[str] = misc.UseExisting,
        host: t.Optional[str] = misc.UseExisting,
        port: t.Optional[t.Union[int, str]] = misc.UseExisting,
        path: t.Optional[str] = misc.UseExisting,
        query: t.Optional[str] = misc.UseExisting,
        fragment: t.Optional[str] = misc.UseExisting,
    ) -> "ParseResult":
        """Return a copy in which the supplied components are replaced.

        Components left at ``misc.UseExisting`` keep this instance's value.
        """
        supplied = (scheme, userinfo, host, port, path, query, fragment)
        merged: t.Dict[str, t.Optional[str]] = {
            name: getattr(self, name) if value is misc.UseExisting else value
            for name, value in zip(PARSED_COMPONENTS, supplied)
        }
        new_reference = self.reference.copy_with(
            scheme=merged["scheme"],
            authority=self._generate_authority(merged),
            path=merged["path"],
            query=merged["query"],
            fragment=merged["fragment"],
        )
        return ParseResult(
            uri_ref=new_reference, encoding=self.encoding, **merged
        )

    def encode(self, encoding: t.Optional[str] = None) -> "ParseResultBytes":
        """Convert to an instance of ParseResultBytes.

        :param encoding: Encoding to use; falls back to ``self.encoding``
            when omitted or empty.
        """
        target = encoding or self.encoding
        converted = {}
        for name, value in zip(PARSED_COMPONENTS, self):
            # Only values offering ``encode`` are converted; others (such
            # as an int port or None) pass through untouched.
            if hasattr(value, "encode"):
                value = value.encode(target)
            converted[name] = value
        return ParseResultBytes(
            uri_ref=self.reference, encoding=target, **converted
        )

    def unsplit(self, use_idna: bool = False) -> str:
        """Rebuild the URI string from the backing reference.

        :param bool use_idna: When True and a host is present, the host is
            IDNA-encoded before the URI is rebuilt.
        :returns: The parsed URI reconstituted as a string.
        :rtype: str
        """
        if not (use_idna and self.host):
            return self.reference.unsplit()
        idna_host = self.host.encode("idna").decode(self.encoding)
        return self.copy_with(host=idna_host).reference.unsplit()
276
277
class ParseResultBytes(
    namedtuple("ParseResultBytes", PARSED_COMPONENTS), ParseResultMixin[bytes]
):
    """Compatibility shim for the urlparse.ParseResultBytes object.

    Textual components are stored as ``bytes`` encoded with ``encoding``.
    The ``URIReference`` the components were derived from is kept on
    ``reference`` and is used to rebuild the URI.
    """

    scheme: t.Optional[bytes]
    userinfo: t.Optional[bytes]
    host: t.Optional[bytes]
    port: t.Optional[int]
    path: t.Optional[bytes]
    query: t.Optional[bytes]
    fragment: t.Optional[bytes]
    encoding: str
    reference: "uri.URIReference"
    lazy_normalize: bool

    def __new__(
        cls,
        scheme: t.Optional[bytes],
        userinfo: t.Optional[bytes],
        host: t.Optional[bytes],
        port: t.Optional[int],
        path: t.Optional[bytes],
        query: t.Optional[bytes],
        fragment: t.Optional[bytes],
        uri_ref: "uri.URIReference",
        encoding: str = "utf-8",
        lazy_normalize: bool = True,
    ) -> _Self:
        """Create a new ParseResultBytes instance.

        Every component except ``host`` is collapsed to ``None`` when falsy.
        """
        parse_result = super().__new__(
            cls,
            scheme or None,
            userinfo or None,
            host,
            port or None,
            path or None,
            query or None,
            fragment or None,
        )
        parse_result.encoding = encoding
        parse_result.reference = uri_ref
        parse_result.lazy_normalize = lazy_normalize
        return parse_result

    @classmethod
    def from_parts(
        cls,
        scheme: t.Optional[str] = None,
        userinfo: t.Optional[str] = None,
        host: t.Optional[str] = None,
        port: t.Optional[t.Union[int, str]] = None,
        path: t.Optional[str] = None,
        query: t.Optional[str] = None,
        fragment: t.Optional[str] = None,
        encoding: str = "utf-8",
        lazy_normalize: bool = True,
    ) -> _Self:
        """Create a ParseResultBytes instance from its parts.

        :param str encoding: Encoding used to turn the text parts into bytes.
        :param bool lazy_normalize: When False, the backing reference is
            normalized before the authority components are extracted.
        :returns: :class:`ParseResultBytes` or subclass thereof
        """
        authority = ""
        if userinfo is not None:
            authority += userinfo + "@"
        if host is not None:
            authority += host
        if port is not None:
            authority += f":{int(port)}"
        uri_ref = uri.URIReference(
            scheme=scheme,
            authority=authority,
            path=path,
            query=query,
            fragment=fragment,
            encoding=encoding,
        )
        if not lazy_normalize:
            uri_ref = uri_ref.normalize()
        to_bytes = compat.to_bytes
        userinfo, host, port = authority_from(uri_ref, strict=True)
        return cls(
            scheme=to_bytes(scheme, encoding),
            userinfo=to_bytes(userinfo, encoding),
            host=to_bytes(host, encoding),
            port=port,
            path=to_bytes(path, encoding),
            query=to_bytes(query, encoding),
            fragment=to_bytes(fragment, encoding),
            uri_ref=uri_ref,
            encoding=encoding,
            lazy_normalize=lazy_normalize,
        )

    @classmethod
    def from_string(
        cls,
        uri_string: t.Union[str, bytes],
        encoding: str = "utf-8",
        strict: bool = True,
        lazy_normalize: bool = True,
    ) -> _Self:
        """Parse a URI from the given URI string.

        :param uri_string: URI to be parsed into a reference.
        :param str encoding: The encoding of the string provided
        :param bool strict: Parse strictly according to :rfc:`3986` if True.
            If False, parse similarly to the standard library's urlparse
            function.
        :param bool lazy_normalize: When False, the parsed reference is
            normalized before its components are extracted.
        :returns: :class:`ParseResultBytes` or subclass thereof
        """
        reference = uri.URIReference.from_string(uri_string, encoding)
        if not lazy_normalize:
            reference = reference.normalize()
        userinfo, host, port = authority_from(reference, strict)

        to_bytes = compat.to_bytes
        return cls(
            scheme=to_bytes(reference.scheme, encoding),
            userinfo=to_bytes(userinfo, encoding),
            host=to_bytes(host, encoding),
            port=port,
            path=to_bytes(reference.path, encoding),
            query=to_bytes(reference.query, encoding),
            fragment=to_bytes(reference.fragment, encoding),
            uri_ref=reference,
            encoding=encoding,
            lazy_normalize=lazy_normalize,
        )

    @property
    def authority(self) -> t.Optional[bytes]:
        """Return the reference's authority encoded with ``self.encoding``.

        ``None`` is returned when the backing reference's authority is
        ``None``. Calling ``.encode`` on it unconditionally raised
        ``AttributeError``, which also broke ``copy_with()`` and
        ``unsplit()`` because both reach this property.
        """
        authority = self.reference.authority
        if authority is None:
            return None
        return authority.encode(self.encoding)

    def copy_with(
        self,
        scheme: t.Optional[t.Union[str, bytes]] = misc.UseExisting,
        userinfo: t.Optional[t.Union[str, bytes]] = misc.UseExisting,
        host: t.Optional[t.Union[str, bytes]] = misc.UseExisting,
        port: t.Optional[t.Union[int, str, bytes]] = misc.UseExisting,
        path: t.Optional[t.Union[str, bytes]] = misc.UseExisting,
        query: t.Optional[t.Union[str, bytes]] = misc.UseExisting,
        fragment: t.Optional[t.Union[str, bytes]] = misc.UseExisting,
        lazy_normalize: bool = True,
    ) -> "ParseResultBytes":
        """Create a copy of this instance replacing with specified parts.

        Components left at ``misc.UseExisting`` keep this instance's value.
        Non-bytes values that offer ``encode`` are encoded with
        ``self.encoding``.

        :param bool lazy_normalize: When False, the new backing reference
            is normalized before the copy is built.
        """
        attributes = zip(
            PARSED_COMPONENTS,
            (scheme, userinfo, host, port, path, query, fragment),
        )
        attrs_dict = {}
        for name, value in attributes:
            if value is misc.UseExisting:
                value = getattr(self, name)
            if not isinstance(value, bytes) and hasattr(value, "encode"):
                value = value.encode(self.encoding)
            attrs_dict[name] = value

        if t.TYPE_CHECKING:
            attrs_dict = t.cast(t.Dict[str, t.Optional[bytes]], attrs_dict)

        authority = self._generate_authority(attrs_dict)
        # The backing reference is handed text, so convert the bytes back.
        to_str = compat.to_str
        ref = self.reference.copy_with(
            scheme=to_str(attrs_dict["scheme"], self.encoding),
            authority=to_str(authority, self.encoding),
            path=to_str(attrs_dict["path"], self.encoding),
            query=to_str(attrs_dict["query"], self.encoding),
            fragment=to_str(attrs_dict["fragment"], self.encoding),
        )
        if not lazy_normalize:
            ref = ref.normalize()
        return ParseResultBytes(
            uri_ref=ref,
            encoding=self.encoding,
            lazy_normalize=lazy_normalize,
            **attrs_dict,
        )

    def unsplit(self, use_idna: bool = False) -> bytes:
        """Create a URI bytes object from the components.

        :param bool use_idna: When True and a host is present, the host is
            IDNA-encoded before the URI is rebuilt.
        :returns: The parsed URI reconstituted as bytes.
        :rtype: bytes
        """
        parse_result = self
        if use_idna and self.host:
            # self.host is bytes, to encode to idna, we need to decode it
            # first
            host = self.host.decode(self.encoding)
            hostbytes = host.encode("idna")
            parse_result = self.copy_with(host=hostbytes)
        if self.lazy_normalize:
            # Normalization was deferred at construction time; perform it
            # now so the emitted URI is built from a normalized reference.
            parse_result = parse_result.copy_with(lazy_normalize=False)
        uri = parse_result.reference.unsplit()
        return uri.encode(self.encoding)
472
473
def split_authority(
    authority: str,
) -> t.Tuple[t.Optional[str], t.Optional[str], t.Optional[str]]:
    """Leniently break an authority string into its three parts.

    :param str authority: Authority text, e.g. ``user@host:port``.
    :returns: ``(userinfo, host, port)``; each element is ``None`` when the
        corresponding part is absent. The port is returned as text.
    :raises ValueError: If the host starts with ``[`` but has no closing
        ``]``.
    """
    userinfo = host = port = None

    # Everything before the last "@" is userinfo.
    before_at, at_sign, remainder = authority.rpartition("@")
    if at_sign:
        userinfo = before_at
    else:
        remainder = authority

    # A bracketed (IPv6 style) host keeps its brackets.
    if remainder.startswith("["):
        host, remainder = remainder.split("]", 1)
        host += "]"

    candidate, colon, after_colon = remainder.partition(":")
    if colon:
        port = after_colon
        # Text before the colon is the host unless a bracketed host was
        # already found.
        if candidate and not host:
            host = candidate
    elif remainder and not host:
        host = remainder

    return userinfo, host, port
501
502
def authority_from(
    reference: "uri.URIReference",
    strict: bool,
) -> t.Tuple[t.Optional[str], t.Optional[str], t.Optional[int]]:
    """Extract ``(userinfo, host, port)`` from a URI reference.

    :param reference: Reference whose authority should be taken apart.
    :param bool strict: When True, an invalid authority is re-raised; when
        False, the authority is split leniently with ``split_authority``.
    :returns: The userinfo, host and port; a non-empty port is converted
        to ``int``.
    :raises exceptions.InvalidAuthority: If the authority is invalid and
        ``strict`` is True.
    :raises exceptions.InvalidPort: If a non-empty port is not made up
        solely of ASCII digits.
    """
    try:
        info = reference.authority_info()
    except exceptions.InvalidAuthority:
        if strict:
            raise
        userinfo, host, port = split_authority(reference.authority)
    else:
        # Thanks to Richard Barrell for this idea:
        # https://twitter.com/0x2ba22e11/status/617338811975139328
        userinfo, host, port = (
            info.get(key) for key in ("userinfo", "host", "port")
        )

    # A missing or empty port is handed back untouched.
    if not port:
        return userinfo, host, port
    if not (port.isascii() and port.isdigit()):
        raise exceptions.InvalidPort(port)
    return userinfo, host, int(port)