Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/requests/utils.py: 17%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

509 statements  

1""" 

2requests.utils 

3~~~~~~~~~~~~~~ 

4 

5This module provides utility functions that are used within Requests 

6that are also useful for external consumption. 

7""" 

8 

9from __future__ import annotations 

10 

11import codecs 

12import contextlib 

13import io 

14import os 

15import re 

16import socket 

17import struct 

18import sys 

19import tempfile 

20import warnings 

21import zipfile 

22from collections import OrderedDict 

23from collections.abc import Generator, Iterable 

24from typing import ( 

25 TYPE_CHECKING, 

26 Any, 

27 Final, 

28 TypeVar, 

29 cast, 

30 overload, 

31) 

32 

33from urllib3.util import make_headers, parse_url 

34 

35from . import certs 

36from .__version__ import __version__ 

37 

38# to_native_string is unused here, but imported here for backwards compatibility 

39from ._internal_utils import ( # noqa: F401 

40 _HEADER_VALIDATORS_BYTE, # type: ignore[reportPrivateUsage] 

41 _HEADER_VALIDATORS_STR, # type: ignore[reportPrivateUsage] 

42 HEADER_VALIDATORS, # type: ignore[reportUnusedImport] 

43 to_native_string, # type: ignore[reportUnusedImport] 

44) 

45from ._types import SupportsItems as _SupportsItems 

46from .compat import ( 

47 Mapping, 

48 bytes, 

49 getproxies, 

50 getproxies_environment, 

51 integer_types, 

52 is_urllib3_1, 

53 proxy_bypass, 

54 proxy_bypass_environment, # type: ignore[attr-defined] # https://github.com/python/cpython/issues/145331 

55 quote, 

56 str, 

57 unquote, 

58 urlparse, 

59 urlunparse, 

60) 

61from .compat import parse_http_list as _parse_list_header 

62from .cookies import cookiejar_from_dict 

63from .exceptions import ( 

64 FileModeWarning, 

65 InvalidHeader, 

66 InvalidURL, 

67 UnrewindableBodyError, 

68) 

69from .structures import CaseInsensitiveDict 

70 

71if TYPE_CHECKING: 

72 from http.cookiejar import CookieJar 

73 from io import BufferedWriter 

74 

75 from . import _types as _t 

76 from .models import PreparedRequest, Request, Response 

77 

78NETRC_FILES: Final = (".netrc", "_netrc") 

79 

80 

81# Certificate is extracted by certifi when needed. 

82DEFAULT_CA_BUNDLE_PATH: str = certs.where() 

83 

84 

85DEFAULT_PORTS: Final = {"http": 80, "https": 443} 

86 

87_KT = TypeVar("_KT") 

88_VT = TypeVar("_VT") 

89 

90# Ensure that ', ' is used to preserve previous delimiter behavior. 

91DEFAULT_ACCEPT_ENCODING: Final = ", ".join( 

92 re.split(r",\s*", make_headers(accept_encoding=True)["accept-encoding"]) 

93) 

94 

95 

if sys.platform == "win32":
    # provide a proxy_bypass version on Windows without DNS lookups

    def proxy_bypass_registry(host: str) -> bool:
        """Check ``host`` against the ProxyOverride list in the Windows registry.

        Reads ``ProxyEnable`` and ``ProxyOverride`` from the current user's
        "Internet Settings" key. Each ``;``-separated override entry is turned
        from a glob into a regular expression and matched against ``host``
        case-insensitively.

        :param host: host name to test.
        :return: True if the proxy is enabled and ``host`` matches an override
            entry; False otherwise, including when the registry is unavailable
            or unreadable.
        :rtype: bool
        """
        try:
            import winreg
        except ImportError:
            return False

        try:
            # The ``with`` block releases the registry handle on every path,
            # including when one of the value queries raises.
            with winreg.OpenKey(
                winreg.HKEY_CURRENT_USER,
                r"Software\Microsoft\Windows\CurrentVersion\Internet Settings",
            ) as internet_settings:
                # ProxyEnable could be REG_SZ or REG_DWORD, normalizing it
                proxy_enable = int(
                    winreg.QueryValueEx(internet_settings, "ProxyEnable")[0]
                )
                # ProxyOverride is almost always a string
                proxy_override = winreg.QueryValueEx(
                    internet_settings, "ProxyOverride"
                )[0]
        except (OSError, ValueError):
            return False
        if not proxy_enable or not proxy_override:
            return False

        # Build the list of patterns from the registry entry. Empty entries are
        # dropped because an empty pattern would match every host.
        for test in filter(None, proxy_override.split(";")):
            if test == "<local>":
                # '<local>' stands for any host name without a dot.
                if "." not in host:
                    return True
            test = test.replace(".", r"\.")  # mask dots
            test = test.replace("*", r".*")  # change glob sequence
            test = test.replace("?", r".")  # change glob char
            if re.match(test, host, re.I):
                return True
        return False

    def proxy_bypass(host: str) -> bool:  # noqa
        """Return True, if the host should be bypassed.

        Checks proxy settings gathered from the environment, if specified,
        or the registry.

        :param host: host name to test.
        :rtype: bool
        """
        if getproxies_environment():
            return proxy_bypass_environment(host)
        return proxy_bypass_registry(host)

