1"""
2requests.utils
3~~~~~~~~~~~~~~
4
5This module provides utility functions that are used within Requests
6that are also useful for external consumption.
7"""
8
9from __future__ import annotations
10
11import codecs
12import contextlib
13import io
14import os
15import re
16import socket
17import struct
18import sys
19import tempfile
20import warnings
21import zipfile
22from collections import OrderedDict
23from collections.abc import Generator, Iterable
24from typing import (
25 TYPE_CHECKING,
26 Any,
27 Final,
28 TypeVar,
29 cast,
30 overload,
31)
32
33from pip._vendor.urllib3.util import make_headers, parse_url
34
35from . import certs
36from .__version__ import __version__
37
38# to_native_string is unused here, but imported here for backwards compatibility
39from ._internal_utils import ( # noqa: F401
40 _HEADER_VALIDATORS_BYTE, # type: ignore[reportPrivateUsage]
41 _HEADER_VALIDATORS_STR, # type: ignore[reportPrivateUsage]
42 HEADER_VALIDATORS, # type: ignore[reportUnusedImport]
43 to_native_string, # type: ignore[reportUnusedImport]
44)
45from ._types import SupportsItems as _SupportsItems
46from .compat import (
47 Mapping,
48 bytes,
49 getproxies,
50 getproxies_environment,
51 integer_types,
52 is_urllib3_1,
53 proxy_bypass,
54 proxy_bypass_environment, # type: ignore[attr-defined] # https://github.com/python/cpython/issues/145331
55 quote,
56 str,
57 unquote,
58 urlparse,
59 urlunparse,
60)
61from .compat import parse_http_list as _parse_list_header
62from .cookies import cookiejar_from_dict
63from .exceptions import (
64 FileModeWarning,
65 InvalidHeader,
66 InvalidURL,
67 UnrewindableBodyError,
68)
69from .structures import CaseInsensitiveDict
70
71if TYPE_CHECKING:
72 from http.cookiejar import CookieJar
73 from io import BufferedWriter
74
75 from . import _types as _t
76 from .models import PreparedRequest, Request, Response
77
# File names tried, in order, under the user's home directory when the
# NETRC environment variable is not set (see get_netrc_auth).
NETRC_FILES: Final = (".netrc", "_netrc")


# Certificate is extracted by certifi when needed.
DEFAULT_CA_BUNDLE_PATH: str = certs.where()


# Port number associated with each URL scheme listed here.
DEFAULT_PORTS: Final = {"http": 80, "https": 443}

# Generic key / value type variables used by the to_key_val_list overloads.
_KT = TypeVar("_KT")
_VT = TypeVar("_VT")

# Ensure that ', ' is used to preserve previous delimiter behavior.
DEFAULT_ACCEPT_ENCODING: Final = ", ".join(
    re.split(r",\s*", make_headers(accept_encoding=True)["accept-encoding"])
)
94
95
if sys.platform == "win32":
    # provide a proxy_bypass version on Windows without DNS lookups

    def proxy_bypass_registry(host: str) -> bool:
        """Return True if ``host`` matches the registry proxy bypass list.

        Reads ``ProxyEnable`` and ``ProxyOverride`` from the current user's
        "Internet Settings" registry key. Returns False when ``winreg``
        cannot be imported, the values cannot be read, the proxy is
        disabled, the override list is empty, or no entry matches.

        :param host: host name to test against the override entries.
        :rtype: bool
        """
        try:
            import winreg
        except ImportError:
            return False

        try:
            # Open the key as a context manager so the registry handle is
            # closed deterministically instead of whenever it gets collected.
            with winreg.OpenKey(
                winreg.HKEY_CURRENT_USER,
                r"Software\Microsoft\Windows\CurrentVersion\Internet Settings",
            ) as internet_settings:
                # ProxyEnable could be REG_SZ or REG_DWORD, normalizing it
                proxy_enable = int(
                    winreg.QueryValueEx(internet_settings, "ProxyEnable")[0]
                )
                # ProxyOverride is almost always a string
                proxy_override = winreg.QueryValueEx(
                    internet_settings, "ProxyOverride"
                )[0]
        except (OSError, ValueError):
            return False
        if not proxy_enable or not proxy_override:
            return False

        # make a check value list from the registry entry: replace the
        # '<local>' string by the localhost entry and the corresponding
        # canonical entry.
        for test in proxy_override.split(";"):
            # skip empty entries: an empty pattern would make re.match
            # succeed for every host.
            if not test:
                continue
            if test == "<local>":
                if "." not in host:
                    return True
            test = test.replace(".", r"\.")  # mask dots
            test = test.replace("*", r".*")  # change glob sequence
            test = test.replace("?", r".")  # change glob char
            if re.match(test, host, re.I):
                return True
        return False

    def proxy_bypass(host: str) -> bool:  # noqa
        """Return True, if the host should be bypassed.

        Checks proxy settings gathered from the environment, if specified,
        or the registry.
        """
        if getproxies_environment():
            return proxy_bypass_environment(host)
        else:
            return proxy_bypass_registry(host)
147
148
def dict_to_sequence(
    d: _t.SupportsItems[Any, Any] | Iterable[tuple[Any, Any]],
) -> Iterable[tuple[Any, Any]]:
    """Return ``d`` in a form that can be iterated as key/value pairs.

    Objects that satisfy ``_SupportsItems`` are replaced by the result of
    their ``items()`` call; anything else is handed back untouched.
    """
    return d.items() if isinstance(d, _SupportsItems) else d
