Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/w3lib/_url.py: 67%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

374 statements  

1from __future__ import annotations 

2 

3import dataclasses 

4import functools 

5import ipaddress 

6import os 

7import re 

8import string 

9import sys 

10import unicodedata 

11from typing import TYPE_CHECKING 

12from urllib.parse import ParseResult, scheme_chars, uses_netloc, uses_params 

13 

14from w3lib._infra import _ASCII_TAB_OR_NEWLINE, _C0_CONTROL_OR_SPACE 

15 

16if TYPE_CHECKING: 

17 from collections.abc import Generator 

18 from urllib.parse import _QueryType 

19 

# True when running on Windows; selects the drive-letter branch of
# `_url2pathname`.
_IS_WINDOWS = os.name == "nt"


# Encoding and error handler used by `_url2pathname` to turn unquoted path
# bytes back into `str`.
_FS_ENCODING = sys.getfilesystemencoding()
_FS_ERRORS = sys.getfilesystemencodeerrors()

# https://url.spec.whatwg.org/
# https://url.spec.whatwg.org/commit-snapshots/a46cb9188a48c2c9d80ba32a9b1891652d6b4900/#default-port
_DEFAULT_PORTS = {
    "ftp": 21,
    "file": None,
    "http": 80,
    "https": 443,
    "ws": 80,
    "wss": 443,
}
_SPECIAL_SCHEMES = set(_DEFAULT_PORTS.keys())

# constants from RFC 3986, Section 2.2 and 2.3
RFC3986_GEN_DELIMS = b":/?#[]@"
RFC3986_SUB_DELIMS = b"!$&'()*+,;="
RFC3986_RESERVED = RFC3986_GEN_DELIMS + RFC3986_SUB_DELIMS
RFC3986_UNRESERVED = (string.ascii_letters + string.digits + "-._~").encode("ascii")
EXTRA_SAFE_CHARS = b"|"  # see https://github.com/scrapy/w3lib/pull/25

RFC3986_USERINFO_SAFE_CHARS = RFC3986_UNRESERVED + RFC3986_SUB_DELIMS + b":"
_SAFE_CHARS = RFC3986_RESERVED + RFC3986_UNRESERVED + EXTRA_SAFE_CHARS + b"%"
# Same as `_SAFE_CHARS` minus "#".
_PATH_SAFE_CHARS = _SAFE_CHARS.replace(b"#", b"")
_PATH_SAFE_CHARS_STR = _PATH_SAFE_CHARS.decode()
# frozenset copies of the urllib.parse tables, for constant-time membership.
_USES_NETLOC = frozenset(uses_netloc)
_SCHEME_CHARS = frozenset(scheme_chars)
_USES_PARAMS = frozenset(uses_params)
# Translation table that deletes every character in `_ASCII_TAB_OR_NEWLINE`.
_ASCII_TAB_OR_NEWLINE_TRANSLATION_TABLE = str.maketrans("", "", _ASCII_TAB_OR_NEWLINE)
_C0_CONTROL_OR_SPACE_RE = re.compile(rf"[{_C0_CONTROL_OR_SPACE}]")
# `scheme_chars` ends with "+-."; interpolated raw into a character class that
# reads as the range "+" .. "." and would also accept ",". Escaping keeps the
# class limited to the characters actually listed in `scheme_chars`.
# NOTE(review): the "*" quantifier still accepts an empty scheme (":foo");
# urllib requires a non-empty scheme starting with a letter - confirm intent.
_SCHEME_RE = re.compile(rf"^([{re.escape(scheme_chars)}]*):")

# IPvFuture literal: "v", hex digits, ".", then at least one more character.
_IPV_FUTURE_RE = re.compile(r"\Av[a-fA-F0-9]+\..+\Z")
# Delimiters that must not appear in a netloc as a result of NFKC normalization.
_NETLOC_DELIMS_RE = re.compile(r"[/?#@:]")
# Deletes the delimiters that are legitimately present before normalization.
_NETLOC_STRIP_CHARS = str.maketrans("", "", "@:#?")

59 

60 

def _strip(input_string: str) -> str:
    """Trim leading/trailing C0-control-or-space characters and delete every
    character listed in ``_ASCII_TAB_OR_NEWLINE``.

    The input is returned unchanged when it is empty or contains none of the
    ``_C0_CONTROL_OR_SPACE`` characters, which skips the strip/translate work.
    """
    if not input_string or _C0_CONTROL_OR_SPACE_RE.search(input_string) is None:
        return input_string

    trimmed = input_string.strip(_C0_CONTROL_OR_SPACE)
    return trimmed.translate(_ASCII_TAB_OR_NEWLINE_TRANSLATION_TABLE)

