Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/w3lib/_url.py: 67%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import dataclasses
4import functools
5import ipaddress
6import os
7import re
8import string
9import sys
10import unicodedata
11from typing import TYPE_CHECKING
12from urllib.parse import ParseResult, scheme_chars, uses_netloc, uses_params
14from w3lib._infra import _ASCII_TAB_OR_NEWLINE, _C0_CONTROL_OR_SPACE
16if TYPE_CHECKING:
17 from collections.abc import Generator
18 from urllib.parse import _QueryType
# True on Windows; `_url2pathname` uses it to pick the drive-letter aware branch.
_IS_WINDOWS = os.name == "nt"
# Encoding and error handler used when turning unquoted path bytes into `str`.
_FS_ENCODING = sys.getfilesystemencoding()
_FS_ERRORS = sys.getfilesystemencodeerrors()
# https://url.spec.whatwg.org/
# https://url.spec.whatwg.org/commit-snapshots/a46cb9188a48c2c9d80ba32a9b1891652d6b4900/#default-port
# Maps each special scheme to its default port (`None` when there is none).
_DEFAULT_PORTS = {
    "ftp": 21,
    "file": None,
    "http": 80,
    "https": 443,
    "ws": 80,
    "wss": 443,
}
# The special schemes are exactly the keys of the default-port table.
_SPECIAL_SCHEMES = set(_DEFAULT_PORTS.keys())
# constants from RFC 3986, Section 2.2 and 2.3
RFC3986_GEN_DELIMS = b":/?#[]@"
RFC3986_SUB_DELIMS = b"!$&'()*+,;="
RFC3986_RESERVED = RFC3986_GEN_DELIMS + RFC3986_SUB_DELIMS
RFC3986_UNRESERVED = (string.ascii_letters + string.digits + "-._~").encode("ascii")
EXTRA_SAFE_CHARS = b"|"  # see https://github.com/scrapy/w3lib/pull/25
# Bytes allowed unescaped in the userinfo part: unreserved, sub-delims and ":".
RFC3986_USERINFO_SAFE_CHARS = RFC3986_UNRESERVED + RFC3986_SUB_DELIMS + b":"
# Reserved + unreserved + "|" + "%" (so existing escapes are left untouched).
_SAFE_CHARS = RFC3986_RESERVED + RFC3986_UNRESERVED + EXTRA_SAFE_CHARS + b"%"
# Same as `_SAFE_CHARS` minus "#".
_PATH_SAFE_CHARS = _SAFE_CHARS.replace(b"#", b"")
_PATH_SAFE_CHARS_STR = _PATH_SAFE_CHARS.decode()
# Frozen copies of the urllib.parse tables, for constant-time membership tests.
_USES_NETLOC = frozenset(uses_netloc)
_SCHEME_CHARS = frozenset(scheme_chars)
_USES_PARAMS = frozenset(uses_params)
# Translation table that deletes every character in `_ASCII_TAB_OR_NEWLINE`.
_ASCII_TAB_OR_NEWLINE_TRANSLATION_TABLE = str.maketrans("", "", _ASCII_TAB_OR_NEWLINE)
# Matches any single character from `_C0_CONTROL_OR_SPACE`.
_C0_CONTROL_OR_SPACE_RE = re.compile(rf"[{_C0_CONTROL_OR_SPACE}]")
# Scheme prefix of a URL: a letter followed by scheme characters, then ":".
# - `re.escape` keeps "+-." from being read as a character range (which would
#   also admit ",").
# - The mandatory leading letter means ":foo" or "1x:foo" is not treated as
#   having a scheme, matching RFC 3986 and urllib.parse.urlsplit.
_SCHEME_RE = re.compile(rf"^([a-zA-Z][{re.escape(scheme_chars)}]*):")
# IPvFuture literal (RFC 3986, Section 3.2.2): "v" HEXDIG+ "." <address>.
_IPV_FUTURE_RE = re.compile(r"\Av[a-fA-F0-9]+\..+\Z")
# Delimiters that must not appear in a netloc as a result of NFKC normalization.
_NETLOC_DELIMS_RE = re.compile(r"[/?#@:]")
# Deletes the delimiters that may legitimately be present in a netloc.
_NETLOC_STRIP_CHARS = str.maketrans("", "", "@:#?")
def _strip(input_string: str) -> str:
    """Trim C0-control/space characters and drop ASCII tabs and newlines.

    The input is returned unchanged when it is empty or contains none of
    the characters in `_C0_CONTROL_OR_SPACE`. Otherwise those characters are
    stripped from both ends and every character in `_ASCII_TAB_OR_NEWLINE`
    is removed from the remainder.
    """
    if input_string and _C0_CONTROL_OR_SPACE_RE.search(input_string):
        trimmed = input_string.strip(_C0_CONTROL_OR_SPACE)
        return trimmed.translate(_ASCII_TAB_OR_NEWLINE_TRANSLATION_TABLE)

    return input_string