147 

148 

def dict_to_sequence(
    d: _t.SupportsItems[Any, Any] | Iterable[tuple[Any, Any]],
) -> Iterable[tuple[Any, Any]]:
    """Return ``d.items()`` when ``d`` is a ``SupportsItems`` instance.

    Any other object is handed back unchanged.
    """
    return d.items() if isinstance(d, _SupportsItems) else d

158 

159 

def super_len(o: Any) -> int:
    """Return how many bytes/items of ``o`` remain after its current position.

    The total size is taken from the first source that applies: ``len(o)``,
    an ``o.len`` attribute, ``os.fstat`` on ``o.fileno()``, or seeking to the
    end of ``o``. The current ``tell()`` position (when available) is then
    subtracted and the result is clamped at zero.

    :param o: object to measure (string, sized object or file-like object).
    :rtype: int
    """
    size = None
    offset = 0

    if not is_urllib3_1 and isinstance(o, str):
        # urllib3 2.x+ treats all strings as utf-8 instead
        # of latin-1 (iso-8859-1) like http.client.
        o = o.encode("utf-8")

    if hasattr(o, "__len__"):
        size = len(o)

    elif hasattr(o, "len"):
        size = o.len

    elif hasattr(o, "fileno"):
        try:
            descriptor = o.fileno()
        except (io.UnsupportedOperation, AttributeError):
            # AttributeError is unexpected right after hasattr(o, 'fileno'),
            # but objects from `Tarfile.extractfile()` raise it (issue 5229).
            pass
        else:
            size = os.fstat(descriptor).st_size

            # The fstat size is a byte count, so it is only trustworthy when
            # the file was opened in binary mode.
            if "b" not in o.mode:
                text_mode_notice = (
                    "Requests has determined the content-length for this "
                    "request using the binary size of the file: however, the "
                    "file has been opened in text mode (i.e. without the 'b' "
                    "flag in the mode). This may lead to an incorrect "
                    "content-length. In Requests 3.0, support will be removed "
                    "for files in text mode."
                )
                warnings.warn(text_mode_notice, FileModeWarning)

    if hasattr(o, "tell"):
        try:
            offset = o.tell()
        except OSError:
            # Happens for special descriptors such as stdin. Treat the object
            # as fully consumed so the reported length becomes zero and
            # requests chunks the body instead.
            if size is not None:
                offset = size
        else:
            if hasattr(o, "seek") and size is None:
                # StringIO and BytesIO have seek but no usable fileno
                try:
                    # Measure by jumping to the end ...
                    o.seek(0, os.SEEK_END)
                    size = o.tell()

                    # ... then restore the position so partially read
                    # file-like objects keep working.
                    o.seek(offset or 0)
                except OSError:
                    size = 0

    if size is None:
        size = 0

    return max(0, size - offset)

229 

230 

def get_netrc_auth(
    url: _t.UriType, raise_errors: bool = False
) -> tuple[str, str] | None:
    """Look up ``(login, password)`` for ``url``'s host in a netrc file.

    The file named by the ``NETRC`` environment variable is used when set;
    otherwise each name in ``NETRC_FILES`` is tried under the home directory.

    :param url: URL (str or UTF-8 bytes) whose host is looked up.
    :param raise_errors: re-raise netrc parse/OS errors instead of ignoring them.
    :return: the credentials tuple, or None when nothing usable is found.
    """
    if isinstance(url, bytes):
        url = url.decode("utf-8")

    env_override = os.environ.get("NETRC")
    if env_override is None:
        netrc_locations = (f"~/{f}" for f in NETRC_FILES)
    else:
        netrc_locations = (env_override,)

    try:
        from netrc import NetrcParseError, netrc

        # First candidate that exists on disk, if any.
        expanded = (os.path.expanduser(candidate) for candidate in netrc_locations)
        netrc_path = next((loc for loc in expanded if os.path.exists(loc)), None)

        # Abort early if there isn't one.
        if netrc_path is None:
            return None

        host = urlparse(url).hostname
        if host is None:
            return None

        try:
            entry = netrc(netrc_path).authenticators(host)
            if entry and any(entry):
                # Use the login field, falling back to the account field
                # when the login is empty.
                login_index = 0 if entry[0] else 1
                return (entry[login_index] or "", entry[2] or "")
        except (NetrcParseError, OSError):
            # Parsing or permission problems skip netrc auth unless the
            # caller explicitly asked for errors to surface.
            if raise_errors:
                raise

    # App Engine hackiness.
    except (ImportError, AttributeError):
        pass

    return None

281 

282 

def guess_filename(obj: Any) -> str | None:
    """Return the base name of ``obj.name``, or None when it is unusable.

    A name is unusable when it is missing, empty, not a str/bytes value, or
    when it starts with ``<`` or ends with ``>``.
    """
    candidate = getattr(obj, "name", None)
    if not candidate or not isinstance(candidate, (str, bytes)):
        return None
    if candidate[0] == "<" or candidate[-1] == ">":
        return None
    return os.path.basename(candidate)  # type: ignore[return-value]  # urllib3 accepts bytes but types str only