71 

72 

73@functools.cache 

74def _hex_encode_table() -> bytes: 

75 """Build a lookup table for percent-encoded byte values. 

76 

77 | byte | encoding | 

78 |------|----------| 

79 | 0 | %00 | 

80 | 1 | %01 | 

81 | ... | ... | 

82 | 255 | %FF | 

83 

84 Each entry is exactly 3 bytes: b"%HH". 

85 

86 Returns: 

87 A bytes object of length 256 * 3 containing all percent encodings. 

88 """ 

89 return b"".join(f"%{i:02X}".encode() for i in range(256)) 

90 

91 

92@functools.cache 

93def _hex_decode_table() -> bytes: 

94 """Build a lookup table for decoding hex ASCII characters. 

95 

96 | ASCII | value | 

97 |--------|--------------| 

98 | '0'-'9'| 0-9 | 

99 | 'A'-'F'| 10-15 | 

100 | 'a'-'f'| 10-15 | 

101 | other | 255 (invalid)| 

102 

103 Returns: 

104 A bytes object of length 256 containing nibble values. 

105 """ 

106 table = bytearray([255]) * 256 

107 table[48:58] = bytes(range(10)) # '0'-'9' 

108 table[65:71] = bytes(range(10, 16)) # 'A'-'F' 

109 table[97:103] = bytes(range(10, 16)) # 'a'-'f' 

110 return bytes(table) 

111 

112 

@functools.cache
def _safe_table(safe: bytes = RFC3986_UNRESERVED) -> bytes:
    """Return a 256-entry 0/1 mask flagging the byte values present in *safe*.

    Entry ``i`` is 1 when byte value ``i`` occurs in *safe* and 0 otherwise.
    With no argument the mask covers ``RFC3986_UNRESERVED``.

    Returns:
        A bytes object of length 256 acting as a boolean mask (0/1).
    """
    members = frozenset(safe)
    return bytes(int(value in members) for value in range(256))

131 

132 

@functools.cache
def _quote_table(safe: bytes = b"", quote_plus: bool = False) -> tuple[bytes, ...]:
    """Precompute the quoted form of each of the 256 byte values.

    Decision table:
    | condition                     | output |
    |-------------------------------|--------|
    | byte in unreserved or *safe*  | as-is  |
    | byte == 32 and quote_plus     | "+"    |
    | otherwise                     | "%HH"  |

    Args:
        safe: Extra bytes to leave unescaped, on top of
            ``RFC3986_UNRESERVED``.
        quote_plus: Encode a space as ``b"+"`` instead of ``b"%20"``
            (only when the space is not itself in *safe*).

    Returns:
        A 256-entry tuple mapping byte value (index) -> encoded bytes.
    """
    hex_table = _hex_encode_table()
    allowed = _safe_table(RFC3986_UNRESERVED + safe) if safe else _safe_table()

    output: list[bytes] = []
    for byte in range(256):
        if allowed[byte]:
            # A one-item bytes object keeps the raw byte. chr(byte).encode()
            # would turn values >= 0x80 into a two-byte UTF-8 sequence,
            # corrupting any non-ASCII byte listed in `safe`.
            output.append(bytes((byte,)))
        elif quote_plus and byte == 32:  # ord(' ')
            output.append(b"+")
        else:
            # Each "%HH" entry occupies 3 bytes in the encode table.
            offset = byte * 3
            output.append(hex_table[offset : offset + 3])

    return tuple(output)

168 

169 

def _quote(data: bytes, safe: bytes = b"", quote_plus: bool = False) -> bytes:
    """Percent-encode *data* through the cached per-byte table.

    Args:
        data: Input bytes.
        safe: Additional bytes to leave unescaped.
        quote_plus: Encode space as '+' if True.

    Returns:
        Percent-encoded bytes (``b""`` for empty input).
    """
    if not data:  # pragma: no cover
        return b""

    encode_byte = _quote_table(safe, quote_plus).__getitem__
    return b"".join(map(encode_byte, data))

186 

187 