@functools.cache
def _hex_encode_table() -> bytes:
    """Return every percent-escape, concatenated in byte-value order.

    The escape for byte value ``n`` occupies ``table[3 * n : 3 * n + 3]``
    and uses uppercase hex digits::

        0   -> b"%00"
        1   -> b"%01"
        ...
        255 -> b"%FF"

    Returns:
        A bytes object of length 256 * 3. The result is cached.
    """
    return b"".join(b"%%%02X" % value for value in range(256))
@functools.cache
def _hex_decode_table() -> bytes:
    """Return a 256-entry table mapping ASCII codes to hex nibble values.

    Entries for ``'0'-'9'`` hold 0-9, entries for ``'A'-'F'`` and
    ``'a'-'f'`` hold 10-15, and every other entry holds 255 to mark the
    byte as "not a hex digit".

    Returns:
        A bytes object of length 256. The result is cached.
    """
    nibbles = bytearray(b"\xff" * 256)
    for digit in b"0123456789abcdefABCDEF":
        nibbles[digit] = int(chr(digit), 16)
    return bytes(nibbles)
@functools.cache
def _safe_table(safe: bytes = RFC3986_UNRESERVED) -> bytes:
    """Return a 256-entry 0/1 mask flagging the byte values found in *safe*.

    ``mask[b]`` is 1 when byte value ``b`` occurs in *safe* and 0 otherwise.

    Args:
        safe: Byte values to flag; defaults to the RFC 3986 unreserved set.

    Returns:
        A bytes object of length 256. Results are cached per *safe* value.
    """
    mask = [0] * 256
    for code in safe:
        mask[code] = 1
    return bytes(mask)
@functools.cache
def _quote_table(safe: bytes = b"", quote_plus: bool = False) -> tuple[bytes, ...]:
    """Precompute the quoted form of each of the 256 byte values.

    Decision table:
    | condition                               | output |
    |-----------------------------------------|--------|
    | byte is RFC 3986 unreserved or in safe  | as-is  |
    | byte == 32 and quote_plus               | "+"    |
    | otherwise                               | "%HH"  |

    Example mapping (with ``quote_plus=True``):
    | byte | char | output |
    |------|------|--------|
    | 65   | A    | b"A"   |
    | 32   | space| b"+"   |
    | 255  | N/A  | b"%FF" |

    Args:
        safe: Extra byte values to leave unescaped, on top of the RFC 3986
            unreserved set.
        quote_plus: Encode a space as ``+`` (unless space is itself safe).

    Returns:
        A 256-entry tuple mapping byte value (index) -> encoded bytes.
        Results are cached per ``(safe, quote_plus)`` pair.
    """
    hex_table = _hex_encode_table()
    allowed = _safe_table(RFC3986_UNRESERVED + safe) if safe else _safe_table()
    output: list[bytes] = []

    for byte in range(256):
        if allowed[byte]:
            # Emit the raw byte itself. `chr(byte).encode()` would expand any
            # value >= 0x80 into a two-byte UTF-8 sequence.
            output.append(bytes((byte,)))
        elif quote_plus and byte == 32:  # ord(' ')
            output.append(b"+")
        else:
            offset = byte * 3
            output.append(hex_table[offset : offset + 3])

    return tuple(output)