288 

289 

def extract_zipped_paths(path: str) -> str:
    """Replace nonexistent paths that look like they refer to a member of a zip
    archive with the location of an extracted copy of the target, or else
    just return the provided path unchanged.

    :param path: filesystem path, possibly pointing inside a zip archive.
    :return: ``path`` itself, or the path of a temporary file holding the
        extracted member.
    :rtype: str
    """
    if os.path.exists(path):
        # this is already a valid path, no need to do anything further
        return path

    # find the first valid part of the provided path and treat that as a zip archive
    # assume the rest of the path is the name of a member in the archive
    archive, member = os.path.split(path)
    while archive and not os.path.exists(archive):
        archive, prefix = os.path.split(archive)
        if not prefix:
            # An empty prefix means the split made no progress; stop here
            # to avoid looping forever on such paths.
            break
        member = "/".join([prefix, member])

    if not zipfile.is_zipfile(archive):
        return path

    # Read the member inside a ``with`` block so the archive handle is always
    # closed, and before creating the temp file so a failed read leaves no
    # stray file behind.
    with zipfile.ZipFile(archive) as zip_file:
        if member not in zip_file.namelist():
            return path
        payload = zip_file.read(member)

    # we have a valid zip archive and a valid member of that archive
    suffix = os.path.splitext(member.split("/")[-1])[-1]
    fd, extracted_path = tempfile.mkstemp(suffix=suffix)
    # A buffered file object writes the whole payload (a bare os.write may
    # write fewer bytes than requested) and closes the descriptor on exit.
    with os.fdopen(fd, "wb") as extracted:
        extracted.write(payload)

    return extracted_path

326 

327 

@contextlib.contextmanager
def atomic_open(filename: str) -> Generator[BufferedWriter, None, None]:
    """Yield a binary writer whose contents replace ``filename`` on success.

    Data is written to a temporary file created in ``filename``'s directory.
    The temporary file is moved over ``filename`` when the block exits
    normally, and removed (with the error re-raised) otherwise.
    """
    target_dir = os.path.dirname(filename)
    scratch_fd, scratch_path = tempfile.mkstemp(dir=target_dir)
    try:
        with os.fdopen(scratch_fd, "wb") as scratch_file:
            yield scratch_file
        os.replace(scratch_path, filename)
    except BaseException:
        os.remove(scratch_path)
        raise

339 

340 

def from_key_val_list(
    value: Mapping[Any, Any] | Iterable[tuple[Any, Any]] | None,
) -> dict[Any, Any] | None:
    """Build an OrderedDict from a mapping or an iterable of 2-tuples.

    ``None`` is passed through unchanged; str, bytes, bool and int values
    are rejected, e.g.,

    ::

        >>> from_key_val_list([('key', 'val')])
        OrderedDict([('key', 'val')])
        >>> from_key_val_list('string')
        Traceback (most recent call last):
        ...
        ValueError: cannot encode objects that are not 2-tuples
        >>> from_key_val_list({'key': 'val'})
        OrderedDict([('key', 'val')])

    :rtype: OrderedDict
    """
    if value is None:
        return None

    rejected_types = (str, bytes, bool, int)
    if not isinstance(value, rejected_types):
        return OrderedDict(value)

    raise ValueError("cannot encode objects that are not 2-tuples")

368 

369 

@overload
def to_key_val_list(value: None) -> None: ...
@overload
def to_key_val_list(
    value: _t.SupportsItems[_KT, _VT] | Iterable[tuple[_KT, _VT]],
) -> list[tuple[_KT, _VT]]: ...
def to_key_val_list(
    value: _t.SupportsItems[_KT, _VT] | Iterable[tuple[_KT, _VT]] | None,
) -> list[tuple[_KT, _VT]] | None:
    """Return the key/value pairs of ``value`` as a list of tuples.

    ``None`` is passed through unchanged; str, bytes, bool and int values
    are rejected, e.g.,

    ::

        >>> to_key_val_list([('key', 'val')])
        [('key', 'val')]
        >>> to_key_val_list({'key': 'val'})
        [('key', 'val')]
        >>> to_key_val_list('string')
        Traceback (most recent call last):
        ...
        ValueError: cannot encode objects that are not 2-tuples

    :rtype: list
    """
    if value is None:
        return None

    if isinstance(value, (str, bytes, bool, int)):
        raise ValueError("cannot encode objects that are not 2-tuples")

    # Objects exposing items() contribute their items; anything else is
    # assumed to already be an iterable of pairs.
    pairs = value.items() if isinstance(value, _SupportsItems) else value
    return list(pairs)

405 

406 

407# From mitsuhiko/werkzeug (used with permission). 

def parse_list_header(value: str) -> list[str]:
    """Parse lists as described by RFC 2068 Section 2.

    The header is split on commas, where an element may be a quoted-string
    that itself contains a comma. Elements wrapped in double quotes are
    returned unquoted; duplicates and letter case are kept as they are.

    >>> parse_list_header('token, "quoted value"')
    ['token', 'quoted value']

    :param value: a string with a list header.
    :return: :class:`list`
    :rtype: list
    """
    return [
        unquote_header_value(element[1:-1])
        if element[:1] == element[-1:] == '"'
        else element
        for element in _parse_list_header(value)
    ]