158
159
def super_len(o: Any) -> int:
    """Return how much of ``o`` is left to read.

    The total length is taken from the first of these that applies:
    ``len(o)``, an ``o.len`` attribute, the ``fstat`` size behind
    ``o.fileno()``, or, for seekable objects, the offset of the end of the
    stream. The current ``o.tell()`` position (when available) is then
    subtracted, and the result is never negative. When no length can be
    determined, 0 is returned.

    A ``FileModeWarning`` is issued when the length came from ``fstat`` but
    the file's mode does not contain ``"b"``.

    :param o: string, bytes, sized object or file-like object.
    :rtype: int
    """
    total_length = None
    current_position = 0

    if not is_urllib3_1 and isinstance(o, str):
        # urllib3 2.x+ treats all strings as utf-8 instead
        # of latin-1 (iso-8859-1) like http.client.
        o = o.encode("utf-8")

    if hasattr(o, "__len__"):
        total_length = len(o)

    elif hasattr(o, "len"):
        total_length = o.len

    elif hasattr(o, "fileno"):
        try:
            fileno = o.fileno()
        except (io.UnsupportedOperation, AttributeError):
            # AttributeError is a surprising exception, seeing as how we've just checked
            # that `hasattr(o, 'fileno')`.  It happens for objects obtained via
            # `Tarfile.extractfile()`, per issue 5229.
            pass
        else:
            total_length = os.fstat(fileno).st_size

            # Having used fstat to determine the file length, we need to
            # confirm that this file was opened up in binary mode.
            if "b" not in o.mode:
                warnings.warn(
                    (
                        "Requests has determined the content-length for this "
                        "request using the binary size of the file: however, the "
                        "file has been opened in text mode (i.e. without the 'b' "
                        "flag in the mode). This may lead to an incorrect "
                        "content-length. In Requests 3.0, support will be removed "
                        "for files in text mode."
                    ),
                    FileModeWarning,
                )

    if hasattr(o, "tell"):
        try:
            current_position = o.tell()
        except OSError:
            # This can happen in some weird situations, such as when the file
            # is actually a special file descriptor like stdin. In this
            # instance, we don't know what the length is, so set it to zero and
            # let requests chunk it instead.
            if total_length is not None:
                current_position = total_length
        else:
            if hasattr(o, "seek") and total_length is None:
                # StringIO and BytesIO have seek but no usable fileno
                try:
                    # seek to end of file
                    o.seek(0, 2)
                    total_length = o.tell()

                    # seek back to current position to support
                    # partially read file-like objects
                    o.seek(current_position or 0)
                except OSError:
                    total_length = 0

    # Nothing above produced a length: report zero remaining.
    if total_length is None:
        total_length = 0

    # Clamp at zero in case the position lies past the computed length.
    return max(0, total_length - current_position)
229
230
def get_netrc_auth(
    url: _t.UriType, raise_errors: bool = False
) -> tuple[str, str] | None:
    """Look up ``(login, password)`` credentials for ``url`` in a netrc file.

    The file named by the ``NETRC`` environment variable is used when that
    variable is set; otherwise each name in ``NETRC_FILES`` is tried under
    the user's home directory. ``None`` is returned when no file exists,
    the URL has no host, or the file holds no usable entry for the host.

    :param url: URL (str or UTF-8 encoded bytes) whose host is looked up.
    :param raise_errors: re-raise netrc parse / OS errors instead of
        silently skipping netrc authentication.
    """
    if isinstance(url, bytes):
        url = url.decode("utf-8")

    env_override = os.environ.get("NETRC")
    if env_override is None:
        candidates = (f"~/{file_name}" for file_name in NETRC_FILES)
    else:
        candidates = (env_override,)

    try:
        from netrc import NetrcParseError, netrc

        # First candidate that exists on disk, if any.
        netrc_path = next(
            (
                expanded
                for expanded in map(os.path.expanduser, candidates)
                if os.path.exists(expanded)
            ),
            None,
        )

        # Abort early if there isn't one.
        if netrc_path is None:
            return None

        host = urlparse(url).hostname
        if host is None:
            return None

        try:
            entry = netrc(netrc_path).authenticators(host)
            if entry and any(entry):
                # Prefer the login field; fall back to the account field.
                login = entry[0] if entry[0] else entry[1]
                return (login or "", entry[2] or "")
        except (NetrcParseError, OSError):
            # If there was a parsing error or a permissions issue reading the file,
            # we'll just skip netrc auth unless explicitly asked to raise errors.
            if raise_errors:
                raise

    # App Engine hackiness.
    except (ImportError, AttributeError):
        pass

    return None
281
282
def guess_filename(obj: Any) -> str | None:
    """Tries to guess the filename of the given object.

    Returns the base name of ``obj.name`` when that attribute is a
    non-empty ``str`` or ``bytes`` value that is not a pseudo-name wrapped
    in angle brackets (such as ``<stdin>``); otherwise returns ``None``.

    :param obj: any object, typically a file-like object.
    """
    name = getattr(obj, "name", None)
    if not name or not isinstance(name, (str, bytes)):
        return None

    # Indexing bytes yields ints, so compare one-element slices against
    # markers of the matching type; this makes b"<stdin>" behave like "<stdin>".
    opening, closing = ("<", ">") if isinstance(name, str) else (b"<", b">")
    if name[:1] != opening and name[-1:] != closing:
        return os.path.basename(name)  # type: ignore[return-value]  # urllib3 accepts bytes but types str only
    return None