def _quote(data: bytes, safe: bytes = b"", quote_plus: bool = False) -> bytes:
    """Percent-encode *data* through the cached per-byte lookup table.

    Args:
        data: Bytes to encode.
        safe: Byte values to leave unescaped in addition to the defaults
            applied by `_quote_table`.
        quote_plus: When true, a space is written as ``+``.

    Returns:
        The encoded bytes (``b""`` for empty input).
    """
    if not data:  # pragma: no cover
        return b""

    encoded_for = _quote_table(safe, quote_plus)
    return b"".join(map(encoded_for.__getitem__, data))
def _quote_into(
    data: bytes, output: bytearray, safe: bytes = b"", quote_plus: bool = False
) -> None:
    """Percent-encode *data* and append the result to *output* in place.

    Takes the same *safe* and *quote_plus* options as `_quote`. Nothing is
    appended when *data* is empty.
    """
    if not data:  # pragma: no cover
        return

    encoded_for = _quote_table(safe, quote_plus)
    output.extend(b"".join(map(encoded_for.__getitem__, data)))
198def _unquote(
199 data: bytes | bytearray | str,
200 safe: bytes = b"",
201) -> bytes:
202 if not data:
203 return b""
205 if isinstance(data, str):
206 data = data.encode()
208 first_percent = data.find(b"%")
210 if first_percent < 0:
211 return bytes(data)
213 hex_decode_table = _hex_decode_table()
214 safe_table = _safe_table(safe)
216 data_length = len(data)
217 # stop at len - 2 because "%HH" decoding reads 2 extra bytes after '%'
218 decode_limit = data_length - 2
220 output = bytearray(data_length)
221 output[:first_percent] = data[:first_percent]
223 input_index = first_percent
224 output_index = first_percent
226 while input_index < decode_limit:
227 current_byte = data[input_index]
229 if current_byte == 37: # ord('%')
230 # Decoding "%HH" sequence
231 # Step 1: read two hex characters after '%'
232 # Example: "%4F" -> '4' and 'F'
233 high_nibble = hex_decode_table[data[input_index + 1]]
234 low_nibble = hex_decode_table[data[input_index + 2]]
236 # Step 2: validate both characters are valid hex digits
237 # hex_decode_table returns 255 for invalid input
238 # bitwise OR catches any invalid nibble quickly
239 if (high_nibble | low_nibble) != 255:
240 # Step 3: combine two 4-bit nibbles into one byte
241 # (high_nibble << 4) + low_nibble
242 # Example: 0x4 and 0xF -> 0x4F
243 decoded_byte = (high_nibble << 4) | low_nibble
245 # Step 4: check if decoded byte is NOT in safe set
246 # (only unsafe bytes are decoded; safe ones are left encoded
247 if not safe_table[decoded_byte]:
248 output[output_index] = decoded_byte
249 input_index += 3 # skip past "%HH" in input
250 output_index += 1 # advance output position by one decoded byte
251 continue
253 output[output_index] = current_byte
254 input_index += 1
255 output_index += 1
257 while input_index < data_length: # tail
258 output[output_index] = data[input_index]
259 input_index += 1
260 output_index += 1
262 return bytes(output[:output_index])
def _unquote_plus(
    data: bytes | bytearray | str,
) -> bytes:
    """Decode every ``%HH`` escape in *data* and turn ``+`` into a space.

    This mirrors ``urllib.parse.unquote_plus``: all well-formed escapes are
    decoded, including those of RFC 3986 unreserved characters (``%41`` ->
    ``A``). Leaving those escaped would make a later re-encoding step
    double-escape them (``%41`` -> ``%2541``).

    Args:
        data: Input to decode; a ``str`` is encoded (UTF-8) first.

    Returns:
        The decoded bytes. Malformed or truncated escapes are copied through
        unchanged.
    """
    # This function is intentionally duplicated from `_unquote` for performance.
    # The duplication avoids extra branching for '+' handling in hot loop.
    if not data:
        return b""

    if isinstance(data, str):  # pragma: no cover
        data = data.encode()

    first_percent = data.find(b"%")
    first_plus = data.find(b"+")

    if first_percent < 0 and first_plus < 0:
        # Nothing to decode.
        return bytes(data)

    # Position of the first special character that is actually present.
    if first_percent < 0:
        first_special = first_plus
    elif first_plus < 0:
        first_special = first_percent
    else:
        first_special = min(first_percent, first_plus)

    hex_decode_table = _hex_decode_table()

    data_length = len(data)
    # stop at len - 2 because "%HH" decoding reads 2 extra bytes after '%'
    decode_limit = data_length - 2

    # The output can never be longer than the input.
    output = bytearray(data_length)
    output[:first_special] = data[:first_special]

    input_index = first_special
    output_index = first_special

    while input_index < decode_limit:
        current_byte = data[input_index]

        if current_byte == 43:  # ord('+')
            output[output_index] = 32  # ord(' ')
            input_index += 1
            output_index += 1
            continue

        if current_byte == 37:  # ord('%')
            high_nibble = hex_decode_table[data[input_index + 1]]
            low_nibble = hex_decode_table[data[input_index + 2]]

            # An invalid hex digit maps to 255, which forces the OR to 255.
            if (high_nibble | low_nibble) != 255:
                output[output_index] = (high_nibble << 4) | low_nibble
                input_index += 3
                output_index += 1
                continue

        output[output_index] = current_byte
        input_index += 1
        output_index += 1

    # Tail: too short for a complete escape, but '+' must still be translated.
    while input_index < data_length:
        current_byte = data[input_index]
        output[output_index] = 32 if current_byte == 43 else current_byte
        input_index += 1
        output_index += 1

    return bytes(output[:output_index])