def _quote_into(
    data: bytes, output: bytearray, safe: bytes = b"", quote_plus: bool = False
) -> None:
    """Percent-encode *data* and append the result to *output* in place.

    Same encoding rules as `_quote`; nothing is appended for empty *data*.
    """
    if not data:  # pragma: no cover
        return

    encode_byte = _quote_table(safe, quote_plus).__getitem__
    output.extend(b"".join(map(encode_byte, data)))

196 

197 

def _unquote(
    data: bytes | bytearray | str,
    safe: bytes = b"",
) -> bytes:
    """Decode ``%HH`` escapes in *data*, leaving escapes of *safe* bytes alone.

    Args:
        data: Input to decode; a ``str`` is encoded to bytes first.
        safe: Byte values whose ``%HH`` escapes must stay percent-encoded.

    Returns:
        The decoded bytes. A "%" that is not followed by two hex digits is
        copied through unchanged.
    """
    if not data:
        return b""

    raw = data.encode() if isinstance(data, str) else data

    start = raw.find(b"%")
    if start < 0:
        # Nothing to decode.
        return bytes(raw)

    nibbles = _hex_decode_table()
    keep_encoded = _safe_table(safe)

    size = len(raw)
    # A full "%HH" needs two bytes after the "%", so the decoding loop stops
    # two bytes before the end; the remainder is copied verbatim below.
    limit = size - 2

    # Decoding never grows the data, so the input length is a safe capacity.
    result = bytearray(size)
    result[:start] = raw[:start]

    read = write = start

    while read < limit:
        byte = raw[read]

        if byte == 37:  # ord('%')
            high = nibbles[raw[read + 1]]
            low = nibbles[raw[read + 2]]

            # 255 marks a non-hex character in the decode table.
            if high != 255 and low != 255:
                value = high * 16 + low

                if not keep_encoded[value]:
                    result[write] = value
                    read += 3
                    write += 1
                    continue

        # Literal byte, malformed escape, or escape of a safe byte:
        # copy the current byte through and move on by one.
        result[write] = byte
        read += 1
        write += 1

    # Tail: at most two bytes, too short to hold a complete escape.
    remainder = raw[read:]
    result[write : write + len(remainder)] = remainder
    write += len(remainder)

    return bytes(result[:write])

263 

264 

def _unquote_plus(
    data: bytes | bytearray | str,
) -> bytes:
    """Decode ``%HH`` escapes and turn ``+`` into a space.

    Every well-formed escape is decoded, matching
    ``urllib.parse.unquote_plus``. The previous implementation consulted
    ``_safe_table()``, whose default marks the RFC 3986 unreserved characters
    as safe, so escapes such as ``%41`` were left encoded in query values.

    Args:
        data: Input to decode; a ``str`` is encoded to bytes first.

    Returns:
        The decoded bytes. A "%" that is not followed by two hex digits is
        copied through unchanged.
    """
    # This function is intentionally duplicated from `_unquote` for performance.
    # The duplication avoids extra branching for '+' handling in hot loop.
    if not data:
        return b""

    if isinstance(data, str):  # pragma: no cover
        data = data.encode()

    first_percent = data.find(b"%")
    first_plus = data.find(b"+")

    # Earliest of the two markers; when only one is present (the other is -1)
    # fall back to the one that was found.
    first_special = min(first_plus, first_percent)

    if first_special < 0:
        first_special = max(first_percent, first_plus)

    if first_special < 0:
        # Neither "%" nor "+": nothing to decode.
        return bytes(data)

    hex_decode_table = _hex_decode_table()

    data_length = len(data)
    # stop at len - 2 because "%HH" decoding reads 2 extra bytes after '%'
    decode_limit = data_length - 2

    # Decoding never grows the data, so the input length is a safe capacity.
    output = bytearray(data_length)
    output[:first_special] = data[:first_special]

    input_index = first_special
    output_index = first_special

    while input_index < decode_limit:
        current_byte = data[input_index]

        if current_byte == 43:  # ord('+')
            output[output_index] = 32  # ord(' ')
            input_index += 1
            output_index += 1
            continue

        if current_byte == 37:  # ord('%')
            high_nibble = hex_decode_table[data[input_index + 1]]
            low_nibble = hex_decode_table[data[input_index + 2]]

            # The table yields 255 for non-hex input; OR-ing the nibbles
            # gives 255 as soon as either one is invalid.
            if (high_nibble | low_nibble) != 255:
                output[output_index] = (high_nibble << 4) | low_nibble
                input_index += 3
                output_index += 1
                continue

        output[output_index] = current_byte
        input_index += 1
        output_index += 1

    while input_index < data_length:  # tail
        current_byte = data[input_index]

        if current_byte == 43:  # ord('+')
            output[output_index] = 32  # ord(' ')
        else:
            output[output_index] = current_byte

        input_index += 1
        output_index += 1

    return bytes(output[:output_index])