288
289
def extract_zipped_paths(path: str) -> str:
    """Replace nonexistent paths that look like they refer to a member of a zip
    archive with the location of an extracted copy of the target, or else
    just return the provided path unchanged.

    :param path: file system path, possibly pointing inside a zip archive.
    :return: ``path`` itself, or the path of a temporary file holding the
        extracted member.
    """
    if os.path.exists(path):
        # this is already a valid path, no need to do anything further
        return path

    # find the first valid part of the provided path and treat that as a zip archive
    # assume the rest of the path is the name of a member in the archive
    archive, member = os.path.split(path)
    while archive and not os.path.exists(archive):
        archive, prefix = os.path.split(archive)
        if not prefix:
            # If we don't check for an empty prefix after the split (in other words, archive remains unchanged after the split),
            # we _can_ end up in an infinite loop on a rare corner case affecting a small number of users
            break
        member = "/".join([prefix, member])

    if not zipfile.is_zipfile(archive):
        return path

    # Open the archive as a context manager so its file handle is always
    # released, and read the member before creating the temp file so a
    # failed read leaves nothing behind on disk.
    with zipfile.ZipFile(archive) as zip_file:
        if member not in zip_file.namelist():
            return path
        payload = zip_file.read(member)

    # we have a valid zip archive and a valid member of that archive
    suffix = os.path.splitext(member.split("/")[-1])[-1]
    fd, extracted_path = tempfile.mkstemp(suffix=suffix)
    try:
        os.write(fd, payload)
    finally:
        os.close(fd)

    return extracted_path
326
327
@contextlib.contextmanager
def atomic_open(filename: str) -> Generator[BufferedWriter, None, None]:
    """Context manager that writes ``filename`` atomically.

    A temporary file is created in the destination directory and yielded
    opened for binary writing. On a clean exit it is moved over
    ``filename``; on any exception it is deleted and the exception
    propagates.
    """
    target_dir = os.path.dirname(filename)
    scratch_fd, scratch_path = tempfile.mkstemp(dir=target_dir)
    try:
        with os.fdopen(scratch_fd, "wb") as scratch_file:
            yield scratch_file
        # Only reached when the caller's block finished without raising.
        os.replace(scratch_path, filename)
    except BaseException:
        os.remove(scratch_path)
        raise
339
340
def from_key_val_list(
    value: Mapping[Any, Any] | Iterable[tuple[Any, Any]] | None,
) -> dict[Any, Any] | None:
    """Convert a mapping or an iterable of 2-tuples into an OrderedDict.

    ``None`` is passed through unchanged; strings, bytes, booleans and
    integers are rejected, e.g.,

    ::

        >>> from_key_val_list([('key', 'val')])
        OrderedDict([('key', 'val')])
        >>> from_key_val_list('string')
        Traceback (most recent call last):
        ...
        ValueError: cannot encode objects that are not 2-tuples
        >>> from_key_val_list({'key': 'val'})
        OrderedDict([('key', 'val')])

    :rtype: OrderedDict
    """
    if value is None:
        return None

    scalar_types = (str, bytes, bool, int)
    if not isinstance(value, scalar_types):
        return OrderedDict(value)

    raise ValueError("cannot encode objects that are not 2-tuples")
368
369
@overload
def to_key_val_list(value: None) -> None: ...
@overload
def to_key_val_list(
    value: _t.SupportsItems[_KT, _VT] | Iterable[tuple[_KT, _VT]],
) -> list[tuple[_KT, _VT]]: ...
def to_key_val_list(
    value: _t.SupportsItems[_KT, _VT] | Iterable[tuple[_KT, _VT]] | None,
) -> list[tuple[_KT, _VT]] | None:
    """Convert a mapping or an iterable of 2-tuples into a list of tuples.

    ``None`` is passed through unchanged; strings, bytes, booleans and
    integers are rejected, e.g.,

    ::

        >>> to_key_val_list([('key', 'val')])
        [('key', 'val')]
        >>> to_key_val_list({'key': 'val'})
        [('key', 'val')]
        >>> to_key_val_list('string')
        Traceback (most recent call last):
        ...
        ValueError: cannot encode objects that are not 2-tuples

    :rtype: list
    """
    if value is None:
        return None

    if isinstance(value, (str, bytes, bool, int)):
        raise ValueError("cannot encode objects that are not 2-tuples")

    # Objects exposing items() contribute their pairs; any other iterable
    # is materialised as-is.
    pairs = value.items() if isinstance(value, _SupportsItems) else value
    return list(pairs)
405
406
407# From mitsuhiko/werkzeug (used with permission).
def parse_list_header(value: str) -> list[str]:
    """Parse lists as described by RFC 2068 Section 2.

    In particular, parse comma-separated lists where the elements of
    the list may include quoted-strings.  A quoted-string could
    contain a comma.  A non-quoted string could have quotes in the
    middle.  Quotes are removed automatically after parsing.

    Items may appear multiple times and their case is preserved.

    The return value is a standard :class:`list`:

    >>> parse_list_header('token, "quoted value"')
    ['token', 'quoted value']

    :param value: a string with a list header.
    :return: :class:`list`
    :rtype: list
    """
    return [
        # Strip the surrounding quotes and unescape fully quoted items;
        # everything else is kept verbatim.
        unquote_header_value(item[1:-1]) if item[:1] == item[-1:] == '"' else item
        for item in _parse_list_header(value)
    ]