339def _parse_qs(
340 qs: str | bytes,
341 keep_blank_values: bool = False,
342) -> dict[bytes, list[bytes]]:
343 """Reimplementation of urllib.parse.parse_qs which:
344 - Doesn't use _coerce_args or _coerce_result
345 - Works directly on bytes internally (no type coercion layer)
346 - Returns bytes keys/values only"""
347 if not qs: # pragma: no cover
348 return {}
350 if isinstance(qs, str): # pragma: no cover
351 qs = qs.encode()
353 result: dict[bytes, list[bytes]] = {}
355 for field in qs.split(b"&"):
356 if not field:
357 continue
359 key, sep, value = field.partition(b"=")
361 if not keep_blank_values and (not sep or not value):
362 continue
364 key = _unquote_plus(key)
365 value = _unquote_plus(value)
367 if key in result:
368 result[key].append(value)
369 else:
370 result[key] = [value]
372 return result
375def _parse_qsl(
376 qs: str | bytes,
377 keep_blank_values: bool = False,
378) -> list[tuple[bytes, bytes]]:
379 """Reimplementation of urllib.parse.parse_qsl which:
380 - Doesn't use _coerce_args or _coerce_result
381 - Works directly on bytes internally (no type coercion layer)
382 - Returns only bytes tuples"""
383 # This function is intentionally duplicated from `_parse_qs` for performance.
384 if not qs:
385 return []
387 if isinstance(qs, str):
388 qs = qs.encode()
390 result: list[tuple[bytes, bytes]] = []
392 for field in qs.split(b"&"):
393 if not field:
394 continue
396 key, sep, value = field.partition(b"=")
398 if not keep_blank_values and (not sep or not value):
399 continue
401 result.append((_unquote_plus(key), _unquote_plus(value)))
403 return result
def _urlencode(query: _QueryType) -> bytes:
    """Serialize *query* as ``name=value`` pairs joined by ``&``.

    *query* is either a mapping (its ``items()`` are used) or an iterable of
    two-item pairs. Names and values that are not ``bytes`` are converted
    with ``str(...).encode()``; both sides are then quoted with
    ``quote_plus=True``.
    """
    if hasattr(query, "items"):  # pragma: no cover
        query = query.items()  # type: ignore[assignment]

    if not query:  # pragma: no cover
        return b""

    encoded_pairs: list[bytes] = []

    for name, val in query:  # type: ignore[str-unpack]
        name_bytes = name if isinstance(name, bytes) else str(name).encode()
        val_bytes = val if isinstance(val, bytes) else str(val).encode()

        pair = bytearray()
        _quote_into(name_bytes, output=pair, quote_plus=True)
        pair += b"="
        _quote_into(val_bytes, output=pair, quote_plus=True)
        encoded_pairs.append(bytes(pair))

    return b"&".join(encoded_pairs)