337 

338 

def _parse_qs(
    qs: str | bytes,
    keep_blank_values: bool = False,
) -> dict[bytes, list[bytes]]:
    """Bytes-only counterpart of urllib.parse.parse_qs.

    - No _coerce_args / _coerce_result layer
    - Operates on bytes throughout (a str input is encoded once up front)
    - Keys and values of the result are always bytes

    Fields without a value are dropped unless *keep_blank_values* is true.
    """
    if not qs:  # pragma: no cover
        return {}

    raw = qs.encode() if isinstance(qs, str) else qs  # pragma: no cover

    parsed: dict[bytes, list[bytes]] = {}

    for pair in raw.split(b"&"):
        if not pair:
            continue

        name, equals, content = pair.partition(b"=")

        if keep_blank_values or (equals and content):
            parsed.setdefault(_unquote_plus(name), []).append(_unquote_plus(content))

    return parsed

373 

374 

def _parse_qsl(
    qs: str | bytes,
    keep_blank_values: bool = False,
) -> list[tuple[bytes, bytes]]:
    """Bytes-only counterpart of urllib.parse.parse_qsl.

    - No _coerce_args / _coerce_result layer
    - Operates on bytes throughout (a str input is encoded once up front)
    - Result items are always (bytes, bytes) tuples

    Fields without a value are dropped unless *keep_blank_values* is true.
    """
    # Kept separate from `_parse_qs` on purpose (performance).
    if not qs:
        return []

    raw = qs.encode() if isinstance(qs, str) else qs

    split_fields = (chunk.partition(b"=") for chunk in raw.split(b"&") if chunk)

    return [
        (_unquote_plus(name), _unquote_plus(content))
        for name, equals, content in split_fields
        if keep_blank_values or (equals and content)
    ]

404 

405 

def _urlencode(query: _QueryType) -> bytes:
    """Serialize key/value pairs as ``key=value`` fields joined with ``&``.

    Keys and values that are not already bytes are passed through ``str()``
    and encoded; both sides are quoted with ``quote_plus`` rules. A mapping
    (anything with ``items``) is iterated through its items.
    """
    if hasattr(query, "items"):  # pragma: no cover
        query = query.items()  # type: ignore[assignment]

    if not query:  # pragma: no cover
        return b""

    fields: list[bytes] = []

    for key, value in query:  # type: ignore[str-unpack]
        raw_key = key if isinstance(key, bytes) else str(key).encode()
        raw_value = value if isinstance(value, bytes) else str(value).encode()

        field = bytearray()
        _quote_into(raw_key, output=field, quote_plus=True)
        field += b"="
        _quote_into(raw_value, output=field, quote_plus=True)
        fields.append(bytes(field))

    return b"&".join(fields)

432 

433 

def _urlparse(
    url: str,
    scheme: str = "",
    allow_fragments: bool = True,
) -> ParseResult:
    """Reimplementation of urllib.parse.urlparse but without _coerce_args/_coerce_result.

    Args:
        url: URL to parse.
        scheme: Default scheme, forwarded to `_urlsplit`.
        allow_fragments: Forwarded to `_urlsplit`.

    Returns:
        A ``ParseResult``. For schemes in ``_USES_PARAMS`` the params are
        taken from the last path segment only, as urllib does: a ";" that
        appears before the final "/" stays part of the path.
    """
    if not url:  # pragma: no cover
        return ParseResult(scheme, "", "", "", "", "")

    scheme, netloc, url, query, fragment = _urlsplit(url, scheme, allow_fragments)
    params = ""

    if scheme in _USES_PARAMS and ";" in url:
        # Search only after the last "/" (rfind gives -1 when there is none,
        # so the search then starts at index 0). The previous code split at
        # the first ";" of the whole path, turning "/a;b/c" into path "/a".
        semi_idx = url.find(";", url.rfind("/") + 1)

        if semi_idx != -1:
            url, params = url[:semi_idx], url[semi_idx + 1 :]

    return ParseResult(scheme, netloc, url, params, query, fragment)

458 

459 

