Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/requests/utils.py: 17%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2requests.utils
3~~~~~~~~~~~~~~
5This module provides utility functions that are used within Requests
6that are also useful for external consumption.
7"""
9from __future__ import annotations
11import codecs
12import contextlib
13import io
14import os
15import re
16import socket
17import struct
18import sys
19import tempfile
20import warnings
21import zipfile
22from collections import OrderedDict
23from collections.abc import Generator, Iterable
24from typing import (
25 TYPE_CHECKING,
26 Any,
27 Final,
28 TypeVar,
29 cast,
30 overload,
31)
33from urllib3.util import make_headers, parse_url
35from . import certs
36from .__version__ import __version__
38# to_native_string is unused here, but imported here for backwards compatibility
39from ._internal_utils import ( # noqa: F401
40 _HEADER_VALIDATORS_BYTE, # type: ignore[reportPrivateUsage]
41 _HEADER_VALIDATORS_STR, # type: ignore[reportPrivateUsage]
42 HEADER_VALIDATORS, # type: ignore[reportUnusedImport]
43 to_native_string, # type: ignore[reportUnusedImport]
44)
45from ._types import SupportsItems as _SupportsItems
46from .compat import (
47 Mapping,
48 bytes,
49 getproxies,
50 getproxies_environment,
51 integer_types,
52 is_urllib3_1,
53 proxy_bypass,
54 proxy_bypass_environment, # type: ignore[attr-defined] # https://github.com/python/cpython/issues/145331
55 quote,
56 str,
57 unquote,
58 urlparse,
59 urlunparse,
60)
61from .compat import parse_http_list as _parse_list_header
62from .cookies import cookiejar_from_dict
63from .exceptions import (
64 FileModeWarning,
65 InvalidHeader,
66 InvalidURL,
67 UnrewindableBodyError,
68)
69from .structures import CaseInsensitiveDict
71if TYPE_CHECKING:
72 from http.cookiejar import CookieJar
73 from io import BufferedWriter
75 from . import _types as _t
76 from .models import PreparedRequest, Request, Response
# File names looked up under the user's home directory ("~/<name>") when the
# NETRC environment variable is not set.
NETRC_FILES: Final = (".netrc", "_netrc")

# Certificate is extracted by certifi when needed.
DEFAULT_CA_BUNDLE_PATH: str = certs.where()

# Port number associated with each URL scheme listed here.
DEFAULT_PORTS: Final = {"http": 80, "https": 443}

# Generic key/value type variables used in this module's type annotations.
_KT = TypeVar("_KT")
_VT = TypeVar("_VT")

# Ensure that ', ' is used to preserve previous delimiter behavior.
DEFAULT_ACCEPT_ENCODING: Final = ", ".join(
    re.split(r",\s*", make_headers(accept_encoding=True)["accept-encoding"])
)
if sys.platform == "win32":
    # Windows-only proxy_bypass implementation that avoids DNS lookups.

    def proxy_bypass_registry(host: str) -> bool:
        """Return True if *host* matches an entry of the registry's ProxyOverride.

        ``ProxyEnable`` and ``ProxyOverride`` are read from the current user's
        "Internet Settings" registry key. False is returned when ``winreg``
        cannot be imported, when the values cannot be read or converted, or
        when either value is empty/zero.

        :param host: host name to test against the override patterns.
        :rtype: bool
        """
        try:
            import winreg
        except ImportError:
            return False

        try:
            settings_key = winreg.OpenKey(
                winreg.HKEY_CURRENT_USER,
                r"Software\Microsoft\Windows\CurrentVersion\Internet Settings",
            )
            # ProxyEnable could be REG_SZ or REG_DWORD, so normalize it to int.
            enabled = int(winreg.QueryValueEx(settings_key, "ProxyEnable")[0])
            # ProxyOverride is almost always a string.
            override = winreg.QueryValueEx(settings_key, "ProxyOverride")[0]
        except (OSError, ValueError):
            return False
        if not (enabled and override):
            return False

        # The override value is a ';'-separated list of glob-like patterns.
        # Empty entries are dropped because an empty pattern would make
        # re.match succeed for every host.
        for entry in filter(None, override.split(";")):
            # '<local>' stands for any host name without a dot.
            if entry == "<local>" and "." not in host:
                return True
            # Translate the glob into a regex: mask dots, then map '*' and '?'.
            pattern = entry.replace(".", r"\.").replace("*", r".*").replace("?", r".")
            if re.match(pattern, host, re.I):
                return True
        return False

    def proxy_bypass(host: str) -> bool:  # noqa
        """Return True, if the host should be bypassed.

        Checks proxy settings gathered from the environment, if specified,
        or the registry.
        """
        if not getproxies_environment():
            return proxy_bypass_registry(host)
        return proxy_bypass_environment(host)
def dict_to_sequence(
    d: _t.SupportsItems[Any, Any] | Iterable[tuple[Any, Any]],
) -> Iterable[tuple[Any, Any]]:
    """Return ``d.items()`` when *d* is a ``_SupportsItems`` instance, else *d* unchanged."""
    return d.items() if isinstance(d, _SupportsItems) else d
def super_len(o: Any) -> int:
    """Return how much of *o* is left to read from its current position.

    The total length is taken, in order of preference, from ``len(o)``,
    ``o.len``, ``os.fstat`` on ``o.fileno()``, or by seeking to the end of
    the object. The current ``o.tell()`` position, when available, is
    subtracted and the result is clamped at zero. 0 is returned when no
    length can be determined.

    :param o: string, bytes, or file-like object to measure.
    :rtype: int
    """
    length = None
    position = 0

    if not is_urllib3_1 and isinstance(o, str):
        # urllib3 2.x+ treats all strings as utf-8 instead
        # of latin-1 (iso-8859-1) like http.client.
        o = o.encode("utf-8")

    if hasattr(o, "__len__"):
        length = len(o)
    elif hasattr(o, "len"):
        length = o.len
    elif hasattr(o, "fileno"):
        try:
            descriptor = o.fileno()
        except (io.UnsupportedOperation, AttributeError):
            # AttributeError is surprising right after hasattr(o, 'fileno'),
            # but it happens for objects obtained via `Tarfile.extractfile()`,
            # per issue 5229.
            pass
        else:
            length = os.fstat(descriptor).st_size

            # fstat reports the binary size, so make sure the file was
            # actually opened in binary mode.
            if "b" not in o.mode:
                message = (
                    "Requests has determined the content-length for this "
                    "request using the binary size of the file: however, the "
                    "file has been opened in text mode (i.e. without the 'b' "
                    "flag in the mode). This may lead to an incorrect "
                    "content-length. In Requests 3.0, support will be removed "
                    "for files in text mode."
                )
                warnings.warn(message, FileModeWarning)

    if hasattr(o, "tell"):
        try:
            position = o.tell()
        except OSError:
            # This can happen in some weird situations, such as when the file
            # is actually a special file descriptor like stdin. Treat the
            # object as fully consumed so the remaining length becomes zero
            # and requests chunks it instead.
            if length is not None:
                position = length
        else:
            if length is None and hasattr(o, "seek"):
                # StringIO and BytesIO have seek but no usable fileno.
                try:
                    # Measure by jumping to the end of the stream...
                    o.seek(0, 2)
                    length = o.tell()

                    # ...then restore the position to support partially
                    # read file-like objects.
                    o.seek(position or 0)
                except OSError:
                    length = 0

    if length is None:
        length = 0

    return max(0, length - position)
def get_netrc_auth(
    url: _t.UriType, raise_errors: bool = False
) -> tuple[str, str] | None:
    """Return the ``(login, password)`` netrc entry matching the host of *url*.

    The netrc file is the path in the ``NETRC`` environment variable when set,
    otherwise the first existing ``~/<name>`` for each name in ``NETRC_FILES``.

    :param url: URL (str or UTF-8 encoded bytes) whose host is looked up.
    :param raise_errors: re-raise netrc parsing and OS errors instead of
        ignoring them.
    :return: the credentials tuple, or None when nothing usable is found.
    """
    if isinstance(url, bytes):
        url = url.decode("utf-8")

    env_netrc = os.environ.get("NETRC")
    if env_netrc is None:
        candidates = (f"~/{name}" for name in NETRC_FILES)
    else:
        candidates = (env_netrc,)

    try:
        from netrc import NetrcParseError, netrc

        # The first candidate that exists on disk wins.
        netrc_path = next(
            (
                expanded
                for expanded in map(os.path.expanduser, candidates)
                if os.path.exists(expanded)
            ),
            None,
        )

        # Abort early if there isn't one.
        if netrc_path is None:
            return None

        host = urlparse(url).hostname
        if host is None:
            return None

        try:
            entry = netrc(netrc_path).authenticators(host)
            if entry and any(entry):
                # Use the first field as the login; fall back to the second
                # field when the first one is empty.
                login = entry[0] if entry[0] else entry[1]
                return (login or "", entry[2] or "")
        except (NetrcParseError, OSError):
            # If there was a parsing error or a permissions issue reading the
            # file, skip netrc auth unless explicitly asked to raise errors.
            if raise_errors:
                raise

    # App Engine hackiness.
    except (ImportError, AttributeError):
        pass

    return None
def guess_filename(obj: Any) -> str | None:
    """Tries to guess the filename of the given object.

    The ``name`` attribute of *obj* is used when it is a non-empty str or
    bytes value that neither starts with ``<`` nor ends with ``>``
    (e.g. not ``<stdin>``).

    :param obj: any object, typically a file-like object.
    :return: the base name of ``obj.name``, or None when no usable name exists.
    """
    name = getattr(obj, "name", None)
    if not name or not isinstance(name, (str, bytes)):
        return None

    # Compare one-element slices against a marker of the same type: indexing
    # bytes yields an int, which would never compare equal to "<" or ">".
    opener, closer = ("<", ">") if isinstance(name, str) else (b"<", b">")
    if name[:1] != opener and name[-1:] != closer:
        return os.path.basename(name)  # type: ignore[return-value]  # urllib3 accepts bytes but types str only
    return None
def extract_zipped_paths(path: str) -> str:
    """Replace nonexistent paths that look like they refer to a member of a zip
    archive with the location of an extracted copy of the target, or else
    just return the provided path unchanged.

    :param path: filesystem path, possibly pointing inside a zip archive.
    :return: *path* itself, or the path of a temporary file holding the
        extracted member.
    """
    if os.path.exists(path):
        # this is already a valid path, no need to do anything further
        return path

    # find the first valid part of the provided path and treat that as a zip archive
    # assume the rest of the path is the name of a member in the archive
    archive, member = os.path.split(path)
    while archive and not os.path.exists(archive):
        archive, prefix = os.path.split(archive)
        if not prefix:
            # If the split no longer makes progress (archive is unchanged),
            # stop: otherwise this loop can spin forever on rare paths.
            break
        member = "/".join([prefix, member])

    if not zipfile.is_zipfile(archive):
        return path

    # Open the archive in a context manager so its file handle is released on
    # every exit path, and read the member before creating the temporary file
    # so a failed read leaves nothing behind.
    with zipfile.ZipFile(archive) as zip_file:
        if member not in zip_file.namelist():
            return path
        payload = zip_file.read(member)

    # we have a valid zip archive and a valid member of that archive
    suffix = os.path.splitext(member.split("/")[-1])[-1]
    fd, extracted_path = tempfile.mkstemp(suffix=suffix)
    # A buffered file object writes the whole payload (a bare os.write may
    # write only part of it) and closes the descriptor when the block exits.
    with os.fdopen(fd, "wb") as extracted:
        extracted.write(payload)

    return extracted_path
@contextlib.contextmanager
def atomic_open(filename: str) -> Generator[BufferedWriter, None, None]:
    """Write a file to the disk in an atomic fashion.

    A temporary file is created in the directory of *filename* and yielded
    opened in binary write mode. When the ``with`` body finishes, the
    temporary file replaces *filename*; on any exception it is removed and
    the exception is re-raised.

    :param filename: destination path.
    """
    target_dir = os.path.dirname(filename)
    fd, scratch_path = tempfile.mkstemp(dir=target_dir)
    try:
        with os.fdopen(fd, "wb") as scratch:
            yield scratch
        # Only reached when the caller's block succeeded and the handle is closed.
        os.replace(scratch_path, filename)
    except BaseException:
        os.remove(scratch_path)
        raise
def from_key_val_list(
    value: Mapping[Any, Any] | Iterable[tuple[Any, Any]] | None,
) -> dict[Any, Any] | None:
    """Convert *value* to an OrderedDict, rejecting scalar inputs.

    ::

        >>> from_key_val_list([('key', 'val')])
        OrderedDict([('key', 'val')])
        >>> from_key_val_list('string')
        Traceback (most recent call last):
        ...
        ValueError: cannot encode objects that are not 2-tuples
        >>> from_key_val_list({'key': 'val'})
        OrderedDict([('key', 'val')])

    :param value: mapping or iterable of 2-tuples; None is passed through.
    :raises ValueError: if *value* is a str, bytes, bool or int.
    :rtype: OrderedDict
    """
    if value is None:
        return None

    scalar_types = (str, bytes, bool, int)
    if not isinstance(value, scalar_types):
        return OrderedDict(value)

    raise ValueError("cannot encode objects that are not 2-tuples")
@overload
def to_key_val_list(value: None) -> None: ...
@overload
def to_key_val_list(
    value: _t.SupportsItems[_KT, _VT] | Iterable[tuple[_KT, _VT]],
) -> list[tuple[_KT, _VT]]: ...
def to_key_val_list(
    value: _t.SupportsItems[_KT, _VT] | Iterable[tuple[_KT, _VT]] | None,
) -> list[tuple[_KT, _VT]] | None:
    """Convert *value* to a list of key/value tuples, rejecting scalar inputs.

    ::

        >>> to_key_val_list([('key', 'val')])
        [('key', 'val')]
        >>> to_key_val_list({'key': 'val'})
        [('key', 'val')]
        >>> to_key_val_list('string')
        Traceback (most recent call last):
        ...
        ValueError: cannot encode objects that are not 2-tuples

    :param value: object providing ``items()`` or an iterable of 2-tuples;
        None is passed through.
    :raises ValueError: if *value* is a str, bytes, bool or int.
    :rtype: list
    """
    if value is None:
        return None

    scalar_types = (str, bytes, bool, int)
    if isinstance(value, scalar_types):
        raise ValueError("cannot encode objects that are not 2-tuples")

    # Objects exposing items() contribute their items; anything else is
    # assumed to already be an iterable of pairs.
    pairs = value.items() if isinstance(value, _SupportsItems) else value
    return list(pairs)
# From mitsuhiko/werkzeug (used with permission).
def parse_list_header(value: str) -> list[str]:
    """Parse lists as described by RFC 2068 Section 2.

    In particular, parse comma-separated lists where the elements of
    the list may include quoted-strings. A quoted-string could
    contain a comma. A non-quoted string could have quotes in the
    middle. Quotes are removed automatically after parsing.

    Items may appear multiple times and case sensitivity is preserved.

    The return value is a standard :class:`list`:

    >>> parse_list_header('token, "quoted value"')
    ['token', 'quoted value']

    :param value: a string with a list header.
    :return: :class:`list`
    :rtype: list
    """
    # Items wrapped in double quotes are unquoted; all others are kept as-is.
    return [
        unquote_header_value(item[1:-1]) if item[:1] == item[-1:] == '"' else item
        for item in _parse_list_header(value)
    ]
# From mitsuhiko/werkzeug (used with permission).
def parse_dict_header(value: str) -> dict[str, str | None]:
    """Parse lists of key, value pairs as described by RFC 2068 Section 2 and
    convert them into a python dict:

    >>> d = parse_dict_header('foo="is a fish", bar="as well"')
    >>> type(d) is dict
    True
    >>> sorted(d.items())
    [('bar', 'as well'), ('foo', 'is a fish')]

    If there is no value for a key it will be `None`:

    >>> parse_dict_header('key_without_value')
    {'key_without_value': None}

    :param value: a string with a dict header.
    :return: :class:`dict`
    :rtype: dict
    """
    parsed: dict[str, str | None] = {}
    for item in _parse_list_header(value):
        # Split on the first '=' only; later '=' characters belong to the value.
        name, separator, raw = item.partition("=")
        if not separator:
            parsed[item] = None
            continue
        if raw[:1] == raw[-1:] == '"':
            raw = unquote_header_value(raw[1:-1])
        parsed[name] = raw
    return parsed
# From mitsuhiko/werkzeug (used with permission).
def unquote_header_value(value: str, is_filename: bool = False) -> str:
    r"""Unquotes a header value. (Reversal of :func:`quote_header_value`).
    This does not use the real unquoting but what browsers are actually
    using for quoting.

    :param value: the header value to unquote.
    :param is_filename: when true, a value that starts with two backslashes
        (a UNC path) is returned with only the surrounding quotes removed.
    :rtype: str
    """
    # Only values wrapped in double quotes are touched.
    if not value or not (value[0] == value[-1] == '"'):
        return value

    # This is not the real unquoting, but fixing this so that the RFC is met
    # will result in bugs with internet explorer and probably some other
    # browsers as well. IE for example is uploading files with
    # "C:\foo\bar.txt" as filename.
    inner = value[1:-1]

    # If this is a filename and the starting characters look like a UNC path,
    # return it without collapsing backslashes: the replace sequence below
    # would turn the leading double backslash into a single one and then
    # _fix_ie_filename() doesn't work correctly. See #458.
    if is_filename and inner[:2] == "\\\\":
        return inner

    return inner.replace("\\\\", "\\").replace('\\"', '"')
def dict_from_cookiejar(cj: CookieJar) -> dict[str, str | None]:
    """Returns a key/value dictionary from a CookieJar.

    When several cookies share a name, the last one iterated wins.

    :param cj: CookieJar object to extract cookies from.
    :rtype: dict
    """
    cookie_dict: dict[str, str | None] = {}
    for cookie in cj:
        cookie_dict[cookie.name] = cookie.value
    return cookie_dict
def add_dict_to_cookiejar(cj: CookieJar, cookie_dict: dict[str, str]) -> CookieJar:
    """Pass *cookie_dict* and *cj* to ``cookiejar_from_dict`` and return its result.

    :param cj: CookieJar to insert cookies into.
    :param cookie_dict: Dict of key/values to insert into CookieJar.
    :rtype: CookieJar
    """
    updated_jar = cookiejar_from_dict(cookie_dict, cj)
    return updated_jar
def get_encodings_from_content(content: str) -> list[str]:
    """Returns encodings from given content string.

    Matches are collected, in this order, from ``<meta ... charset=...>``
    tags, ``<meta ... content="charset=...">`` pragmas and a leading
    ``<?xml ... encoding=...>`` declaration. A DeprecationWarning is issued
    on every call.

    :param content: string to extract encodings from.
    """
    warnings.warn(
        (
            "In requests 3.0, get_encodings_from_content will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )

    patterns = (
        re.compile(r'<meta.*?charset=["\']*(.+?)["\'>]', flags=re.I),
        re.compile(r'<meta.*?content=["\']*;?charset=(.+?)["\'>]', flags=re.I),
        re.compile(r'^<\?xml.*?encoding=["\']*(.+?)["\'>]'),
    )

    encodings: list[str] = []
    for pattern in patterns:
        encodings.extend(pattern.findall(content))
    return encodings
def _parse_content_type_header(header: str) -> tuple[str, dict[str, Any]]:
    """Returns content type and parameters from given header.

    Parameters without an ``=`` are ignored. Parameter names are lower-cased,
    and names and values are stripped of surrounding quotes and spaces.

    :param header: string
    :return: tuple containing content type and dictionary of
         parameters.
    """
    content_type, *raw_params = header.split(";")
    strip_chars = "\"' "

    params_dict: dict[str, str | bool] = {}
    for raw_param in raw_params:
        # Split on the first '=' only; the separator is empty when there is none.
        key, separator, value = raw_param.strip().partition("=")
        if separator:
            params_dict[key.strip(strip_chars).lower()] = value.strip(strip_chars)
    return content_type.strip(), params_dict
def get_encoding_from_headers(headers: CaseInsensitiveDict[str]) -> str | None:
    """Returns encodings from given HTTP Header Dict.

    The ``charset`` parameter of the ``content-type`` header is used when
    present. Otherwise ``ISO-8859-1`` is returned for content types
    containing ``text`` and ``utf-8`` for those containing
    ``application/json``.

    :param headers: dictionary to extract encoding from.
    :return: the encoding name, or None when it cannot be determined.
    :rtype: str
    """
    raw_content_type = headers.get("content-type")
    if not raw_content_type:
        return None

    media_type, params = _parse_content_type_header(raw_content_type)

    if "charset" in params:
        return params["charset"].strip("'\"")

    if "text" in media_type:
        return "ISO-8859-1"

    if "application/json" in media_type:
        # Assume UTF-8 based on RFC 4627: https://www.ietf.org/rfc/rfc4627.txt since the charset was unset
        return "utf-8"

    return None
def stream_decode_response_unicode(
    iterator: Iterable[bytes], r: Response
) -> Generator[str | bytes, None, None]:
    """Stream decodes an iterator.

    Chunks are yielded untouched when ``r.encoding`` is None. Otherwise they
    are decoded incrementally with that encoding (undecodable bytes are
    replaced) and only non-empty pieces of text are yielded.

    :param iterator: iterable of byte chunks.
    :param r: object whose ``encoding`` attribute selects the decoder.
    """
    if r.encoding is None:
        yield from iterator
        return

    decoder = codecs.getincrementaldecoder(r.encoding)(errors="replace")
    for chunk in iterator:
        if text := decoder.decode(chunk):
            yield text
    # Flush whatever the decoder is still buffering (e.g. a truncated sequence).
    if tail := decoder.decode(b"", final=True):
        yield tail
@overload
def iter_slices(
    string: bytes, slice_length: int | None
) -> Generator[bytes, None, None]: ...
@overload
def iter_slices(
    string: str, slice_length: int | None
) -> Generator[str, None, None]: ...
def iter_slices(
    string: bytes | str, slice_length: int | None
) -> Generator[bytes | str, None, None]:
    """Iterate over slices of a string.

    :param string: str or bytes to cut up.
    :param slice_length: size of each slice; None or a non-positive value
        yields the whole input as a single slice.
    """
    total = len(string)
    # Nothing to yield for empty input (this also avoids a zero range step).
    if not total:
        return
    step = total if slice_length is None or slice_length <= 0 else slice_length
    for start in range(0, total, step):
        yield string[start : start + step]
def get_unicode_from_response(r: Response) -> str | bytes | None:
    """Returns the requested content back in unicode.

    :param r: Response object to get unicode content from.

    Tried:

    1. charset from content-type
    2. fall back and replace all unicode characters

    A DeprecationWarning is issued on every call.

    :rtype: str
    """
    warnings.warn(
        (
            "In requests 3.0, get_unicode_from_response will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )
    if r.content is None:  # type: ignore[reportUnnecessaryComparison]
        return None

    tried_encodings: list[str] = []

    # First attempt: strict decoding with the charset from content-type.
    encoding = get_encoding_from_headers(r.headers)
    if encoding:
        try:
            return str(r.content, encoding)
        except UnicodeError:
            tried_encodings.append(encoding)

    # Fall back: decode with replacement characters, defaulting to utf-8.
    fallback_encoding = encoding or "utf-8"
    try:
        return str(r.content, fallback_encoding, errors="replace")
    except TypeError:
        return r.content
# The unreserved URI characters (RFC 3986)
UNRESERVED_SET: Final = frozenset(
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + "0123456789-._~"
)


def unquote_unreserved(uri: str) -> str:
    """Un-escape any percent-escape sequences in a URI that are unreserved
    characters. This leaves all reserved, illegal and non-ASCII bytes encoded.

    :param uri: URI to process.
    :raises InvalidURL: if a ``%`` is followed by two alphanumeric characters
        that are not valid hexadecimal.
    :rtype: str
    """
    head, *segments = uri.split("%")
    rebuilt = [head]
    for segment in segments:
        # Each segment followed a '%', so its first two characters are the
        # candidate hex digits of an escape sequence.
        hex_pair = segment[:2]
        if len(hex_pair) == 2 and hex_pair.isalnum():
            try:
                char = chr(int(hex_pair, 16))
            except ValueError:
                raise InvalidURL(f"Invalid percent-escape sequence: '{hex_pair}'")

            if char in UNRESERVED_SET:
                rebuilt.append(char + segment[2:])
                continue
        # Not an escape of an unreserved character: put the '%' back unchanged.
        rebuilt.append(f"%{segment}")
    return "".join(rebuilt)
def requote_uri(uri: str) -> str:
    """Re-quote the given URI.

    This function passes the given URI through an unquote/quote cycle to
    ensure that it is fully and consistently quoted.

    :param uri: URI to normalize.
    :rtype: str
    """
    safe_with_percent = "!#$%&'()*+,/:;=?@[]~"
    safe_without_percent = "!#$&'()*+,/:;=?@[]~"
    try:
        # Unquote only the unreserved characters.
        unquoted = unquote_unreserved(uri)
    except InvalidURL:
        # We couldn't unquote the given URI, so let's try quoting it, but
        # there may be unquoted '%'s in the URI. We need to make sure they're
        # properly quoted so they do not cause issues elsewhere.
        return quote(uri, safe=safe_without_percent)
    # Then quote only illegal characters (do not quote reserved,
    # unreserved, or '%').
    return quote(unquoted, safe=safe_with_percent)
def address_in_network(ip: str, net: str) -> bool:
    """This function allows you to check if an IP belongs to a network subnet

    Example: returns True if ip = 192.168.1.1 and net = 192.168.1.0/24
             returns False if ip = 192.168.1.1 and net = 192.168.100.0/24

    :param ip: dotted IPv4 address.
    :param net: network in ``address/prefix-length`` notation.
    :rtype: bool
    """

    def as_int(dotted: str) -> int:
        # Packed 4-byte address read as an unsigned integer.
        return struct.unpack("=L", socket.inet_aton(dotted))[0]

    ip_value = as_int(ip)
    net_address, prefix_length = net.split("/")
    mask_value = as_int(dotted_netmask(int(prefix_length)))
    network_value = as_int(net_address) & mask_value
    return (ip_value & mask_value) == (network_value & mask_value)
def dotted_netmask(mask: int) -> str:
    """Converts mask from /xx format to xxx.xxx.xxx.xxx

    Example: if mask is 24 function returns 255.255.255.0

    :param mask: prefix length.
    :rtype: str
    """
    # Set the low (32 - mask) host bits, then flip them off in an all-ones word.
    host_bits = (1 << (32 - mask)) - 1
    network_bits = 0xFFFFFFFF ^ host_bits
    return socket.inet_ntoa(struct.pack(">I", network_bits))
def is_ipv4_address(string_ip: str) -> bool:
    """Return True if ``socket.inet_aton`` accepts *string_ip*.

    :param string_ip: candidate IPv4 address.
    :rtype: bool
    """
    try:
        socket.inet_aton(string_ip)
    except OSError:
        return False
    else:
        return True
def is_valid_cidr(string_network: str) -> bool:
    """
    Very simple check of the cidr format in no_proxy variable.

    The value must contain exactly one ``/``, followed by an integer prefix
    length between 1 and 32, and preceded by an address that
    ``socket.inet_aton`` accepts.

    :param string_network: candidate ``address/prefix-length`` string.
    :rtype: bool
    """
    if string_network.count("/") != 1:
        return False

    address, prefix_text = string_network.split("/")

    try:
        prefix_length = int(prefix_text)
    except ValueError:
        return False

    if not 1 <= prefix_length <= 32:
        return False

    try:
        socket.inet_aton(address)
    except OSError:
        return False
    return True
@contextlib.contextmanager
def set_environ(env_name: str, value: str | None) -> Generator[None, None, None]:
    """Set the environment variable 'env_name' to 'value'

    Save previous value, yield, and then restore the previous value stored in
    the environment variable 'env_name'.

    If 'value' is None, do nothing"""
    if value is None:
        # Nothing to set, hence nothing to restore afterwards.
        yield
        return

    previous = os.environ.get(env_name)
    os.environ[env_name] = value
    try:
        yield
    finally:
        if previous is not None:
            os.environ[env_name] = previous
        else:
            # The variable did not exist before, so remove it again.
            del os.environ[env_name]
def should_bypass_proxies(url: str, no_proxy: str | None) -> bool:
    """
    Returns whether we should bypass proxies or not.

    :param url: URL about to be requested.
    :param no_proxy: comma-separated no_proxy list; when None the
        ``no_proxy`` / ``NO_PROXY`` environment variables are consulted.
    :rtype: bool
    """
    # First check whether no_proxy is defined. If it is, check that the URL
    # we're getting isn't in the no_proxy list.
    no_proxy_arg = no_proxy
    if no_proxy is None:
        # Prioritize lowercase environment variables over uppercase
        # to keep a consistent behaviour with other http projects (curl, wget).
        no_proxy = os.environ.get("no_proxy") or os.environ.get("NO_PROXY")

    parsed = urlparse(url)
    hostname = parsed.hostname
    if hostname is None:
        # URLs don't always have hostnames, e.g. file:/// urls.
        return True

    if no_proxy:
        # We need to check whether we match here. We need to see if we match
        # the end of the hostname, both with and without the port.
        entries = (entry for entry in no_proxy.replace(" ", "").split(",") if entry)

        if is_ipv4_address(hostname):
            for entry in entries:
                if is_valid_cidr(entry):
                    if address_in_network(hostname, entry):
                        return True
                elif hostname == entry:
                    # The no_proxy entry is a plain IP (not CIDR notation)
                    # equal to the requested IP.
                    return True
        else:
            host_with_port = hostname
            if parsed.port:
                host_with_port += f":{parsed.port}"

            for entry in entries:
                bare = entry.lstrip(".")
                # Exact match on the host, with or without its port...
                if bare in (hostname, host_with_port):
                    return True
                # ...or a match on a dot-delimited suffix (subdomains).
                suffix = "." + bare
                if hostname.endswith(suffix) or host_with_port.endswith(suffix):
                    return True

    # Fall back to proxy_bypass, with the caller's no_proxy (if any)
    # temporarily exported as the no_proxy environment variable.
    with set_environ("no_proxy", no_proxy_arg):
        try:
            bypass = proxy_bypass(hostname)
        except (TypeError, socket.gaierror):
            bypass = False

    return bool(bypass)
def get_environ_proxies(url: str, no_proxy: str | None = None) -> dict[str, str]:
    """
    Return a dict of environment proxies.

    The dict is empty when ``should_bypass_proxies`` reports that *url*
    must not go through a proxy.

    :param url: URL about to be requested.
    :param no_proxy: optional no_proxy list forwarded to ``should_bypass_proxies``.
    :rtype: dict
    """
    return {} if should_bypass_proxies(url, no_proxy=no_proxy) else getproxies()
def select_proxy(url: str, proxies: dict[str, str] | None) -> str | None:
    """Select a proxy for the url, if applicable.

    Keys are tried from most to least specific: ``scheme://host``,
    ``scheme``, ``all://host`` and finally ``all``.

    :param url: The url being for the request
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    :return: the value stored under the first matching key, or None.
    """
    proxies = proxies or {}
    urlparts = urlparse(url)
    scheme, host = urlparts.scheme, urlparts.hostname

    # Without a host only the scheme-wide and catch-all entries can apply.
    if host is None:
        return proxies.get(scheme, proxies.get("all"))

    candidate_keys = (
        scheme + "://" + host,
        scheme,
        "all://" + host,
        "all",
    )
    return next((proxies[key] for key in candidate_keys if key in proxies), None)
def resolve_proxies(
    request: Request | PreparedRequest,
    proxies: dict[str, str] | None,
    trust_env: bool = True,
) -> dict[str, str]:
    """This method takes proxy information from a request and configuration
    input to resolve a mapping of target proxies. This will consider settings
    such as NO_PROXY to strip proxy configurations.

    The given *proxies* mapping is not modified; a copy is returned.

    :param request: Request or PreparedRequest
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    :param trust_env: Boolean declaring whether to trust environment configs

    :rtype: dict
    """
    configured = {} if proxies is None else proxies
    url = cast(str, request.url)
    scheme = urlparse(url).scheme
    no_proxy = configured.get("no_proxy")
    resolved = configured.copy()

    if not trust_env or should_bypass_proxies(url, no_proxy=no_proxy):
        return resolved

    environ_proxies = get_environ_proxies(url, no_proxy=no_proxy)
    # Prefer the proxy for this scheme and fall back to the catch-all entry.
    environ_proxy = environ_proxies.get(scheme, environ_proxies.get("all"))
    if environ_proxy:
        # An explicitly configured proxy for the scheme takes precedence.
        resolved.setdefault(scheme, environ_proxy)
    return resolved
def default_user_agent(name: str = "python-requests") -> str:
    """
    Return a string representing the default user agent.

    The result has the form ``<name>/<version>``, using this package's
    ``__version__``.

    :param name: product name placed before the version.
    :rtype: str
    """
    user_agent = f"{name}/{__version__}"
    return user_agent
def default_headers() -> CaseInsensitiveDict[str]:
    """
    Return a new CaseInsensitiveDict holding the ``User-Agent``,
    ``Accept-Encoding``, ``Accept`` and ``Connection`` headers.

    :rtype: requests.structures.CaseInsensitiveDict
    """
    headers = {
        "User-Agent": default_user_agent(),
        "Accept-Encoding": DEFAULT_ACCEPT_ENCODING,
        "Accept": "*/*",
        "Connection": "keep-alive",
    }
    return CaseInsensitiveDict(headers)
def parse_header_links(value: str) -> list[dict[str, str]]:
    """Return a list of parsed link headers proxies.

    i.e. Link: <http:/.../front.jpeg>; rel=front; type="image/jpeg",<http://.../back.jpeg>; rel=back;type="image/jpeg"

    Each link becomes a dict with a ``url`` key plus one key per
    ``name=value`` parameter. Parameter values may themselves contain ``=``.
    Parsing of a link's parameters stops at the first parameter without
    an ``=``.

    :param value: content of a Link header.
    :rtype: list
    """

    links: list[dict[str, str]] = []

    replace_chars = " '\""

    value = value.strip(replace_chars)
    if not value:
        return links

    for val in re.split(", *<", value):
        # Everything before the first ';' is the URL, the rest are parameters.
        url, _, params = val.partition(";")

        link: dict[str, str] = {"url": url.strip("<> '\"")}

        for param in params.split(";"):
            # Split on the first '=' only, so that values containing '='
            # (e.g. title="a=b") are kept instead of aborting the parse.
            key, separator, param_value = param.partition("=")
            if not separator:
                break

            link[key.strip(replace_chars)] = param_value.strip(replace_chars)

        links.append(link)

    return links
# Null bytes; no need to recreate these on each call to guess_json_utf
_null = "\x00".encode("ascii")  # encoding to ASCII for Python 3
_null2 = _null * 2
_null3 = _null * 3


def guess_json_utf(data: bytes) -> str | None:
    """Guess the UTF encoding of JSON *data* from its first four bytes.

    :param data: raw bytes of a JSON document.
    :return: a codec name, or None when the null-byte pattern matches no
        known encoding.
    :rtype: str
    """
    # JSON always starts with two ASCII characters, so detection is as
    # easy as counting the nulls and from their location and count
    # determine the encoding. Also detect a BOM, if present.
    sample = data[:4]

    # The UTF-32 BOMs must be tested first: the UTF-32-LE BOM begins with
    # the same bytes as the UTF-16-LE one.
    if sample in (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE):
        return "utf-32"  # BOM included
    if sample.startswith(codecs.BOM_UTF8):
        return "utf-8-sig"  # BOM included, MS style (discouraged)
    if sample.startswith((codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE)):
        return "utf-16"  # BOM included

    null_count = sample.count(_null)
    if null_count == 0:
        return "utf-8"
    if null_count == 2:
        if sample[::2] == _null2:  # 1st and 3rd are null
            return "utf-16-be"
        if sample[1::2] == _null2:  # 2nd and 4th are null
            return "utf-16-le"
        # Did not detect 2 valid UTF-16 ascii-range characters
    elif null_count == 3:
        if sample[:3] == _null3:
            return "utf-32-be"
        if sample[1:] == _null3:
            return "utf-32-le"
        # Did not detect a valid UTF-32 ascii-range character
    return None
def prepend_scheme_if_needed(url: str, new_scheme: str) -> str:
    """Given a URL that may or may not have a scheme, prepend the given scheme.
    Does not replace a present scheme with the one provided as an argument.

    :param url: URL to complete.
    :param new_scheme: scheme to use when *url* has none.
    :rtype: str
    """
    parsed = parse_url(url)
    scheme, auth, _host, _port, path, query, fragment = parsed

    # A defect in urlparse determines that there isn't a netloc present in some
    # urls. We previously assumed parsing was overly cautious, and swapped the
    # netloc and path. Due to a lack of tests on the original defect, this is
    # maintained with parse_url for backwards compatibility.
    netloc = parsed.netloc
    if not netloc:
        netloc, path = path, netloc

    if auth:
        # parse_url doesn't provide the netloc with auth
        # so we'll add it ourselves.
        netloc = "@".join([auth, cast(str, netloc)])

    effective_scheme = new_scheme if scheme is None else scheme
    effective_path = "" if path is None else path

    return urlunparse((effective_scheme, netloc, effective_path, "", query, fragment))
def get_auth_from_url(url: str) -> tuple[str, str]:
    """Given a url with authentication components, extract them into a tuple of
    username,password.

    Both components are percent-decoded. ``("", "")`` is returned when the
    username or the password is missing.

    :param url: URL possibly containing ``user:password@``.
    :rtype: (str,str)
    """
    parsed = urlparse(url)

    try:
        # unquote() fails when parsed.username/password is None; the except
        # clause below turns that into the empty credentials.
        username = unquote(parsed.username)  # type: ignore[arg-type]
        password = unquote(parsed.password)  # type: ignore[arg-type]
    except (AttributeError, TypeError):
        return ("", "")

    return (username, password)
def check_header_validity(header: tuple[str | bytes, str | bytes]) -> None:
    """Verifies that header parts don't contain leading whitespace
    reserved characters, or return characters.

    The name is validated first (validator index 0), then the value
    (validator index 1).

    :param header: tuple, in the format (name, value).
    """
    name, value = header
    for validator_index, part in enumerate((name, value)):
        _validate_header_part(header, part, validator_index)
def _validate_header_part(
    header: tuple[str | bytes, str | bytes],
    header_part: str | bytes,
    header_validator_index: int,
) -> None:
    """Check one part of a header against the validator for its type.

    :param header: the full (name, value) tuple, used in error messages.
    :param header_part: the name or the value to validate.
    :param header_validator_index: 0 for the header name; any other index is
        reported as the header value.
    :raises InvalidHeader: if *header_part* is neither str nor bytes, or if
        the selected validator does not match it.
    """
    # Pick the validator table that matches the type of the part.
    if isinstance(header_part, str):
        validators = _HEADER_VALIDATORS_STR
    elif isinstance(header_part, bytes):  # type: ignore[reportUnnecessaryIsInstance]
        # runtime guard for non-str/bytes input
        validators = _HEADER_VALIDATORS_BYTE
    else:
        raise InvalidHeader(
            f"Header part ({header_part!r}) from {header} "
            f"must be of type str or bytes, not {type(header_part)}"
        )

    validator = validators[header_validator_index]
    if validator.match(header_part):  # type: ignore[arg-type]
        return

    header_kind = "value" if header_validator_index != 0 else "name"
    raise InvalidHeader(
        f"Invalid leading whitespace, reserved character(s), or return "
        f"character(s) in header {header_kind}: {header_part!r}"
    )
def urldefragauth(url: str) -> str:
    """
    Given a url remove the fragment and the authentication part.

    :param url: URL to clean.
    :rtype: str
    """
    parts = urlparse(url)
    netloc, path = parts.netloc, parts.path

    # see func:`prepend_scheme_if_needed`
    if not netloc:
        netloc, path = path, netloc

    # Keep only what follows the last '@', i.e. drop any credentials.
    host_part = netloc.rsplit("@", 1)[-1]

    return urlunparse((parts.scheme, host_part, path, parts.params, parts.query, ""))
def rewind_body(prepared_request: PreparedRequest) -> None:
    """Move file pointer back to its recorded starting position
    so it can be read again on redirect.

    :param prepared_request: request whose ``body`` is sought back to
        ``_body_position``.
    :raises UnrewindableBodyError: if the body has no ``seek``, the recorded
        position is not an integer, or seeking fails with an OSError.
    """
    body_seek = getattr(prepared_request.body, "seek", None)
    if body_seek is None or not isinstance(
        prepared_request._body_position,  # type: ignore[reportPrivateUsage]
        integer_types,
    ):
        raise UnrewindableBodyError("Unable to rewind request body for redirect.")

    try:
        body_seek(prepared_request._body_position)  # type: ignore[reportPrivateUsage]
    except OSError:
        raise UnrewindableBodyError(
            "An error occurred when rewinding request body for redirect."
        )
1155 raise UnrewindableBodyError("Unable to rewind request body for redirect.")