def _urlparse(
    url: str,
    scheme: str = "",
    allow_fragments: bool = True,
) -> ParseResult:
    """Reimplementation of urllib.parse.urlparse but without _coerce_args/_coerce_result.

    Args:
        url: URL to parse.
        scheme: Default scheme, used when *url* does not carry one.
        allow_fragments: When false, ``#`` is not treated as a fragment
            delimiter.

    Returns:
        A ``urllib.parse.ParseResult``. ``params`` is only filled in for
        schemes listed in ``uses_params``, and only from the LAST path
        segment, as urllib does.

    Raises:
        ValueError: Propagated from `_urlsplit` for an invalid netloc or port.
    """
    if not url:  # pragma: no cover
        return ParseResult(scheme, "", "", "", "", "")

    scheme, netloc, url, query, fragment = _urlsplit(url, scheme, allow_fragments)
    params = ""

    if scheme in _USES_PARAMS:
        # Only a ";" after the final "/" starts the parameters, so
        # "/a;b/c" has none. `rfind` returns -1 when there is no "/",
        # which makes the search start at index 0.
        semi_idx = url.find(";", url.rfind("/") + 1)

        if semi_idx != -1:
            url, params = url[:semi_idx], url[semi_idx + 1 :]

    return ParseResult(scheme, netloc, url, params, query, fragment)
def _urlunparse(
    scheme: str,
    netloc: str,
    url: str,
    params: str,
    query: str,
    fragment: str,
) -> str:
    """Reimplementation of urllib.parse.urlunparse but without _coerce_args/_coerce_result.

    Non-empty *params* are re-attached to the path with ``;`` before the
    components are handed to `_urlunsplit`.
    """
    path = f"{url};{params}" if params else url
    return _urlunsplit(scheme, netloc, path, query, fragment)
def _urlunsplit(scheme: str, netloc: str, url: str, query: str, fragment: str) -> str:
    """Reimplementation of urllib.parse.urlunsplit but without _coerce_args/_coerce_result.

    Joins the five components back into one URL string, adding the ``//``,
    ``:``, ``?`` and ``#`` delimiters only where the matching component is
    present.
    """
    if netloc:
        # A relative path needs a "/" between it and the authority.
        separator = "/" if url and not url.startswith("/") else ""
        url = f"//{netloc}{separator}{url}"
    elif url.startswith("//") or (
        scheme and scheme in _USES_NETLOC and (not url or url.startswith("/"))
    ):
        # Empty authority: emit "//" explicitly so the path cannot be
        # mistaken for one.
        url = f"//{url}"

    scheme_part = f"{scheme}:" if scheme else scheme
    query_part = f"?{query}" if query else query
    fragment_part = f"#{fragment}" if fragment else fragment

    return f"{scheme_part}{url}{query_part}{fragment_part}"
498@dataclasses.dataclass(slots=True, eq=False, repr=False)
499class _SplitResult: # pylint: disable=too-many-instance-attributes
500 scheme: str
501 netloc: str
502 path: str
503 query: str
504 fragment: str
506 username: str | None = None
507 password: str | None = None
508 hostname: str | None = None
509 port: str | int | None = None
511 def __post_init__(self) -> None:
512 if self.hostname is not None:
513 hostname, delim, zone = self.hostname.partition("%")
514 self.hostname = f"{hostname.lower()}{delim}{zone}"
516 if self.port is not None:
517 try:
518 self.port = int(self.port)
519 except ValueError:
520 raise ValueError(
521 f"Port could not be cast to integer value as {self.port}"
522 ) from None
524 if self.port not in range(65535 + 1):
525 raise ValueError("Port out of range 0-65535")
527 def __iter__(self) -> Generator[str]:
528 yield self.scheme
529 yield self.netloc
530 yield self.path
531 yield self.query
532 yield self.fragment
534 def __len__(self) -> int:
535 return 5 # pragma: no cover
537 def __getitem__(self, index: int) -> str: # pragma: no cover
538 match index:
539 case 0:
540 return self.scheme
541 case 1:
542 return self.netloc
543 case 2:
544 return self.path
545 case 3:
546 return self.query
547 case 4:
548 return self.fragment
549 raise IndexError
def _checknetloc(netloc: str) -> None:
    """
    Reject a netloc in which NFKC normalization introduces URL delimiters.

    Empty and pure-ASCII netlocs are accepted without further checks.

    Raises:
        ValueError: If the normalized netloc contains one of ``/?#@:`` that
            was not already present as a delimiter.
    """
    if not netloc or netloc.isascii():
        return

    # The delimiters legitimately present are removed before normalizing, so
    # any delimiter found afterwards was produced by NFKC itself.
    stripped, folded = _nfkc_netloc(netloc)

    if stripped != folded and _NETLOC_DELIMS_RE.search(folded):
        raise ValueError(
            f"netloc {netloc!r} contains invalid characters under NFKC normalization"
        )