def _urlunparse(
    scheme: str,
    netloc: str,
    url: str,
    params: str,
    query: str,
    fragment: str,
) -> str:
    """Counterpart of urllib.parse.urlunparse without _coerce_args/_coerce_result.

    Non-empty *params* are re-attached to the path with ";" before the
    components are handed to `_urlunsplit`.
    """
    path = f"{url};{params}" if params else url
    return _urlunsplit(scheme, netloc, path, query, fragment)

472 

473 

474def _urlunsplit(scheme: str, netloc: str, url: str, query: str, fragment: str) -> str: 

475 """Reimplementation of urlib.parse.urlunsplit but without _coerce_args/_coerce_result.""" 

476 

477 if netloc: 

478 if url and url[:1] != "/": 

479 url = f"/{url}" 

480 url = f"//{netloc}{url}" 

481 elif url[:2] == "//" or ( 

482 scheme and scheme in _USES_NETLOC and (not url or url[:1] == "/") 

483 ): 

484 url = f"//{url}" 

485 

486 if scheme: 

487 scheme = f"{scheme}:" 

488 

489 if query: 

490 query = f"?{query}" 

491 

492 if fragment: 

493 fragment = f"#{fragment}" 

494 

495 return f"{scheme}{url}{query}{fragment}" 

496 

497 

498@dataclasses.dataclass(slots=True, eq=False, repr=False) 

499class _SplitResult: # pylint: disable=too-many-instance-attributes 

500 scheme: str 

501 netloc: str 

502 path: str 

503 query: str 

504 fragment: str 

505 

506 username: str | None = None 

507 password: str | None = None 

508 hostname: str | None = None 

509 port: str | int | None = None 

510 

511 def __post_init__(self) -> None: 

512 if self.hostname is not None: 

513 hostname, delim, zone = self.hostname.partition("%") 

514 self.hostname = f"{hostname.lower()}{delim}{zone}" 

515 

516 if self.port is not None: 

517 try: 

518 self.port = int(self.port) 

519 except ValueError: 

520 raise ValueError( 

521 f"Port could not be cast to integer value as {self.port}" 

522 ) from None 

523 

524 if self.port not in range(65535 + 1): 

525 raise ValueError("Port out of range 0-65535") 

526 

527 def __iter__(self) -> Generator[str]: 

528 yield self.scheme 

529 yield self.netloc 

530 yield self.path 

531 yield self.query 

532 yield self.fragment 

533 

534 def __len__(self) -> int: 

535 return 5 # pragma: no cover 

536 

537 def __getitem__(self, index: int) -> str: # pragma: no cover 

538 match index: 

539 case 0: 

540 return self.scheme 

541 case 1: 

542 return self.netloc 

543 case 2: 

544 return self.path 

545 case 3: 

546 return self.query 

547 case 4: 

548 return self.fragment 

549 raise IndexError 

550 

551 

552def _checknetloc(netloc: str) -> None: 

553 """ 

554 Validate that NFKC normalization does not introduce reserved URL characters. 

555 

556 Raises: 

557 ValueError: If normalization introduces reserved delimiters. 

558 """ 

559 if not netloc or netloc.isascii(): 

560 return 

561 

562 # IDNA uses NFKC equivalence. Remove already-valid delimiters before 

563 # normalization so we only detect newly introduced ones. 

564 cleaned, normalized = _nfkc_netloc(netloc) 

565 

566 if cleaned == normalized: 

567 return 

568 

569 if _NETLOC_DELIMS_RE.search(normalized): 

570 raise ValueError( 

571 f"netloc {netloc!r} contains invalid characters under NFKC normalization" 

572 ) 

573 

574 

575def _check_bracketed_netloc(netloc: str) -> None: 

576 """ 

577 Validate bracket usage in a URL netloc. 

578 

579 Raises: 

580 ValueError: If bracket placement or host syntax is invalid. 

581 

582 NOTE: this is basically a backport of https://github.com/python/cpython/issues/105704 

583 """ 

584 hostname_and_port = netloc.rpartition("@")[2] 

585 

586 before_bracket, has_open_bracket, bracketed = hostname_and_port.partition("[") 

587 

588 if has_open_bracket: 

589 # No data is allowed before '['. 

590 if before_bracket: 

591 raise ValueError("Invalid IPv6 URL") 

592 

593 hostname, _, port = bracketed.partition("]") 

594 

595 # Only ':<port>' may follow ']'. 