437 

438 

439# From mitsuhiko/werkzeug (used with permission). 

def parse_dict_header(value: str) -> dict[str, str | None]:
    """Parse lists of key, value pairs as described by RFC 2068 Section 2 and
    convert them into a python dict:

    >>> d = parse_dict_header('foo="is a fish", bar="as well"')
    >>> type(d) is dict
    True
    >>> sorted(d.items())
    [('bar', 'as well'), ('foo', 'is a fish')]

    A key that has no ``=`` maps to `None`:

    >>> parse_dict_header('key_without_value')
    {'key_without_value': None}

    :param value: a string with a dict header.
    :return: :class:`dict`
    :rtype: dict
    """
    parsed: dict[str, str | None] = {}
    for element in _parse_list_header(value):
        key, separator, raw = element.partition("=")
        if not separator:
            # No '=' at all: the whole element is a bare key.
            parsed[element] = None
            continue
        if raw[:1] == raw[-1:] == '"':
            raw = unquote_header_value(raw[1:-1])
        parsed[key] = raw
    return parsed

472 

473 

474# From mitsuhiko/werkzeug (used with permission). 

def unquote_header_value(value: str, is_filename: bool = False) -> str:
    r"""Unquotes a header value. (Reversal of :func:`quote_header_value`).
    This does not use the real unquoting but what browsers are actually
    using for quoting.

    :param value: the header value to unquote.
    :param is_filename: when true, a value starting with a double backslash
        (UNC path) only loses its surrounding quotes.
    :rtype: str
    """
    if not value or not (value[0] == value[-1] == '"'):
        return value

    # Not RFC-conformant unquoting on purpose: browsers such as IE upload
    # files with names like "C:\foo\bar.txt".
    inner = value[1:-1]

    # Collapsing backslashes on a UNC path would turn the leading double
    # backslash into a single one and break _fix_ie_filename(). See #458.
    if is_filename and inner[:2] == "\\\\":
        return inner

    return inner.replace("\\\\", "\\").replace('\\"', '"')

498 

499 

def dict_from_cookiejar(cj: CookieJar) -> dict[str, str | None]:
    """Map each cookie name in ``cj`` to its value.

    When several cookies share a name, the one iterated last wins.

    :param cj: CookieJar object to extract cookies from.
    :rtype: dict
    """
    extracted: dict[str, str | None] = {}
    for cookie in cj:
        extracted[cookie.name] = cookie.value
    return extracted

509 

510 

def add_dict_to_cookiejar(cj: CookieJar, cookie_dict: dict[str, str]) -> CookieJar:
    """Insert the entries of ``cookie_dict`` into ``cj``.

    Delegates to ``cookiejar_from_dict`` and returns whatever it returns.

    :param cj: CookieJar to insert cookies into.
    :param cookie_dict: Dict of key/values to insert into CookieJar.
    :rtype: CookieJar
    """
    updated_jar = cookiejar_from_dict(cookie_dict, cj)
    return updated_jar

520 

521 