def _check_bracketed_netloc(netloc: str) -> None:
    """
    Check where the square brackets sit in a netloc, then validate the host.

    Raises:
        ValueError: If anything precedes ``[``, if ``]`` is followed by
            something other than ``:<port>``, or if the host is invalid.

    NOTE: this is basically a backport of https://github.com/python/cpython/issues/105704
    """
    host_and_port = netloc.rpartition("@")[2]

    prefix, bracket, remainder = host_and_port.partition("[")

    if not bracket:
        hostname = host_and_port.partition(":")[0]
    else:
        # The host must start with '['.
        if prefix:
            raise ValueError("Invalid IPv6 URL")

        hostname, _, suffix = remainder.partition("]")

        # After ']' only an optional ':<port>' is allowed; the port value is
        # validated later, in `_SplitResult.__post_init__`.
        if suffix and not suffix.startswith(":"):
            raise ValueError("Invalid IPv6 URL")

    _check_bracketed_host(hostname)
def _check_bracketed_host(hostname: str) -> None:
    """
    Validate the host found between square brackets in a netloc.

    A host starting with ``v``/``V`` must match the IPvFuture pattern; any
    other host must parse as an IPv6 address.

    Raises:
        ValueError: If the host is not a valid IPvFuture or IPv6 literal, or
            if it is an IPv4 address.
    """
    # IPvFuture: v<HEXDIG>.<address>
    if hostname.startswith(("v", "V")):
        if _IPV_FUTURE_RE.fullmatch(hostname):
            return
        raise ValueError("IPvFuture address is invalid")

    # ip_address() itself raises ValueError for anything that is not an IP.
    parsed = ipaddress.ip_address(hostname)

    # IPv4 literals are not allowed inside brackets.
    if isinstance(parsed, ipaddress.IPv4Address):
        raise ValueError("An IPv4 address cannot be in brackets")
@functools.lru_cache
def _urlsplit(  # pylint: disable=too-many-locals,too-many-statements
    url: str,
    scheme: str = "",
    allow_fragments: bool = True,
) -> _SplitResult:
    """Reimplementation of urllib.parse.urlsplit which:
    - Doesn't use _coerce_args or _coerce_result
    - Locates delimiters with C-level str.find / str.partition calls
    - Parses userinfo, host and port eagerly into the result

    Args:
        url: URL to split. Leading C0 control/space characters are ignored.
        scheme: Default scheme, used when *url* does not carry one.
        allow_fragments: When false, ``#`` is not treated as a fragment
            delimiter and stays in the path or query.

    Returns:
        A `_SplitResult`. The same instance is returned for repeated calls
        with the same arguments (the function is cached).

    Raises:
        ValueError: If the netloc has unbalanced or misplaced brackets, an
            invalid bracketed host, delimiters introduced by NFKC
            normalization, or an invalid port.
    """
    if not url:
        return _SplitResult(scheme, "", "", "", "")

    url, scheme = url.lstrip(_C0_CONTROL_OR_SPACE), scheme.strip(_C0_CONTROL_OR_SPACE)

    netloc = query = fragment = ""

    if m := _SCHEME_RE.match(url):
        scheme = m.group(1).lower()
        url = url[m.end() :]

    if url[:2] == "//":
        # The authority runs up to the first "/", "?" or "#" after "//".
        delim = len(url)
        for delimiter in "/?#":
            pos = url.find(delimiter, 2)
            if pos != -1 and pos < delim:
                delim = pos

        netloc, url = url[2:delim], url[delim:]

        # Only brackets inside the netloc matter: "[" or "]" in the path,
        # query or fragment must not trigger IPv6 validation.
        has_open_bracket = "[" in netloc
        if has_open_bracket != ("]" in netloc):
            raise ValueError("Invalid IPv6 URL")
        if has_open_bracket:
            _check_bracketed_netloc(netloc)

        _checknetloc(netloc)

    # Split the fragment off first so that a "?" inside it is not taken for
    # the query delimiter. Both delimiters are searched from index 0, so
    # URLs such as "?a=1" or "/#top" are handled as well.
    if allow_fragments:
        url, _, fragment = url.partition("#")
    url, _, query = url.partition("?")

    username = password = None
    userinfo, have_info, hostinfo = netloc.rpartition("@")

    if have_info:
        username, have_password, password = userinfo.partition(":")
        if not have_password:
            password = None

    _, have_open_br, bracketed = hostinfo.partition("[")
    if have_open_br:
        hostname, _, port = bracketed.partition("]")
        port = port.partition(":")[2]
    else:
        hostname, _, port = hostinfo.partition(":")

    return _SplitResult(
        scheme,
        netloc,
        url,
        query,
        fragment,
        username,
        password,
        hostname,
        port or None,
    )