596 if port and not port.startswith(":"): 

597 raise ValueError("Invalid IPv6 URL") 

598 # port validation done after, in `_SplitResult.__post_init__` 

599 else: 

600 hostname, _, _ = hostname_and_port.partition(":") 

601 

602 _check_bracketed_host(hostname) 

603 

604 

605def _check_bracketed_host(hostname: str) -> None: 

606 """ 

607 Validate a bracketed host according to RFC 3986 / WHATWG URL rules. 

608 

609 Raises: 

610 ValueError: If the host is invalid. 

611 """ 

612 # IPvFuture: v<HEXDIG>.<address> 

613 if hostname.startswith(("v", "V")): 

614 if not _IPV_FUTURE_RE.fullmatch(hostname): 

615 raise ValueError("IPvFuture address is invalid") 

616 return 

617 

618 # ip_address() raises ValueError if invalid. 

619 ip = ipaddress.ip_address(hostname) 

620 

621 # Bracketed IPv4 literals are forbidden. 

622 if isinstance(ip, ipaddress.IPv4Address): 

623 raise ValueError("An IPv4 address cannot be in brackets") 

624 

625 

@functools.lru_cache
def _urlsplit(  # pylint: disable=too-many-locals,too-many-statements
    url: str,
    scheme: str = "",
    allow_fragments: bool = True,
) -> _SplitResult:
    """Reimplementation of urllib.parse.urlsplit which:
    - Doesn't use _coerce_args or _coerce_result
    - Does manual single-pass scanning instead of repeated .find/.split calls
    - Have reduced string allocations by slicing once using computed indices
    - Avoids extra computations as much as possible

    Args:
        url: URL to split; leading C0-control/space characters are ignored.
        scheme: Default scheme, used when *url* does not carry one.
        allow_fragments: When false, "#" is not treated as a fragment
            delimiter and stays in the path or query.

    Returns:
        A `_SplitResult`. Results are cached by ``functools.lru_cache``, so
        the same object is handed to every caller with equal arguments and
        must not be mutated.

    Raises:
        ValueError: For unbalanced or misplaced brackets in the netloc, an
            invalid bracketed host, a netloc that NFKC-normalizes into
            delimiters, or an invalid port.
    """
    if not url:
        return _SplitResult(scheme, "", "", "", "")

    url, scheme = url.lstrip(_C0_CONTROL_OR_SPACE), scheme.strip(_C0_CONTROL_OR_SPACE)

    netloc = query = fragment = ""

    if m := _SCHEME_RE.match(url):
        scheme = m.group(1).lower()
        url = url[m.end() :]

    has_authority = url[:2] == "//"
    # Skip the leading "//" only when it is actually there. Without it, a
    # "?" or "#" in the first two characters ("/?q=1", "?q", "a#f") must be
    # seen by the scan as well.
    scan_start = 2 if has_authority else 0

    # Positions of the first "/", "?" and "#" at or after `scan_start`.
    slash_pos = question_pos = hash_pos = -1
    for idx, char in enumerate(url[scan_start:], scan_start):
        if char == "/":
            if slash_pos == -1:
                slash_pos = idx
        elif char == "?":
            if question_pos == -1:
                question_pos = idx
        elif char == "#":
            if hash_pos == -1:
                hash_pos = idx
        else:
            continue
        # Stop as soon as all three delimiters have been located.
        if slash_pos != -1 and question_pos != -1 and hash_pos != -1:
            break

    if has_authority:
        # The netloc ends at the earliest of "/", "?" and "#".
        delim = len(url)
        for pos in (slash_pos, question_pos, hash_pos):
            if pos != -1 and pos < delim:
                delim = pos

        netloc = url[2:delim]

        # Only brackets inside the netloc matter here; "[" / "]" in the path
        # or query (e.g. "?a[]=1") must not trigger IPv6 validation.
        has_open_br = "[" in netloc
        if has_open_br != ("]" in netloc):
            raise ValueError("Invalid IPv6 URL")
        if has_open_br:
            _check_bracketed_netloc(netloc)

        url = url[delim:]

        # Re-base the remaining positions onto the shortened string.
        if question_pos != -1:
            question_pos -= delim
        if hash_pos != -1:
            hash_pos -= delim
        _checknetloc(netloc)

    if allow_fragments and hash_pos != -1:
        url, fragment = url[:hash_pos], url[hash_pos + 1 :]
        if question_pos > hash_pos:
            # The first "?" sits inside the fragment: there is no query.
            question_pos = -1

    if question_pos != -1:
        url, query = url[:question_pos], url[question_pos + 1 :]

    username = password = None
    userinfo, have_info, hostinfo = netloc.rpartition("@")

    if have_info:
        username, have_password, password = userinfo.partition(":")
        if not have_password:
            password = None

    _, have_open_br, bracketed = hostinfo.partition("[")
    if have_open_br:
        # "[host]:port" - the port is whatever follows "]:".
        hostname, _, port = bracketed.partition("]")
        port = port.partition(":")[2]
    else:
        hostname, _, port = hostinfo.partition(":")

    return _SplitResult(
        scheme,
        netloc,
        url,
        query,
        fragment,
        username,
        password,
        hostname,
        port or None,
    )