437
438
439# From mitsuhiko/werkzeug (used with permission).
def parse_dict_header(value: str) -> dict[str, str | None]:
    """Parse lists of key, value pairs as described by RFC 2068 Section 2 and
    convert them into a python dict:

    >>> d = parse_dict_header('foo="is a fish", bar="as well"')
    >>> type(d) is dict
    True
    >>> sorted(d.items())
    [('bar', 'as well'), ('foo', 'is a fish')]

    If there is no value for a key it will be `None`:

    >>> parse_dict_header('key_without_value')
    {'key_without_value': None}

    :param value: a string with a dict header.
    :return: :class:`dict`
    :rtype: dict
    """
    parsed: dict[str, str | None] = {}
    for item in _parse_list_header(value):
        # Split on the first "=" only; later ones belong to the value.
        key, separator, raw = item.partition("=")
        if not separator:
            parsed[item] = None
            continue
        if raw[:1] == raw[-1:] == '"':
            raw = unquote_header_value(raw[1:-1])
        parsed[key] = raw
    return parsed
472
473
474# From mitsuhiko/werkzeug (used with permission).
def unquote_header_value(value: str, is_filename: bool = False) -> str:
    r"""Unquotes a header value.  (Reversal of :func:`quote_header_value`).
    This does not use the real unquoting but what browsers are actually
    using for quoting.

    Values that are not wrapped in double quotes are returned unchanged.

    :param value: the header value to unquote.
    :param is_filename: when true, a value whose unquoted form starts with
        two backslashes (a UNC path) only has its quotes removed.
    :rtype: str
    """
    if not value or not (value[0] == value[-1] == '"'):
        return value

    # this is not the real unquoting, but fixing this so that the
    # RFC is met will result in bugs with internet explorer and
    # probably some other browsers as well.  IE for example is
    # uploading files with "C:\foo\bar.txt" as filename
    inner = value[1:-1]

    # For UNC file names the replace sequence below would collapse the
    # leading double backslash into a single one, so return them
    # without further processing.  See #458.
    if is_filename and inner[:2] == "\\\\":
        return inner

    return inner.replace("\\\\", "\\").replace('\\"', '"')
498
499
def dict_from_cookiejar(cj: CookieJar) -> dict[str, str | None]:
    """Build a plain ``{name: value}`` dictionary from the cookies in a jar.

    When several cookies share a name, the one iterated last wins.

    :param cj: CookieJar object to extract cookies from.
    :rtype: dict
    """
    extracted: dict[str, str | None] = {}
    for cookie in cj:
        extracted[cookie.name] = cookie.value
    return extracted
509
510
def add_dict_to_cookiejar(cj: CookieJar, cookie_dict: dict[str, str]) -> CookieJar:
    """Insert the entries of a key/value dictionary into a CookieJar.

    Delegates to ``cookiejar_from_dict`` and returns whatever it returns.

    :param cj: CookieJar to insert cookies into.
    :param cookie_dict: Dict of key/values to insert into CookieJar.
    :rtype: CookieJar
    """
    updated_jar = cookiejar_from_dict(cookie_dict, cj)
    return updated_jar