def _url2pathname(url: str) -> str:
    """Reimplementation of urllib.request.url2pathname but with faster _unquote

    Args:
        url: Path component of a ``file:`` URL.

    Returns:
        The local filesystem path. Escapes of bytes in `_PATH_SAFE_CHARS`
        are left encoded; the rest is decoded with the filesystem encoding.

    Raises:
        OSError: On Windows, when the drive specification is malformed.
    """
    if not url:
        return ""

    # These branches are handled by `_urlparse`
    if url[:3] == "///":  # pragma: no cover
        url = url[2:]
    elif url[:12] == "//localhost/":  # pragma: no cover
        # Drop the "//localhost" authority but keep the leading "/".
        url = url[11:]

    if not _IS_WINDOWS:
        if "%" not in url:
            return url

        return _unquote(url, _PATH_SAFE_CHARS).decode(_FS_ENCODING, _FS_ERRORS)

    if url[:3] == "///":
        url = url[1:]
    url = url.replace(":", "|")
    if "|" not in url:
        # No drive specifier: just convert the slashes.
        return _unquote(url.replace("/", "\\").encode(), _PATH_SAFE_CHARS).decode(
            _FS_ENCODING, _FS_ERRORS
        )
    comp = url.split("|")
    # `not comp[0]` covers a URL that starts with the drive separator, which
    # would otherwise fail with IndexError instead of the documented OSError.
    if len(comp) != 2 or not comp[0] or comp[0][-1] not in string.ascii_letters:
        raise OSError(f"Bad URL: {url}")
    drive = comp[0][-1].upper()
    tail = _unquote(comp[1].replace("/", "\\"), _PATH_SAFE_CHARS).decode(
        _FS_ENCODING, _FS_ERRORS
    )
    return f"{drive}:{tail}"
@functools.lru_cache
def _idna(input_string: str) -> tuple[bytes, str]:
    """Return the IDNA form of *input_string* as ``(bytes, str)``, cached.

    ASCII input is returned as-is (encoded and original). Anything else is
    NFKC-normalized via `_nfkc_netloc` and then encoded with Python's
    built-in 'idna' codec.

    NOTE: IDNA processing in CPython is implemented in pure Python (not C),
          which makes it relatively slow and allocation-heavy, hence the
          cache and the ASCII shortcut.
    """
    if input_string.isascii():
        return input_string.encode(), input_string

    normalized = _nfkc_netloc(input_string)[1]

    as_bytes = normalized.encode("idna")
    return as_bytes, as_bytes.decode()
def _idna_bytes(input_string: str) -> bytes:
    """Return only the ``bytes`` half of `_idna` for *input_string*."""
    as_bytes, _ = _idna(input_string)
    return as_bytes
def _idna_str(input_string: str) -> str:
    """Return only the ``str`` half of `_idna` for *input_string*."""
    _, as_text = _idna(input_string)
    return as_text
@functools.lru_cache
def _nfkc_netloc(netloc: str) -> tuple[str, str]:
    """Return *netloc* without ``@:#?`` and the NFKC form of that string.

    Returns:
        A ``(cleaned, normalized)`` tuple; results are cached.
    """
    without_delims = netloc.translate(_NETLOC_STRIP_CHARS)
    return without_delims, unicodedata.normalize("NFKC", without_delims)