718 

719 

def _url2pathname(url: str) -> str:
    """Reimplementation of urllib.request.url2pathname but with faster _unquote

    Args:
        url: Path component of a file URL.

    Returns:
        The local path. Escapes are decoded by `_unquote` with
        ``_PATH_SAFE_CHARS`` as the safe set and the bytes are decoded with
        the filesystem encoding and error handler.

    Raises:
        OSError: On Windows, when the drive specifier is malformed.
    """
    if not url:
        return ""

    # These branches are handled by `_urlparse`
    if url[:3] == "///":  # pragma: no cover
        url = url[2:]
    # Prefix test: the original compared url[12:] (everything AFTER the first
    # 12 characters), so a "//localhost/" authority was never stripped.
    elif url[:12] == "//localhost/":  # pragma: no cover
        url = url[11:]

    if not _IS_WINDOWS:
        if "%" not in url:
            return url

        return _unquote(url, _PATH_SAFE_CHARS).decode(_FS_ENCODING, _FS_ERRORS)

    if url[:3] == "///":
        url = url[1:]
    # Treat ":" like the "|" drive separator used in URLs.
    url = url.replace(":", "|")
    if "|" not in url:
        # No drive specifier, just convert slashes.
        return _unquote(url.replace("/", "\\").encode(), _PATH_SAFE_CHARS).decode(
            _FS_ENCODING, _FS_ERRORS
        )
    comp = url.split("|")
    # An empty drive part (URL starting with ":" or "|") used to raise
    # IndexError on comp[0][-1]; report it as a bad URL instead.
    if len(comp) != 2 or not comp[0] or comp[0][-1] not in string.ascii_letters:
        raise OSError(f"Bad URL: {url}")
    drive = comp[0][-1].upper()
    tail = _unquote(comp[1].replace("/", "\\"), _PATH_SAFE_CHARS).decode(
        _FS_ENCODING, _FS_ERRORS
    )
    return f"{drive}:{tail}"

752 

753 

754@functools.lru_cache 

755def _idna(input_string: str) -> tuple[bytes, str]: 

756 """Cached IDNA encoding using Python's built-in 'idna' codec. 

757 

758 NOTE: IDNA processing in CPython is implemented in pure Python (not C), 

759 which makes it relatively slow and allocation-heavy. The only 

760 lower-level optimisation involved is Unicode normalization 

761 (NFKC), which may use optimized internal paths, but IDNA itself 

762 remains Python-level logic. 

763 """ 

764 if input_string.isascii(): 

765 return input_string.encode(), input_string 

766 

767 _, normalized = _nfkc_netloc(input_string) 

768 

769 encoded = normalized.encode("idna") 

770 return encoded, encoded.decode() 

771 

772 

def _idna_bytes(input_string: str) -> bytes:
    """Return the bytes half of `_idna`'s result for *input_string*."""
    encoded, _ = _idna(input_string)
    return encoded

775 

776 

def _idna_str(input_string: str) -> str:
    """Return the str half of `_idna`'s result for *input_string*."""
    _, text = _idna(input_string)
    return text

779 

780 

@functools.lru_cache
def _nfkc_netloc(netloc: str) -> tuple[str, str]:
    """Return *netloc* without "@", ":", "#" and "?", before and after NFKC.

    Returns:
        ``(cleaned, normalized)`` where ``cleaned`` is *netloc* with those
        four characters deleted and ``normalized`` is its NFKC form.
    """
    stripped = netloc.translate(_NETLOC_STRIP_CHARS)
    return stripped, unicodedata.normalize("NFKC", stripped)