520
521
def get_encodings_from_content(content: str) -> list[str]:
    """Returns encodings declared inside the given content string.

    Matches from HTML ``<meta ... charset=...>`` declarations come first,
    followed by pragma-style ``<meta ... content=...charset=...>`` matches,
    followed by a leading XML declaration's ``encoding``. Emits a
    ``DeprecationWarning`` on every call.

    :param content: string to extract encodings from.
    """
    warnings.warn(
        (
            "In requests 3.0, get_encodings_from_content will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )

    # Order matters: results are concatenated in this order.
    patterns = (
        re.compile(r'<meta.*?charset=["\']*(.+?)["\'>]', flags=re.I),
        re.compile(r'<meta.*?content=["\']*;?charset=(.+?)["\'>]', flags=re.I),
        re.compile(r'^<\?xml.*?encoding=["\']*(.+?)["\'>]'),
    )

    found: list[str] = []
    for pattern in patterns:
        found.extend(pattern.findall(content))
    return found
545
546
547def _parse_content_type_header(header: str) -> tuple[str, dict[str, Any]]:
548 """Returns content type and parameters from given header.
549
550 :param header: string
551 :return: tuple containing content type and dictionary of
552 parameters.
553 """
554
555 tokens = header.split(";")
556 content_type, params = tokens[0].strip(), tokens[1:]
557 params_dict: dict[str, str | bool] = {}
558 strip_chars = "\"' "
559
560 for param in params:
561 param = param.strip()
562 if param and (idx := param.find("=")) != -1:
563 key = param[:idx].strip(strip_chars)
564 value = param[idx + 1 :].strip(strip_chars)
565 params_dict[key.lower()] = value
566 return content_type, params_dict
567
568
def get_encoding_from_headers(headers: CaseInsensitiveDict[str]) -> str | None:
    """Returns the encoding implied by the ``content-type`` header.

    An explicit ``charset`` parameter wins. Otherwise content types
    containing ``text`` map to ``ISO-8859-1`` and those containing
    ``application/json`` map to ``utf-8``. ``None`` is returned when the
    header is missing or empty, or when none of these rules applies.

    :param headers: dictionary to extract encoding from.
    :rtype: str
    """
    raw_content_type = headers.get("content-type")
    if not raw_content_type:
        return None

    content_type, params = _parse_content_type_header(raw_content_type)

    charset = params.get("charset") if "charset" in params else None
    if "charset" in params:
        return charset.strip("'\"")

    if "text" in content_type:
        return "ISO-8859-1"

    if "application/json" in content_type:
        # Assume UTF-8 based on RFC 4627: https://www.ietf.org/rfc/rfc4627.txt since the charset was unset
        return "utf-8"

    return None
592
593
def stream_decode_response_unicode(
    iterator: Iterable[bytes], r: Response
) -> Generator[str | bytes, None, None]:
    """Incrementally decode the chunks of ``iterator`` using ``r.encoding``.

    When the response has no encoding the chunks are yielded untouched.
    Otherwise undecodable input is replaced rather than raising, and empty
    decode results are not yielded.
    """
    if r.encoding is None:
        yield from iterator
        return

    decoder_factory = codecs.getincrementaldecoder(r.encoding)
    decoder = decoder_factory(errors="replace")

    for chunk in iterator:
        if text := decoder.decode(chunk):
            yield text

    # Flush whatever the decoder is still buffering.
    if tail := decoder.decode(b"", final=True):
        yield tail
611
612
@overload
def iter_slices(
    string: bytes, slice_length: int | None
) -> Generator[bytes, None, None]: ...
@overload
def iter_slices(
    string: str, slice_length: int | None
) -> Generator[str, None, None]: ...
def iter_slices(
    string: bytes | str, slice_length: int | None
) -> Generator[bytes | str, None, None]:
    """Yield consecutive slices of ``string`` of at most ``slice_length`` items.

    A ``slice_length`` of ``None``, zero or a negative number yields the
    whole string as a single slice. An empty string yields nothing.
    """
    total = len(string)
    if slice_length is None or slice_length <= 0:
        step = total
    else:
        step = slice_length

    # ``step`` is 0 only for an empty string; ``or 1`` keeps range() valid
    # while still producing no iterations.
    for start in range(0, total, step or 1):
        yield string[start : start + step]
631
632
def get_unicode_from_response(r: Response) -> str | bytes | None:
    """Returns the requested content back in unicode.

    :param r: Response object to get unicode content from.

    Tried:

    1. charset from content-type
    2. fall back and replace all unicode characters

    If the charset named by the headers is not a codec Python knows, the
    fallback decodes as UTF-8 instead of raising ``LookupError``.

    :rtype: str
    """
    warnings.warn(
        (
            "In requests 3.0, get_unicode_from_response will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )
    if r.content is None:  # type: ignore[reportUnnecessaryComparison]
        return None

    # Try charset from content-type
    encoding = get_encoding_from_headers(r.headers)

    if encoding:
        try:
            return str(r.content, encoding)
        except UnicodeError:
            # Strict decoding failed; retry below with replacement characters.
            pass
        except LookupError:
            # The declared charset is not a known codec, so it cannot be
            # used for the lossy retry either: fall back to UTF-8.
            encoding = None

    # Fall back:
    try:
        return str(r.content, encoding or "utf-8", errors="replace")
    except TypeError:
        return r.content
672
673
# The unreserved URI characters (RFC 3986)
UNRESERVED_SET: Final = frozenset(
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + "0123456789-._~"
)


def unquote_unreserved(uri: str) -> str:
    """Un-escape any percent-escape sequences in a URI that are unreserved
    characters. This leaves all reserved, illegal and non-ASCII bytes encoded.

    :raises InvalidURL: if a ``%`` is followed by two alphanumeric
        characters that are not valid hexadecimal digits.
    :rtype: str
    """
    head, *escaped_chunks = uri.split("%")
    pieces = [head]

    for chunk in escaped_chunks:
        code = chunk[:2]
        if len(code) == 2 and code.isalnum():
            try:
                char = chr(int(code, 16))
            except ValueError:
                raise InvalidURL(f"Invalid percent-escape sequence: '{code}'")

            if char in UNRESERVED_SET:
                # Safe to decode: swap the escape for the literal character.
                pieces.append(char + chunk[2:])
                continue

        # Anything else keeps its original percent sign.
        pieces.append(f"%{chunk}")

    return "".join(pieces)
702
703
def requote_uri(uri: str) -> str:
    """Re-quote the given URI.

    The URI goes through an unquote/quote cycle so that it ends up fully
    and consistently quoted. If it contains an invalid percent-escape, it
    is quoted as-is with ``%`` itself escaped too.

    :rtype: str
    """
    try:
        # Unquote only the unreserved characters
        unquoted = unquote_unreserved(uri)
    except InvalidURL:
        # We couldn't unquote the given URI, so let's try quoting it, but
        # there may be unquoted '%'s in the URI. We need to make sure they're
        # properly quoted so they do not cause issues elsewhere.
        return quote(uri, safe="!#$&'()*+,/:;=?@[]~")

    # Then quote only illegal characters (do not quote reserved,
    # unreserved, or '%')
    return quote(unquoted, safe="!#$%&'()*+,/:;=?@[]~")
724
725
def address_in_network(ip: str, net: str) -> bool:
    """Check whether an IPv4 address belongs to a network given as CIDR.

    Example: returns True if ip = 192.168.1.1 and net = 192.168.1.0/24
             returns False if ip = 192.168.1.1 and net = 192.168.100.0/24

    :param ip: dotted IPv4 address.
    :param net: network in ``address/prefix-length`` form.
    :rtype: bool
    """

    def as_int(dotted: str) -> int:
        # Same byte order for every operand, so the comparisons line up.
        return struct.unpack("=L", socket.inet_aton(dotted))[0]

    ipaddr = as_int(ip)
    netaddr, bits = net.split("/")
    netmask = as_int(dotted_netmask(int(bits)))
    network = as_int(netaddr) & netmask
    return (ipaddr & netmask) == (network & netmask)
739
740
def dotted_netmask(mask: int) -> str:
    """Convert a prefix length (/xx) into a dotted-quad netmask.

    Example: if mask is 24 function returns 255.255.255.0

    :rtype: str
    """
    host_bits = 32 - mask
    # Ones in every host-bit position ...
    host_part = (1 << host_bits) - 1
    # ... flipped to leave ones only in the network-bit positions.
    network_part = 0xFFFFFFFF ^ host_part
    packed = struct.pack(">I", network_part)
    return socket.inet_ntoa(packed)
750
751
def is_ipv4_address(string_ip: str) -> bool:
    """Report whether ``socket.inet_aton`` accepts ``string_ip``.

    :rtype: bool
    """
    try:
        socket.inet_aton(string_ip)
    except OSError:
        accepted = False
    else:
        accepted = True
    return accepted
761
762
def is_valid_cidr(string_network: str) -> bool:
    """
    Very simple check of the cidr format in no_proxy variable.

    The value must contain exactly one ``/``, an integer prefix length
    between 1 and 32, and an address part that ``socket.inet_aton`` accepts.

    :rtype: bool
    """
    if string_network.count("/") != 1:
        return False

    address, _, prefix = string_network.partition("/")

    try:
        mask = int(prefix)
    except ValueError:
        return False

    if not 1 <= mask <= 32:
        return False

    try:
        socket.inet_aton(address)
    except OSError:
        return False

    return True
785
786
@contextlib.contextmanager
def set_environ(env_name: str, value: str | None) -> Generator[None, None, None]:
    """Temporarily set the environment variable 'env_name' to 'value'.

    The previous value is remembered before yielding and put back
    afterwards; if the variable did not exist before, it is removed again.

    If 'value' is None, do nothing"""
    if value is None:
        yield
        return

    previous = os.environ.get(env_name)
    os.environ[env_name] = value
    try:
        yield
    finally:
        if previous is None:
            del os.environ[env_name]
        else:
            os.environ[env_name] = previous
808
809
def should_bypass_proxies(url: str, no_proxy: str | None) -> bool:
    """
    Returns whether we should bypass proxies or not.

    The decision is made in this order: URLs without a host name always
    bypass; then the ``no_proxy`` list (the argument, or the ``no_proxy`` /
    ``NO_PROXY`` environment variable when the argument is None) is
    checked; finally ``proxy_bypass`` is consulted.

    :param url: the URL about to be requested.
    :param no_proxy: comma-separated hosts / IPs / CIDR blocks to bypass,
        or None to read the value from the environment.
    :rtype: bool
    """

    # Prioritize lowercase environment variables over uppercase
    # to keep a consistent behaviour with other http projects (curl, wget).
    def get_proxy(key: str) -> str | None:
        return os.environ.get(key) or os.environ.get(key.upper())

    # First check whether no_proxy is defined. If it is, check that the URL
    # we're getting isn't in the no_proxy list.
    # Keep the caller's original argument: it (not the environment
    # fallback) is what gets exported around the proxy_bypass call below.
    no_proxy_arg = no_proxy
    if no_proxy is None:
        no_proxy = get_proxy("no_proxy")
    parsed = urlparse(url)
    hostname = parsed.hostname

    if hostname is None:
        # URLs don't always have hostnames, e.g. file:/// urls.
        return True

    if no_proxy:
        # We need to check whether we match here. We need to see if we match
        # the end of the hostname, both with and without the port.
        # Spaces are dropped and empty entries skipped.
        no_proxy_hosts = (host for host in no_proxy.replace(" ", "").split(",") if host)

        if is_ipv4_address(hostname):
            for proxy_ip in no_proxy_hosts:
                if is_valid_cidr(proxy_ip):
                    if address_in_network(hostname, proxy_ip):
                        return True
                elif hostname == proxy_ip:
                    # If no_proxy ip was defined in plain IP notation instead of cidr notation &
                    # matches the IP of the index
                    return True
        else:
            host_with_port = hostname
            if parsed.port:
                host_with_port += f":{parsed.port}"

            for host in no_proxy_hosts:
                # Exact match first, ignoring any leading dots on the entry ...
                host = host.lstrip(".")
                if hostname == host or host_with_port == host:
                    return True
                # ... then a suffix match anchored on a dot, so an entry only
                # matches whole trailing domain labels.
                host = "." + host
                if hostname.endswith(host) or host_with_port.endswith(host):
                    return True

    # Temporarily export the caller-supplied no_proxy (a no-op when it is
    # None) while proxy_bypass runs.
    with set_environ("no_proxy", no_proxy_arg):
        try:
            bypass = proxy_bypass(hostname)
        except (TypeError, socket.gaierror):
            bypass = False

    if bypass:
        return True

    return False
871
872
def get_environ_proxies(url: str, no_proxy: str | None = None) -> dict[str, str]:
    """
    Return the proxies from ``getproxies()`` for ``url``, or an empty dict
    when ``should_bypass_proxies`` says the URL must not be proxied.

    :rtype: dict
    """
    bypass = should_bypass_proxies(url, no_proxy=no_proxy)
    return {} if bypass else getproxies()
883
884
def select_proxy(url: str, proxies: dict[str, str] | None) -> str | None:
    """Select a proxy for the url, if applicable.

    Keys are tried from most to least specific: ``scheme://host``,
    ``scheme``, ``all://host``, then ``all``. URLs without a host name only
    consult ``scheme`` and ``all``.

    :param url: The url being for the request
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    """
    proxies = proxies or {}
    parts = urlparse(url)
    scheme, hostname = parts.scheme, parts.hostname

    if hostname is None:
        return proxies.get(scheme, proxies.get("all"))

    candidates = (
        scheme + "://" + hostname,
        scheme,
        "all://" + hostname,
        "all",
    )
    for candidate in candidates:
        if candidate in proxies:
            return proxies[candidate]

    return None
909
910
def resolve_proxies(
    request: Request | PreparedRequest,
    proxies: dict[str, str] | None,
    trust_env: bool = True,
) -> dict[str, str]:
    """Resolve the proxy mapping to use for ``request``.

    Starts from a copy of ``proxies`` and, when the environment is trusted
    and the URL is not exempted (e.g. via NO_PROXY), adds the environment's
    proxy for the URL scheme unless that scheme already has an entry.

    :param request: Request or PreparedRequest
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    :param trust_env: Boolean declaring whether to trust environment configs

    :rtype: dict
    """
    configured = {} if proxies is None else proxies
    url = cast(str, request.url)
    scheme = urlparse(url).scheme
    no_proxy = configured.get("no_proxy")
    resolved = configured.copy()

    if not trust_env or should_bypass_proxies(url, no_proxy=no_proxy):
        return resolved

    environ_proxies = get_environ_proxies(url, no_proxy=no_proxy)
    env_proxy = environ_proxies.get(scheme, environ_proxies.get("all"))
    if env_proxy:
        # Explicit configuration wins over the environment.
        resolved.setdefault(scheme, env_proxy)

    return resolved
940
941
def default_user_agent(name: str = "python-requests") -> str:
    """
    Return the default user agent string, ``<name>/<package version>``.

    :param name: product name placed before the version.
    :rtype: str
    """
    version = __version__
    return f"{name}/{version}"
949
950
def default_headers() -> CaseInsensitiveDict[str]:
    """
    Return a fresh case-insensitive mapping holding the default
    ``User-Agent``, ``Accept-Encoding``, ``Accept`` and ``Connection``
    headers.

    :rtype: requests.structures.CaseInsensitiveDict
    """
    defaults = {
        "User-Agent": default_user_agent(),
        "Accept-Encoding": DEFAULT_ACCEPT_ENCODING,
        "Accept": "*/*",
        "Connection": "keep-alive",
    }
    return CaseInsensitiveDict(defaults)
963
964
def parse_header_links(value: str) -> list[dict[str, str]]:
    """Return a list of parsed link headers proxies.

    i.e. Link: <http:/.../front.jpeg>; rel=front; type="image/jpeg",<http://.../back.jpeg>; rel=back;type="image/jpeg"

    Each link becomes a dict with a ``url`` key plus one key per
    ``name=value`` parameter. Parameter values may themselves contain
    ``=`` characters. Parsing of a link's parameters stops at the first
    parameter that has no ``=`` at all.

    :param value: the raw ``Link`` header value.
    :rtype: list
    """

    links: list[dict[str, str]] = []

    replace_chars = " '\""

    value = value.strip(replace_chars)
    if not value:
        return links

    for val in re.split(", *<", value):
        try:
            url, params = val.split(";", 1)
        except ValueError:
            url, params = val, ""

        link: dict[str, str] = {"url": url.strip("<> '\"")}

        for param in params.split(";"):
            try:
                # Split on the first "=" only, so values such as
                # title="a=b" are kept instead of aborting the loop.
                key, param_value = param.split("=", 1)
            except ValueError:
                break

            link[key.strip(replace_chars)] = param_value.strip(replace_chars)

        links.append(link)

    return links
1000
1001
# Null bytes; no need to recreate these on each call to guess_json_utf
_null = "\x00".encode("ascii")  # encoding to ASCII for Python 3
_null2 = _null * 2
_null3 = _null * 3


def guess_json_utf(data: bytes) -> str | None:
    """Guess the UTF flavour of JSON ``data`` from its first four bytes.

    A byte-order mark decides the answer when present; otherwise the
    number and position of null bytes is used. Returns ``None`` when the
    pattern matches no known encoding.

    :rtype: str
    """
    # JSON always starts with two ASCII characters, so detection is as
    # easy as counting the nulls and from their location and count
    # determine the encoding. Also detect a BOM, if present.
    head = data[:4]

    # The UTF-32 BOMs must be tested before the UTF-16 ones, which are
    # prefixes of them.
    if head in (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE):
        return "utf-32"  # BOM included
    if head.startswith(codecs.BOM_UTF8):
        return "utf-8-sig"  # BOM included, MS style (discouraged)
    if head.startswith((codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE)):
        return "utf-16"  # BOM included

    zeros = head.count(_null)
    if zeros == 0:
        return "utf-8"
    elif zeros == 2:
        if head[::2] == _null2:  # 1st and 3rd are null
            return "utf-16-be"
        if head[1::2] == _null2:  # 2nd and 4th are null
            return "utf-16-le"
        # Did not detect 2 valid UTF-16 ascii-range characters
    elif zeros == 3:
        if head[:3] == _null3:
            return "utf-32-be"
        if head[1:] == _null3:
            return "utf-32-le"
        # Did not detect a valid UTF-32 ascii-range character
    return None
1038
1039
def prepend_scheme_if_needed(url: str, new_scheme: str) -> str:
    """Return ``url`` with ``new_scheme`` prepended if it has no scheme.

    A scheme that is already present is never replaced.

    :rtype: str
    """
    parsed = parse_url(url)
    scheme, auth, _host, _port, path, query, fragment = parsed
    netloc = parsed.netloc

    # A defect in urlparse determines that there isn't a netloc present in some
    # urls. We previously assumed parsing was overly cautious, and swapped the
    # netloc and path. Due to a lack of tests on the original defect, this is
    # maintained with parse_url for backwards compatibility.
    if not netloc:
        netloc, path = path, netloc

    if auth:
        # parse_url doesn't provide the netloc with auth
        # so we'll add it ourselves.
        netloc = "@".join([auth, cast(str, netloc)])

    effective_scheme = new_scheme if scheme is None else scheme
    effective_path = "" if path is None else path

    return urlunparse((effective_scheme, netloc, effective_path, "", query, fragment))
1068
1069
def get_auth_from_url(url: str) -> tuple[str, str]:
    """Extract the percent-decoded ``(username, password)`` pair from a url.

    A pair of empty strings is returned when the url lacks either
    component.

    :rtype: (str,str)
    """
    parts = urlparse(url)

    try:
        # unquote() raises for a None username/password, which is how a
        # missing component is detected.
        username = unquote(parts.username)  # type: ignore[arg-type]
        password = unquote(parts.password)  # type: ignore[arg-type]
    except (AttributeError, TypeError):
        return ("", "")

    return (username, password)
1085
1086
def check_header_validity(header: tuple[str | bytes, str | bytes]) -> None:
    """Validate both parts of a header, name first and then value.

    Each part is checked by ``_validate_header_part``, which raises
    ``InvalidHeader`` for parts that fail validation.

    :param header: tuple, in the format (name, value).
    """
    name, value = header
    # Index 0 selects the name validator, index 1 the value validator.
    for validator_index, part in enumerate((name, value)):
        _validate_header_part(header, part, validator_index)
1096
1097
def _validate_header_part(
    header: tuple[str | bytes, str | bytes],
    header_part: str | bytes,
    header_validator_index: int,
) -> None:
    """Check one header part against the validator for its type and position.

    :param header: the full (name, value) tuple, used in error messages.
    :param header_part: the name or value being checked.
    :param header_validator_index: 0 for a header name, 1 for a value.
    :raises InvalidHeader: if the part is neither str nor bytes, or if the
        selected validator does not match it.
    """
    if isinstance(header_part, str):
        validator = _HEADER_VALIDATORS_STR[header_validator_index]
    elif isinstance(header_part, bytes):  # type: ignore[reportUnnecessaryIsInstance]
        # runtime guard for non-str/bytes input
        validator = _HEADER_VALIDATORS_BYTE[header_validator_index]
    else:
        raise InvalidHeader(
            f"Header part ({header_part!r}) from {header} "
            f"must be of type str or bytes, not {type(header_part)}"
        )

    if validator.match(header_part):  # type: ignore[arg-type]
        return

    header_kind = "value" if header_validator_index != 0 else "name"
    raise InvalidHeader(
        f"Invalid leading whitespace, reserved character(s), or return "
        f"character(s) in header {header_kind}: {header_part!r}"
    )
1120
1121
def urldefragauth(url: str) -> str:
    """
    Return ``url`` with its fragment and its authentication part removed.

    :rtype: str
    """
    scheme, netloc, path, params, query, _fragment = urlparse(url)

    # see func:`prepend_scheme_if_needed`
    if not netloc:
        netloc, path = path, netloc

    # Keep only what follows the last "@", dropping any user info.
    _, _, host_part = netloc.rpartition("@")

    return urlunparse((scheme, host_part, path, params, query, ""))
1137
1138
def rewind_body(prepared_request: PreparedRequest) -> None:
    """Seek the request body back to its recorded starting position
    so it can be read again on redirect.

    :raises UnrewindableBodyError: if the body has no ``seek`` method, no
        integer position was recorded, or seeking fails with ``OSError``.
    """
    body_seek = getattr(prepared_request.body, "seek", None)

    if body_seek is None or not isinstance(
        prepared_request._body_position,  # type: ignore[reportPrivateUsage]
        integer_types,
    ):
        raise UnrewindableBodyError("Unable to rewind request body for redirect.")

    try:
        body_seek(prepared_request._body_position)  # type: ignore[reportPrivateUsage]
    except OSError:
        raise UnrewindableBodyError(
            "An error occurred when rewinding request body for redirect."
        )