def get_encodings_from_content(content: str) -> list[str]:
    """Returns encodings declared inside the given content string.

    Three declaration styles are searched, in this order: HTML
    ``<meta charset=...>``, HTML ``<meta content=...charset=...>`` and the
    ``encoding`` attribute of a leading XML declaration. Emits a
    DeprecationWarning on every call.

    :param content: text to extract encodings from.
    """
    warnings.warn(
        (
            "In requests 3.0, get_encodings_from_content will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )

    patterns = (
        re.compile(r'<meta.*?charset=["\']*(.+?)["\'>]', flags=re.I),
        re.compile(r'<meta.*?content=["\']*;?charset=(.+?)["\'>]', flags=re.I),
        re.compile(r'^<\?xml.*?encoding=["\']*(.+?)["\'>]'),
    )

    found: list[str] = []
    for pattern in patterns:
        found.extend(pattern.findall(content))
    return found

545 

546 

def _parse_content_type_header(header: str) -> tuple[str, dict[str, Any]]:
    """Returns content type and parameters from given header.

    Parameters are the ``;``-separated ``key=value`` pieces after the content
    type. Keys are lower-cased, surrounding quotes and spaces are stripped
    from keys and values, and pieces without ``=`` are ignored.

    :param header: string
    :return: tuple containing content type and dictionary of
             parameters.
    """
    trim = "\"' "
    content_type, *raw_params = header.split(";")

    parsed: dict[str, str | bool] = {}
    for raw in raw_params:
        key, separator, val = raw.strip().partition("=")
        if not separator:
            continue
        parsed[key.strip(trim).lower()] = val.strip(trim)

    return content_type.strip(), parsed

567 

568 

def get_encoding_from_headers(headers: CaseInsensitiveDict[str]) -> str | None:
    """Returns the encoding implied by the ``content-type`` header.

    An explicit ``charset`` parameter wins. Without one, content types
    containing ``text`` yield ``ISO-8859-1`` and those containing
    ``application/json`` yield ``utf-8``. Otherwise the result is None.

    :param headers: dictionary to extract encoding from.
    :rtype: str
    """
    raw_header = headers.get("content-type")
    if not raw_header:
        return None

    media_type, parameters = _parse_content_type_header(raw_header)

    if "charset" in parameters:
        return parameters["charset"].strip("'\"")

    if "text" in media_type:
        return "ISO-8859-1"

    if "application/json" in media_type:
        # Assume UTF-8 based on RFC 4627: https://www.ietf.org/rfc/rfc4627.txt since the charset was unset
        return "utf-8"

    return None

592 

593 

def stream_decode_response_unicode(
    iterator: Iterable[bytes], r: Response
) -> Generator[str | bytes, None, None]:
    """Incrementally decode the chunks of ``iterator`` using ``r.encoding``.

    When ``r.encoding`` is None the chunks are yielded untouched. Otherwise
    undecodable bytes are replaced, empty decode results are skipped, and any
    text still buffered in the decoder is flushed at the end.
    """
    encoding = r.encoding
    if encoding is None:
        yield from iterator
        return

    decoder = codecs.getincrementaldecoder(encoding)(errors="replace")
    for chunk in iterator:
        if decoded := decoder.decode(chunk):
            yield decoded

    # Flush whatever the decoder was still holding back.
    if trailing := decoder.decode(b"", final=True):
        yield trailing

611 

612 

@overload
def iter_slices(
    string: bytes, slice_length: int | None
) -> Generator[bytes, None, None]: ...
@overload
def iter_slices(
    string: str, slice_length: int | None
) -> Generator[str, None, None]: ...
def iter_slices(
    string: bytes | str, slice_length: int | None
) -> Generator[bytes | str, None, None]:
    """Yield consecutive slices of ``string``, each ``slice_length`` long.

    A ``slice_length`` of None, zero or less yields the whole string as one
    slice. An empty string yields nothing.
    """
    total = len(string)
    if not total:
        return

    step = total if slice_length is None or slice_length <= 0 else slice_length
    for start in range(0, total, step):
        yield string[start : start + step]

631 

632 

def get_unicode_from_response(r: Response) -> str | bytes | None:
    """Returns the requested content back in unicode.

    :param r: Response object to get unicode content from.

    Tries, in order:

    1. strict decoding with the charset derived from the content-type header
    2. decoding with that charset (or utf-8 when there is none), replacing
       undecodable bytes

    Emits a DeprecationWarning on every call. Returns None when ``r.content``
    is None, and the raw content when the fallback decode raises TypeError.

    :rtype: str
    """
    warnings.warn(
        (
            "In requests 3.0, get_unicode_from_response will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )
    if r.content is None:  # type: ignore[reportUnnecessaryComparison]
        return None

    # Try charset from content-type
    encoding = get_encoding_from_headers(r.headers)

    if encoding:
        try:
            return str(r.content, encoding)
        except UnicodeError:
            # Strict decoding failed; fall through to the lenient attempt.
            pass

    # Fall back:
    try:
        return str(r.content, encoding or "utf-8", errors="replace")
    except TypeError:
        return r.content

672 

673 

# The unreserved URI characters (RFC 3986)
UNRESERVED_SET: Final = frozenset(
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-._~"
)


def unquote_unreserved(uri: str) -> str:
    """Un-escape any percent-escape sequences in a URI that are unreserved
    characters. This leaves all reserved, illegal and non-ASCII bytes encoded.

    :raises InvalidURL: when a two-character alphanumeric escape is not
        valid hexadecimal.
    :rtype: str
    """
    leading, *after_percent = uri.split("%")
    rebuilt = [leading]
    for segment in after_percent:
        escape = segment[:2]
        if len(escape) != 2 or not escape.isalnum():
            # Not a complete escape: put the '%' back untouched.
            rebuilt.append(f"%{segment}")
            continue

        try:
            char = chr(int(escape, 16))
        except ValueError:
            raise InvalidURL(f"Invalid percent-escape sequence: '{escape}'")

        if char in UNRESERVED_SET:
            rebuilt.append(char + segment[2:])
        else:
            rebuilt.append(f"%{segment}")
    return "".join(rebuilt)

702 

703 

def requote_uri(uri: str) -> str:
    """Re-quote the given URI.

    The URI goes through an unquote/quote cycle so that it ends up fully and
    consistently quoted.

    :rtype: str
    """
    safe_without_percent = "!#$&'()*+,/:;=?@[]~"
    safe_with_percent = "!#$%&'()*+,/:;=?@[]~"

    try:
        # Unquote only the unreserved characters.
        partially_unquoted = unquote_unreserved(uri)
    except InvalidURL:
        # The URI could not be unquoted, so it may hold stray '%' signs.
        # Quote it as-is and let '%' itself be escaped so they cause no
        # issues elsewhere.
        return quote(uri, safe=safe_without_percent)

    # Quote only illegal characters (reserved, unreserved and '%' stay).
    return quote(partially_unquoted, safe=safe_with_percent)

724 

725 

def address_in_network(ip: str, net: str) -> bool:
    """Check whether an IPv4 address belongs to a network given as ``addr/bits``.

    Example: returns True if ip = 192.168.1.1 and net = 192.168.1.0/24
             returns False if ip = 192.168.1.1 and net = 192.168.100.0/24

    :rtype: bool
    """

    def packed(dotted: str) -> int:
        # Integer form of the 4 address bytes, in native byte order.
        return struct.unpack("=L", socket.inet_aton(dotted))[0]

    address = packed(ip)
    base, prefix_length = net.split("/")
    mask = packed(dotted_netmask(int(prefix_length)))
    masked_base = packed(base) & mask
    return (address & mask) == (masked_base & mask)

739 

740 

def dotted_netmask(mask: int) -> str:
    """Converts mask from /xx format to xxx.xxx.xxx.xxx

    Example: if mask is 24 function returns 255.255.255.0

    :rtype: str
    """
    host_bit_count = 32 - mask
    host_bits = (1 << host_bit_count) - 1
    # Clearing the host bits of an all-ones word leaves the network mask.
    network_bits = 0xFFFFFFFF ^ host_bits
    return socket.inet_ntoa(struct.pack(">I", network_bits))

750 

751 

def is_ipv4_address(string_ip: str) -> bool:
    """Return True when ``socket.inet_aton`` accepts ``string_ip``.

    :rtype: bool
    """
    try:
        socket.inet_aton(string_ip)
    except OSError:
        return False
    else:
        return True

761 

762 

def is_valid_cidr(string_network: str) -> bool:
    """
    Very simple check of the cidr format in no_proxy variable.

    Valid means exactly one ``/``, an integer prefix length from 1 to 32
    and an address part that ``socket.inet_aton`` accepts.

    :rtype: bool
    """
    if string_network.count("/") != 1:
        return False

    address, _, prefix = string_network.partition("/")

    try:
        prefix_length = int(prefix)
    except ValueError:
        return False

    if not 1 <= prefix_length <= 32:
        return False

    try:
        socket.inet_aton(address)
    except OSError:
        return False

    return True

785 

786 

@contextlib.contextmanager
def set_environ(env_name: str, value: str | None) -> Generator[None, None, None]:
    """Temporarily set the environment variable 'env_name' to 'value'.

    The previous value is restored when the block exits; a variable that did
    not exist before is deleted again.

    If 'value' is None, the environment is left untouched."""
    if value is None:
        yield
        return

    previous = os.environ.get(env_name)
    os.environ[env_name] = value
    try:
        yield
    finally:
        if previous is None:
            del os.environ[env_name]
        else:
            os.environ[env_name] = previous

808 

809 

def should_bypass_proxies(url: str, no_proxy: str | None) -> bool:
    """
    Returns whether we should bypass proxies or not.

    :param url: URL about to be requested.
    :param no_proxy: comma-separated no_proxy list; when None, the
        ``no_proxy`` / ``NO_PROXY`` environment variable is consulted.
    :rtype: bool
    """
    explicit_no_proxy = no_proxy
    if no_proxy is None:
        # Prioritize lowercase environment variables over uppercase
        # to keep a consistent behaviour with other http projects (curl, wget).
        no_proxy = os.environ.get("no_proxy") or os.environ.get("NO_PROXY")

    parsed = urlparse(url)
    hostname = parsed.hostname

    if hostname is None:
        # URLs don't always have hostnames, e.g. file:/// urls.
        return True

    if no_proxy:
        # Entries are matched against the end of the hostname, both with
        # and without the port.
        entries = (
            entry for entry in no_proxy.replace(" ", "").split(",") if entry
        )

        if is_ipv4_address(hostname):
            for entry in entries:
                if is_valid_cidr(entry):
                    if address_in_network(hostname, entry):
                        return True
                elif hostname == entry:
                    # Entry written as a plain IP rather than in CIDR
                    # notation, and it equals the target address.
                    return True
        else:
            host_with_port = hostname
            if parsed.port:
                host_with_port += f":{parsed.port}"
            exact_forms = (hostname, host_with_port)

            for entry in entries:
                bare = entry.lstrip(".")
                if bare in exact_forms:
                    return True
                suffix = "." + bare
                if hostname.endswith(suffix) or host_with_port.endswith(suffix):
                    return True

    # Fall back to the platform's proxy_bypass, exposing only an explicitly
    # passed no_proxy value to it through the environment.
    with set_environ("no_proxy", explicit_no_proxy):
        try:
            bypass = proxy_bypass(hostname)
        except (TypeError, socket.gaierror):
            bypass = False

    return bool(bypass)

871 

872 

def get_environ_proxies(url: str, no_proxy: str | None = None) -> dict[str, str]:
    """
    Return a dict of environment proxies.

    The dict is empty when ``should_bypass_proxies`` says ``url`` must not be
    proxied; otherwise it is the result of ``getproxies()``.

    :rtype: dict
    """
    bypass = should_bypass_proxies(url, no_proxy=no_proxy)
    return {} if bypass else getproxies()

883 

884 

def select_proxy(url: str, proxies: dict[str, str] | None) -> str | None:
    """Select a proxy for the url, if applicable.

    Keys are tried from most to least specific: ``scheme://host``,
    ``scheme``, ``all://host``, ``all``. URLs without a hostname only
    consider ``scheme`` and then ``all``.

    :param url: The url being for the request
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    """
    proxies = proxies or {}
    parts = urlparse(url)
    scheme, hostname = parts.scheme, parts.hostname

    if hostname is None:
        return proxies.get(scheme, proxies.get("all"))

    candidate_keys = (
        scheme + "://" + hostname,
        scheme,
        "all://" + hostname,
        "all",
    )
    return next((proxies[key] for key in candidate_keys if key in proxies), None)

909 

910 

def resolve_proxies(
    request: Request | PreparedRequest,
    proxies: dict[str, str] | None,
    trust_env: bool = True,
) -> dict[str, str]:
    """Combine explicitly configured proxies with proxies from the environment.

    A copy of ``proxies`` is returned. When ``trust_env`` is true and the
    request URL is not bypassed (e.g. through NO_PROXY), the environment proxy
    for the URL's scheme (or ``all``) is added unless that scheme already
    has an entry.

    :param request: Request or PreparedRequest
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    :param trust_env: Boolean declaring whether to trust environment configs

    :rtype: dict
    """
    configured = {} if proxies is None else proxies
    url = cast(str, request.url)
    scheme = urlparse(url).scheme
    no_proxy = configured.get("no_proxy")
    resolved = configured.copy()

    if not trust_env or should_bypass_proxies(url, no_proxy=no_proxy):
        return resolved

    from_environment = get_environ_proxies(url, no_proxy=no_proxy)
    env_proxy = from_environment.get(scheme, from_environment.get("all"))
    if env_proxy:
        resolved.setdefault(scheme, env_proxy)

    return resolved

940 

941 

def default_user_agent(name: str = "python-requests") -> str:
    """
    Return the default user agent string, ``<name>/<requests version>``.

    :param name: product name placed before the slash.
    :rtype: str
    """
    return "{}/{}".format(name, __version__)

949 

950 

def default_headers() -> CaseInsensitiveDict[str]:
    """
    Return a new case-insensitive mapping holding the default
    User-Agent, Accept-Encoding, Accept and Connection headers.

    :rtype: requests.structures.CaseInsensitiveDict
    """
    defaults = {
        "User-Agent": default_user_agent(),
        "Accept-Encoding": DEFAULT_ACCEPT_ENCODING,
        "Accept": "*/*",
        "Connection": "keep-alive",
    }
    return CaseInsensitiveDict(defaults)

963 

964 

def parse_header_links(value: str) -> list[dict[str, str]]:
    """Return a list of parsed link headers.

    i.e. Link: <http:/.../front.jpeg>; rel=front; type="image/jpeg",<http://.../back.jpeg>; rel=back;type="image/jpeg"

    Each link becomes a dict with a ``url`` key plus one key per
    ``name=value`` parameter. Parameter parsing for a link stops at the first
    piece that does not split into exactly one name and one value.

    :rtype: list
    """
    trim = " '\""
    parsed_links: list[dict[str, str]] = []

    header = value.strip(trim)
    if not header:
        return parsed_links

    for chunk in re.split(", *<", header):
        target, _, raw_params = chunk.partition(";")
        entry: dict[str, str] = {"url": target.strip("<> '\"")}

        for raw_param in raw_params.split(";"):
            pieces = raw_param.split("=")
            if len(pieces) != 2:
                break
            name, content = pieces
            entry[name.strip(trim)] = content.strip(trim)

        parsed_links.append(entry)

    return parsed_links

1000 

1001 

# Null-byte runs are built once at import time so that guess_json_utf does
# not have to recreate them on every call.
_null = b"\x00"
_null2 = _null * 2
_null3 = _null * 3


def guess_json_utf(data: bytes) -> str | None:
    """
    Guess the Unicode encoding of a JSON byte string from its first four bytes.

    Returns a codec name, or ``None`` when the null-byte layout matches no
    supported encoding.

    :rtype: str
    """
    # JSON text opens with two ASCII characters, so the number and position
    # of null bytes in the first four bytes identifies the encoding. A BOM,
    # when present, takes priority over that heuristic.
    head = data[:4]

    # UTF-32 must be tested before UTF-16: the UTF-32-LE BOM begins with the
    # UTF-16-LE BOM.
    if head == codecs.BOM_UTF32_LE or head == codecs.BOM_UTF32_BE:
        return "utf-32"  # BOM included
    if head[:3] == codecs.BOM_UTF8:
        return "utf-8-sig"  # BOM included, MS style (discouraged)
    if head[:2] in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE):
        return "utf-16"  # BOM included

    zeros = head.count(_null)
    if zeros == 0:
        return "utf-8"

    if zeros == 2:
        # Two ASCII-range UTF-16 code units: nulls alternate with characters.
        if head[0::2] == _null2:  # bytes 1 and 3 are null
            return "utf-16-be"
        if head[1::2] == _null2:  # bytes 2 and 4 are null
            return "utf-16-le"
    elif zeros == 3:
        # One ASCII-range UTF-32 code unit: three nulls on one side.
        if head[:3] == _null3:
            return "utf-32-be"
        if head[1:] == _null3:
            return "utf-32-le"

    # Null bytes present, but not in a layout of a supported encoding.
    return None

1038 

1039 

def prepend_scheme_if_needed(url: str, new_scheme: str) -> str:
    """Return ``url`` with ``new_scheme`` prepended when it has no scheme.

    A scheme already present in ``url`` is left untouched.

    :rtype: str
    """
    parts = parse_url(url)
    scheme, credentials, _host, _port, path, query, fragment = parts

    # Historical behaviour: an earlier urlparse-based implementation could
    # report no netloc for some URLs and compensated by swapping netloc and
    # path. That swap is kept with parse_url for backwards compatibility.
    location = parts.netloc
    if not location:
        location, path = path, location

    if credentials:
        # The netloc reported by parse_url carries no auth component, so it
        # is put back in front of it here.
        location = credentials + "@" + cast(str, location)

    scheme = new_scheme if scheme is None else scheme
    path = "" if path is None else path

    return urlunparse((scheme, location, path, "", query, fragment))

1068 

1069 

def get_auth_from_url(url: str) -> tuple[str, str]:
    """Extract the percent-decoded ``(username, password)`` pair from a URL.

    ``("", "")`` is returned whenever either component is missing.

    :rtype: (str,str)
    """
    components = urlparse(url)

    try:
        # unquote() rejects None, which is what a missing username or
        # password is reported as; the handler below covers that case.
        username = unquote(components.username)  # type: ignore[arg-type]
        password = unquote(components.password)  # type: ignore[arg-type]
    except (AttributeError, TypeError):
        return ("", "")

    return (username, password)

1085 

1086 

def check_header_validity(header: tuple[str | bytes, str | bytes]) -> None:
    """Validate both parts of a header, name first and then value.

    Each part is checked by ``_validate_header_part``, which raises
    ``InvalidHeader`` on invalid leading whitespace, reserved characters or
    return characters.

    :param header: tuple, in the format (name, value).
    """
    name, value = header
    # The position (0 = name, 1 = value) selects which validator is applied.
    for position, part in ((0, name), (1, value)):
        _validate_header_part(header, part, position)

1096 

1097 

def _validate_header_part(
    header: tuple[str | bytes, str | bytes],
    header_part: str | bytes,
    header_validator_index: int,
) -> None:
    """Check one part of a header against the validator for its type.

    :param header: the full (name, value) tuple, used only in error messages.
    :param header_part: the name or value being checked.
    :param header_validator_index: 0 to validate a name, any other index
        is reported as a value.
    :raises InvalidHeader: if the part is neither str nor bytes, or if its
        validator does not match it.
    """
    # Select the validator family matching the part's type.
    if isinstance(header_part, str):
        validators = _HEADER_VALIDATORS_STR
    elif isinstance(header_part, bytes):  # type: ignore[reportUnnecessaryIsInstance]
        validators = _HEADER_VALIDATORS_BYTE
    else:
        # runtime guard for non-str/bytes input
        raise InvalidHeader(
            f"Header part ({header_part!r}) from {header} "
            f"must be of type str or bytes, not {type(header_part)}"
        )

    validator = validators[header_validator_index]
    if validator.match(header_part):  # type: ignore[arg-type]
        return

    header_kind = "value" if header_validator_index != 0 else "name"
    raise InvalidHeader(
        f"Invalid leading whitespace, reserved character(s), or return "
        f"character(s) in header {header_kind}: {header_part!r}"
    )

1120 

1121 

def urldefragauth(url: str) -> str:
    """
    Return ``url`` with its fragment and its authentication part removed.

    :rtype: str
    """
    parts = urlparse(url)
    location, path = parts.netloc, parts.path

    # see func:`prepend_scheme_if_needed`
    if not location:
        location, path = path, location

    # Keep only what follows the last "@", i.e. drop any credentials.
    location = location.rpartition("@")[2]

    return urlunparse(
        (parts.scheme, location, path, parts.params, parts.query, "")
    )

1137 

1138 

def rewind_body(prepared_request: PreparedRequest) -> None:
    """Seek the request body back to its recorded starting position so it
    can be read again on redirect.

    :raises UnrewindableBodyError: if the body has no ``seek`` attribute, the
        recorded position is not an integer, or seeking fails with ``OSError``.
    """
    seek = getattr(prepared_request.body, "seek", None)
    # The recorded position is only inspected when the body is seekable.
    can_rewind = seek is not None and isinstance(
        prepared_request._body_position,  # type: ignore[reportPrivateUsage]
        integer_types,
    )
    if not can_rewind:
        raise UnrewindableBodyError("Unable to rewind request body for redirect.")

    try:
        seek(prepared_request._body_position)  # type: ignore[reportPrivateUsage]
    except OSError:
        raise UnrewindableBodyError(
            "An error occurred when rewinding request body for redirect."
        )