1"""
2requests.utils
3~~~~~~~~~~~~~~
5This module provides utility functions that are used within Requests
6that are also useful for external consumption.
7"""
9import codecs
10import contextlib
11import io
12import os
13import re
14import socket
15import struct
16import sys
17import tempfile
18import warnings
19import zipfile
20from collections import OrderedDict
22from urllib3.util import make_headers, parse_url
24from . import certs
25from .__version__ import __version__
27# to_native_string is unused here, but imported here for backwards compatibility
28from ._internal_utils import ( # noqa: F401
29 _HEADER_VALIDATORS_BYTE,
30 _HEADER_VALIDATORS_STR,
31 HEADER_VALIDATORS,
32 to_native_string,
33)
34from .compat import (
35 Mapping,
36 basestring,
37 bytes,
38 getproxies,
39 getproxies_environment,
40 integer_types,
41 is_urllib3_1,
42 proxy_bypass,
43 proxy_bypass_environment,
44 quote,
45 str,
46 unquote,
47 urlparse,
48 urlunparse,
49)
50from .compat import parse_http_list as _parse_list_header
51from .cookies import cookiejar_from_dict
52from .exceptions import (
53 FileModeWarning,
54 InvalidHeader,
55 InvalidURL,
56 UnrewindableBodyError,
57)
58from .structures import CaseInsensitiveDict
60NETRC_FILES = (".netrc", "_netrc")
62DEFAULT_CA_BUNDLE_PATH = certs.where()
64DEFAULT_PORTS = {"http": 80, "https": 443}
66# Ensure that ', ' is used to preserve previous delimiter behavior.
67DEFAULT_ACCEPT_ENCODING = ", ".join(
68 re.split(r",\s*", make_headers(accept_encoding=True)["accept-encoding"])
69)


if sys.platform == "win32":
    # provide a proxy_bypass version on Windows without DNS lookups

    def proxy_bypass_registry(host):
        try:
            import winreg
        except ImportError:
            return False

        try:
            internetSettings = winreg.OpenKey(
                winreg.HKEY_CURRENT_USER,
                r"Software\Microsoft\Windows\CurrentVersion\Internet Settings",
            )
            # ProxyEnable could be REG_SZ or REG_DWORD; normalize it
            proxyEnable = int(winreg.QueryValueEx(internetSettings, "ProxyEnable")[0])
            # ProxyOverride is almost always a string
            proxyOverride = winreg.QueryValueEx(internetSettings, "ProxyOverride")[0]
        except (OSError, ValueError):
            return False
        if not proxyEnable or not proxyOverride:
            return False

        # make a check value list from the registry entry: replace the
        # '<local>' string by the localhost entry and the corresponding
        # canonical entry.
        proxyOverride = proxyOverride.split(";")
        # filter out empty strings to avoid re.match returning True in the
        # following code.
        proxyOverride = filter(None, proxyOverride)
        # now check if we match one of the registry values.
        for test in proxyOverride:
            if test == "<local>":
                if "." not in host:
                    return True
            test = test.replace(".", r"\.")  # mask dots
            test = test.replace("*", r".*")  # change glob sequence
            test = test.replace("?", r".")  # change glob char
            if re.match(test, host, re.I):
                return True
        return False

    def proxy_bypass(host):  # noqa
        """Return True if the host should be bypassed.

        Checks proxy settings gathered from the environment, if specified,
        or the registry.
        """
        if getproxies_environment():
            return proxy_bypass_environment(host)
        else:
            return proxy_bypass_registry(host)


def dict_to_sequence(d):
    """Returns an internal sequence dictionary update."""

    if hasattr(d, "items"):
        d = d.items()

    return d


def super_len(o):
    total_length = None
    current_position = 0

    if not is_urllib3_1 and isinstance(o, str):
        # urllib3 2.x+ treats all strings as utf-8 instead
        # of latin-1 (iso-8859-1) like http.client.
        o = o.encode("utf-8")

    if hasattr(o, "__len__"):
        total_length = len(o)

    elif hasattr(o, "len"):
        total_length = o.len

    elif hasattr(o, "fileno"):
        try:
            fileno = o.fileno()
        except (io.UnsupportedOperation, AttributeError):
            # AttributeError is a surprising exception, seeing as how we've just checked
            # that `hasattr(o, 'fileno')`. It happens for objects obtained via
            # `Tarfile.extractfile()`, per issue 5229.
            pass
        else:
            total_length = os.fstat(fileno).st_size

            # Having used fstat to determine the file length, we need to
            # confirm that this file was opened up in binary mode.
            if "b" not in o.mode:
                warnings.warn(
                    (
                        "Requests has determined the content-length for this "
                        "request using the binary size of the file: however, the "
                        "file has been opened in text mode (i.e. without the 'b' "
                        "flag in the mode). This may lead to an incorrect "
                        "content-length. In Requests 3.0, support will be removed "
                        "for files in text mode."
                    ),
                    FileModeWarning,
                )

    if hasattr(o, "tell"):
        try:
            current_position = o.tell()
        except OSError:
            # This can happen in some weird situations, such as when the file
            # is actually a special file descriptor like stdin. In this
            # instance, we don't know what the length is, so set it to zero and
            # let requests chunk it instead.
            if total_length is not None:
                current_position = total_length
        else:
            if hasattr(o, "seek") and total_length is None:
                # StringIO and BytesIO have seek but no usable fileno
                try:
                    # seek to end of file
                    o.seek(0, 2)
                    total_length = o.tell()

                    # seek back to current position to support
                    # partially read file-like objects
                    o.seek(current_position or 0)
                except OSError:
                    total_length = 0

    if total_length is None:
        total_length = 0

    return max(0, total_length - current_position)
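
# Illustrative example (editor's sketch, not from the upstream module):
# super_len() reports the bytes *remaining* from the current position,
# which is what matters when a body has been partially read before sending.
#
#   >>> buf = io.BytesIO(b"hello world")
#   >>> _ = buf.read(6)  # consume "hello "
#   >>> super_len(buf)   # 11 total - 6 already read
#   5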


def get_netrc_auth(url, raise_errors=False):
    """Returns the Requests tuple auth for a given url from netrc."""

    netrc_file = os.environ.get("NETRC")
    if netrc_file is not None:
        netrc_locations = (netrc_file,)
    else:
        netrc_locations = (f"~/{f}" for f in NETRC_FILES)

    try:
        from netrc import NetrcParseError, netrc

        netrc_path = None

        for f in netrc_locations:
            loc = os.path.expanduser(f)
            if os.path.exists(loc):
                netrc_path = loc
                break

        # Abort early if there isn't one.
        if netrc_path is None:
            return

        ri = urlparse(url)
        host = ri.hostname

        try:
            _netrc = netrc(netrc_path).authenticators(host)
            if _netrc and any(_netrc):
                # Return with login / password
                login_i = 0 if _netrc[0] else 1
                return (_netrc[login_i], _netrc[2])
        except (NetrcParseError, OSError):
            # If there was a parsing error or a permissions issue reading the file,
            # we'll just skip netrc auth unless explicitly asked to raise errors.
            if raise_errors:
                raise

    # App Engine hackiness.
    except (ImportError, AttributeError):
        pass


def guess_filename(obj):
    """Tries to guess the filename of the given object."""
    name = getattr(obj, "name", None)
    if name and isinstance(name, basestring) and name[0] != "<" and name[-1] != ">":
        return os.path.basename(name)


def extract_zipped_paths(path):
    """Replace nonexistent paths that look like they refer to a member of a zip
    archive with the location of an extracted copy of the target, or else
    just return the provided path unchanged.
    """
    if os.path.exists(path):
        # this is already a valid path, no need to do anything further
        return path

    # find the first valid part of the provided path and treat that as a zip archive
    # assume the rest of the path is the name of a member in the archive
    archive, member = os.path.split(path)
    while archive and not os.path.exists(archive):
        archive, prefix = os.path.split(archive)
        if not prefix:
            # If the split leaves an empty prefix (i.e. archive is unchanged),
            # stop here; otherwise a rare corner case can loop forever.
            break
        member = "/".join([prefix, member])

    if not zipfile.is_zipfile(archive):
        return path

    zip_file = zipfile.ZipFile(archive)
    if member not in zip_file.namelist():
        return path

    # we have a valid zip archive and a valid member of that archive
    tmp = tempfile.gettempdir()
    extracted_path = os.path.join(tmp, member.split("/")[-1])
    if not os.path.exists(extracted_path):
        # use read + write to avoid creating nested folders; we only want the
        # file, and this also avoids an mkdir race condition
        with atomic_open(extracted_path) as file_handler:
            file_handler.write(zip_file.read(member))
    return extracted_path


@contextlib.contextmanager
def atomic_open(filename):
    """Write a file to the disk in an atomic fashion"""
    tmp_descriptor, tmp_name = tempfile.mkstemp(dir=os.path.dirname(filename))
    try:
        with os.fdopen(tmp_descriptor, "wb") as tmp_handler:
            yield tmp_handler
        os.replace(tmp_name, filename)
    except BaseException:
        os.remove(tmp_name)
        raise
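
# Illustrative usage sketch (editor's addition; the target path below is
# hypothetical): the temp file is renamed into place only after the write
# completes, so readers never observe a half-written file.
#
#   >>> with atomic_open("/tmp/extracted.bin") as fh:  # doctest: +SKIP
#   ...     fh.write(b"payload")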


def from_key_val_list(value):
    """Take an object and test to see if it can be represented as a
    dictionary. If it can be, return an OrderedDict, e.g.,

    ::

        >>> from_key_val_list([('key', 'val')])
        OrderedDict([('key', 'val')])
        >>> from_key_val_list('string')
        Traceback (most recent call last):
        ...
        ValueError: cannot encode objects that are not 2-tuples
        >>> from_key_val_list({'key': 'val'})
        OrderedDict([('key', 'val')])

    :rtype: OrderedDict
    """
    if value is None:
        return None

    if isinstance(value, (str, bytes, bool, int)):
        raise ValueError("cannot encode objects that are not 2-tuples")

    return OrderedDict(value)


def to_key_val_list(value):
    """Take an object and test to see if it can be represented as a
    dictionary. If it can be, return a list of tuples, e.g.,

    ::

        >>> to_key_val_list([('key', 'val')])
        [('key', 'val')]
        >>> to_key_val_list({'key': 'val'})
        [('key', 'val')]
        >>> to_key_val_list('string')
        Traceback (most recent call last):
        ...
        ValueError: cannot encode objects that are not 2-tuples

    :rtype: list
    """
    if value is None:
        return None

    if isinstance(value, (str, bytes, bool, int)):
        raise ValueError("cannot encode objects that are not 2-tuples")

    if isinstance(value, Mapping):
        value = value.items()

    return list(value)


# From mitsuhiko/werkzeug (used with permission).
def parse_list_header(value):
    """Parse lists as described by RFC 2068 Section 2.

    In particular, parse comma-separated lists where the elements of
    the list may include quoted-strings. A quoted-string could
    contain a comma. A non-quoted string could have quotes in the
    middle. Quotes are removed automatically after parsing.

    It basically works like :func:`parse_set_header` just that items
    may appear multiple times and case sensitivity is preserved.

    The return value is a standard :class:`list`:

    >>> parse_list_header('token, "quoted value"')
    ['token', 'quoted value']

    To create a header from the :class:`list` again, use the
    :func:`dump_header` function.

    :param value: a string with a list header.
    :return: :class:`list`
    :rtype: list
    """
    result = []
    for item in _parse_list_header(value):
        if item[:1] == item[-1:] == '"':
            item = unquote_header_value(item[1:-1])
        result.append(item)
    return result


# From mitsuhiko/werkzeug (used with permission).
def parse_dict_header(value):
    """Parse lists of key, value pairs as described by RFC 2068 Section 2 and
    convert them into a python dict:

    >>> d = parse_dict_header('foo="is a fish", bar="as well"')
    >>> type(d) is dict
    True
    >>> sorted(d.items())
    [('bar', 'as well'), ('foo', 'is a fish')]

    If there is no value for a key it will be `None`:

    >>> parse_dict_header('key_without_value')
    {'key_without_value': None}

    To create a header from the :class:`dict` again, use the
    :func:`dump_header` function.

    :param value: a string with a dict header.
    :return: :class:`dict`
    :rtype: dict
    """
    result = {}
    for item in _parse_list_header(value):
        if "=" not in item:
            result[item] = None
            continue
        name, value = item.split("=", 1)
        if value[:1] == value[-1:] == '"':
            value = unquote_header_value(value[1:-1])
        result[name] = value
    return result


# From mitsuhiko/werkzeug (used with permission).
def unquote_header_value(value, is_filename=False):
    r"""Unquotes a header value. (Reversal of :func:`quote_header_value`).
    This does not use the real unquoting but what browsers are actually
    using for quoting.

    :param value: the header value to unquote.
    :rtype: str
    """
    if value and value[0] == value[-1] == '"':
        # this is not the real unquoting, but fixing this so that the
        # RFC is met will result in bugs with internet explorer and
        # probably some other browsers as well. IE for example is
        # uploading files with "C:\foo\bar.txt" as filename
        value = value[1:-1]

        # if this is a filename and the starting characters look like
        # a UNC path, then just return the value without quotes. Using the
        # replace sequence below on a UNC path has the effect of turning
        # the leading double slash into a single slash and then
        # _fix_ie_filename() doesn't work correctly. See #458.
        if not is_filename or value[:2] != "\\\\":
            return value.replace("\\\\", "\\").replace('\\"', '"')
    return value
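
# Illustrative examples (editor's sketch, not from the upstream module):
# surrounding double quotes are stripped, unquoted tokens pass through.
#
#   >>> unquote_header_value('"attachment.txt"')
#   'attachment.txt'
#   >>> unquote_header_value('token-without-quotes')
#   'token-without-quotes'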


def dict_from_cookiejar(cj):
    """Returns a key/value dictionary from a CookieJar.

    :param cj: CookieJar object to extract cookies from.
    :rtype: dict
    """
    cookie_dict = {cookie.name: cookie.value for cookie in cj}
    return cookie_dict


def add_dict_to_cookiejar(cj, cookie_dict):
    """Returns a CookieJar from a key/value dictionary.

    :param cj: CookieJar to insert cookies into.
    :param cookie_dict: Dict of key/values to insert into CookieJar.
    :rtype: CookieJar
    """
    return cookiejar_from_dict(cookie_dict, cj)


def get_encodings_from_content(content):
    """Returns encodings from given content string.

    :param content: bytestring to extract encodings from.
    """
    warnings.warn(
        (
            "In requests 3.0, get_encodings_from_content will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )

    charset_re = re.compile(r'<meta.*?charset=["\']*(.+?)["\'>]', flags=re.I)
    pragma_re = re.compile(r'<meta.*?content=["\']*;?charset=(.+?)["\'>]', flags=re.I)
    xml_re = re.compile(r'^<\?xml.*?encoding=["\']*(.+?)["\'>]')

    return (
        charset_re.findall(content)
        + pragma_re.findall(content)
        + xml_re.findall(content)
    )


def _parse_content_type_header(header):
    """Returns content type and parameters from given header

    :param header: string
    :return: tuple containing content type and dictionary of
         parameters
    """

    tokens = header.split(";")
    content_type, params = tokens[0].strip(), tokens[1:]
    params_dict = {}
    items_to_strip = "\"' "

    for param in params:
        param = param.strip()
        if param:
            key, value = param, True
            index_of_equals = param.find("=")
            if index_of_equals != -1:
                key = param[:index_of_equals].strip(items_to_strip)
                value = param[index_of_equals + 1 :].strip(items_to_strip)
            params_dict[key.lower()] = value
    return content_type, params_dict
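
# Illustrative example (editor's sketch, not from the upstream module):
# parameter keys are lowercased with quotes/spaces stripped, and a bare
# parameter with no "=" maps to True.
#
#   >>> _parse_content_type_header('text/html; charset="UTF-8"; Flag')
#   ('text/html', {'charset': 'UTF-8', 'flag': True})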


def get_encoding_from_headers(headers):
    """Returns encodings from given HTTP Header Dict.

    :param headers: dictionary to extract encoding from.
    :rtype: str
    """

    content_type = headers.get("content-type")

    if not content_type:
        return None

    content_type, params = _parse_content_type_header(content_type)

    if "charset" in params:
        return params["charset"].strip("'\"")

    if "text" in content_type:
        return "ISO-8859-1"

    if "application/json" in content_type:
        # Assume UTF-8 based on RFC 4627: https://www.ietf.org/rfc/rfc4627.txt since the charset was unset
        return "utf-8"


def stream_decode_response_unicode(iterator, r):
    """Stream decodes an iterator."""

    if r.encoding is None:
        yield from iterator
        return

    decoder = codecs.getincrementaldecoder(r.encoding)(errors="replace")
    for chunk in iterator:
        rv = decoder.decode(chunk)
        if rv:
            yield rv
    rv = decoder.decode(b"", final=True)
    if rv:
        yield rv


def iter_slices(string, slice_length):
    """Iterate over slices of a string."""
    pos = 0
    if slice_length is None or slice_length <= 0:
        slice_length = len(string)
    while pos < len(string):
        yield string[pos : pos + slice_length]
        pos += slice_length
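
# Illustrative example (editor's sketch, not from the upstream module):
#
#   >>> list(iter_slices("abcdef", 4))
#   ['abcd', 'ef']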


def get_unicode_from_response(r):
    """Returns the requested content back in unicode.

    :param r: Response object to get unicode content from.

    Tries:

    1. charset from content-type
    2. fall back and replace all unicode characters

    :rtype: str
    """
    warnings.warn(
        (
            "In requests 3.0, get_unicode_from_response will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )

    tried_encodings = []

    # Try charset from content-type
    encoding = get_encoding_from_headers(r.headers)

    if encoding:
        try:
            return str(r.content, encoding)
        except UnicodeError:
            tried_encodings.append(encoding)

    # Fall back:
    try:
        return str(r.content, encoding, errors="replace")
    except TypeError:
        return r.content


# The unreserved URI characters (RFC 3986)
UNRESERVED_SET = frozenset(
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + "0123456789-._~"
)


def unquote_unreserved(uri):
    """Un-escape any percent-escape sequences in a URI that are unreserved
    characters. This leaves all reserved, illegal and non-ASCII bytes encoded.

    :rtype: str
    """
    parts = uri.split("%")
    for i in range(1, len(parts)):
        h = parts[i][0:2]
        if len(h) == 2 and h.isalnum():
            try:
                c = chr(int(h, 16))
            except ValueError:
                raise InvalidURL(f"Invalid percent-escape sequence: '{h}'")

            if c in UNRESERVED_SET:
                parts[i] = c + parts[i][2:]
            else:
                parts[i] = f"%{parts[i]}"
        else:
            parts[i] = f"%{parts[i]}"
    return "".join(parts)


def requote_uri(uri):
    """Re-quote the given URI.

    This function passes the given URI through an unquote/quote cycle to
    ensure that it is fully and consistently quoted.

    :rtype: str
    """
    safe_with_percent = "!#$%&'()*+,/:;=?@[]~"
    safe_without_percent = "!#$&'()*+,/:;=?@[]~"
    try:
        # Unquote only the unreserved characters
        # Then quote only illegal characters (do not quote reserved,
        # unreserved, or '%')
        return quote(unquote_unreserved(uri), safe=safe_with_percent)
    except InvalidURL:
        # We couldn't unquote the given URI, so let's try quoting it, but
        # there may be unquoted '%'s in the URI. We need to make sure they're
        # properly quoted so they do not cause issues elsewhere.
        return quote(uri, safe=safe_without_percent)
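
# Illustrative example (editor's sketch, not from the upstream module):
# the unquote/quote cycle is idempotent, so an already-quoted URI passes
# through unchanged.
#
#   >>> requote_uri("http://example.com/over there")
#   'http://example.com/over%20there'
#   >>> requote_uri("http://example.com/over%20there")
#   'http://example.com/over%20there'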


def address_in_network(ip, net):
    """This function allows you to check if an IP belongs to a network subnet

    Example: returns True if ip = 192.168.1.1 and net = 192.168.1.0/24
             returns False if ip = 192.168.1.1 and net = 192.168.100.0/24

    :rtype: bool
    """
    ipaddr = struct.unpack("=L", socket.inet_aton(ip))[0]
    netaddr, bits = net.split("/")
    netmask = struct.unpack("=L", socket.inet_aton(dotted_netmask(int(bits))))[0]
    network = struct.unpack("=L", socket.inet_aton(netaddr))[0] & netmask
    return (ipaddr & netmask) == (network & netmask)


def dotted_netmask(mask):
    """Converts mask from /xx format to xxx.xxx.xxx.xxx

    Example: if mask is 24 function returns 255.255.255.0

    :rtype: str
    """
    bits = 0xFFFFFFFF ^ (1 << 32 - mask) - 1
    return socket.inet_ntoa(struct.pack(">I", bits))
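
# Illustrative examples (editor's sketch, not from the upstream module):
#
#   >>> dotted_netmask(24)
#   '255.255.255.0'
#   >>> address_in_network("192.168.1.1", "192.168.1.0/24")
#   True
#   >>> address_in_network("192.168.1.1", "192.168.100.0/24")
#   False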


def is_ipv4_address(string_ip):
    """
    :rtype: bool
    """
    try:
        socket.inet_aton(string_ip)
    except OSError:
        return False
    return True


def is_valid_cidr(string_network):
    """
    Very simple check of the cidr format in no_proxy variable.

    :rtype: bool
    """
    if string_network.count("/") == 1:
        try:
            mask = int(string_network.split("/")[1])
        except ValueError:
            return False

        if mask < 1 or mask > 32:
            return False

        try:
            socket.inet_aton(string_network.split("/")[0])
        except OSError:
            return False
    else:
        return False
    return True
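
# Illustrative examples (editor's sketch, not from the upstream module):
# only the "address/mask" form with a 1-32 mask counts as CIDR here.
#
#   >>> is_valid_cidr("192.168.1.0/24")
#   True
#   >>> is_valid_cidr("192.168.1.0")
#   False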


@contextlib.contextmanager
def set_environ(env_name, value):
    """Set the environment variable 'env_name' to 'value'

    Save previous value, yield, and then restore the previous value stored in
    the environment variable 'env_name'.

    If 'value' is None, do nothing"""
    value_changed = value is not None
    if value_changed:
        old_value = os.environ.get(env_name)
        os.environ[env_name] = value
    try:
        yield
    finally:
        if value_changed:
            if old_value is None:
                del os.environ[env_name]
            else:
                os.environ[env_name] = old_value
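
# Illustrative usage sketch (editor's addition; the variable and value are
# arbitrary): the override is visible inside the block and undone on exit.
#
#   >>> with set_environ("NO_PROXY", "localhost"):
#   ...     os.environ["NO_PROXY"]
#   'localhost'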


def should_bypass_proxies(url, no_proxy):
    """
    Returns whether we should bypass proxies or not.

    :rtype: bool
    """

    # Prioritize lowercase environment variables over uppercase
    # to keep a consistent behaviour with other http projects (curl, wget).
    def get_proxy(key):
        return os.environ.get(key) or os.environ.get(key.upper())

    # First check whether no_proxy is defined. If it is, check that the URL
    # we're getting isn't in the no_proxy list.
    no_proxy_arg = no_proxy
    if no_proxy is None:
        no_proxy = get_proxy("no_proxy")
    parsed = urlparse(url)

    if parsed.hostname is None:
        # URLs don't always have hostnames, e.g. file:/// urls.
        return True

    if no_proxy:
        # We need to check whether we match here. We need to see if we match
        # the end of the hostname, both with and without the port.
        no_proxy = (host for host in no_proxy.replace(" ", "").split(",") if host)

        if is_ipv4_address(parsed.hostname):
            for proxy_ip in no_proxy:
                if is_valid_cidr(proxy_ip):
                    if address_in_network(parsed.hostname, proxy_ip):
                        return True
                elif parsed.hostname == proxy_ip:
                    # If no_proxy ip was defined in plain IP notation instead
                    # of cidr notation & matches the IP of the index
                    return True
        else:
            host_with_port = parsed.hostname
            if parsed.port:
                host_with_port += f":{parsed.port}"

            for host in no_proxy:
                if parsed.hostname.endswith(host) or host_with_port.endswith(host):
                    # The URL does match something in no_proxy, so we don't want
                    # to apply the proxies on this URL.
                    return True

    with set_environ("no_proxy", no_proxy_arg):
        # parsed.hostname can be `None` in cases such as a file URI.
        try:
            bypass = proxy_bypass(parsed.hostname)
        except (TypeError, socket.gaierror):
            bypass = False

    if bypass:
        return True

    return False


def get_environ_proxies(url, no_proxy=None):
    """
    Return a dict of environment proxies.

    :rtype: dict
    """
    if should_bypass_proxies(url, no_proxy=no_proxy):
        return {}
    else:
        return getproxies()


def select_proxy(url, proxies):
    """Select a proxy for the url, if applicable.

    :param url: The url being requested
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    """
    proxies = proxies or {}
    urlparts = urlparse(url)
    if urlparts.hostname is None:
        return proxies.get(urlparts.scheme, proxies.get("all"))

    proxy_keys = [
        urlparts.scheme + "://" + urlparts.hostname,
        urlparts.scheme,
        "all://" + urlparts.hostname,
        "all",
    ]
    proxy = None
    for proxy_key in proxy_keys:
        if proxy_key in proxies:
            proxy = proxies[proxy_key]
            break

    return proxy
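
# Illustrative example (editor's sketch; the proxy URLs are made up):
# more specific "scheme://host" keys win over scheme-only and "all" entries.
#
#   >>> proxies = {"http": "http://generic:3128",
#   ...            "http://example.com": "http://special:3128"}
#   >>> select_proxy("http://example.com/path", proxies)
#   'http://special:3128'
#   >>> select_proxy("http://other.org/", proxies)
#   'http://generic:3128'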


def resolve_proxies(request, proxies, trust_env=True):
    """This method takes proxy information from a request and configuration
    input to resolve a mapping of target proxies. This will consider settings
    such as NO_PROXY to strip proxy configurations.

    :param request: Request or PreparedRequest
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    :param trust_env: Boolean declaring whether to trust environment configs

    :rtype: dict
    """
    proxies = proxies if proxies is not None else {}
    url = request.url
    scheme = urlparse(url).scheme
    no_proxy = proxies.get("no_proxy")
    new_proxies = proxies.copy()

    if trust_env and not should_bypass_proxies(url, no_proxy=no_proxy):
        environ_proxies = get_environ_proxies(url, no_proxy=no_proxy)

        proxy = environ_proxies.get(scheme, environ_proxies.get("all"))

        if proxy:
            new_proxies.setdefault(scheme, proxy)
    return new_proxies


def default_user_agent(name="python-requests"):
    """
    Return a string representing the default user agent.

    :rtype: str
    """
    return f"{name}/{__version__}"


def default_headers():
    """
    :rtype: requests.structures.CaseInsensitiveDict
    """
    return CaseInsensitiveDict(
        {
            "User-Agent": default_user_agent(),
            "Accept-Encoding": DEFAULT_ACCEPT_ENCODING,
            "Accept": "*/*",
            "Connection": "keep-alive",
        }
    )


def parse_header_links(value):
    """Return a list of parsed link headers.

    i.e. Link: <http:/.../front.jpeg>; rel=front; type="image/jpeg",<http://.../back.jpeg>; rel=back;type="image/jpeg"

    :rtype: list
    """

    links = []

    replace_chars = " '\""

    value = value.strip(replace_chars)
    if not value:
        return links

    for val in re.split(", *<", value):
        try:
            url, params = val.split(";", 1)
        except ValueError:
            url, params = val, ""

        link = {"url": url.strip("<> '\"")}

        for param in params.split(";"):
            try:
                key, value = param.split("=")
            except ValueError:
                break

            link[key.strip(replace_chars)] = value.strip(replace_chars)

        links.append(link)

    return links
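
# Illustrative example (editor's sketch, not from the upstream module):
#
#   >>> parse_header_links('<http://example.com/page2>; rel="next"')
#   [{'url': 'http://example.com/page2', 'rel': 'next'}]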


# Null bytes; no need to recreate these on each call to guess_json_utf
_null = "\x00".encode("ascii")  # encoding to ASCII for Python 3
_null2 = _null * 2
_null3 = _null * 3


def guess_json_utf(data):
    """
    :rtype: str
    """
    # JSON always starts with two ASCII characters, so detection is as
    # easy as counting the nulls and from their location and count
    # determine the encoding. Also detect a BOM, if present.
    sample = data[:4]
    if sample in (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE):
        return "utf-32"  # BOM included
    if sample[:3] == codecs.BOM_UTF8:
        return "utf-8-sig"  # BOM included, MS style (discouraged)
    if sample[:2] in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE):
        return "utf-16"  # BOM included
    nullcount = sample.count(_null)
    if nullcount == 0:
        return "utf-8"
    if nullcount == 2:
        if sample[::2] == _null2:  # 1st and 3rd are null
            return "utf-16-be"
        if sample[1::2] == _null2:  # 2nd and 4th are null
            return "utf-16-le"
        # Did not detect 2 valid UTF-16 ascii-range characters
    if nullcount == 3:
        if sample[:3] == _null3:
            return "utf-32-be"
        if sample[1:] == _null3:
            return "utf-32-le"
        # Did not detect a valid UTF-32 ascii-range character
    return None
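
# Illustrative examples (editor's sketch, not from the upstream module):
# the null-byte pattern of the first four bytes identifies the UTF flavour
# without decoding.
#
#   >>> guess_json_utf(b'{"k": 1}')
#   'utf-8'
#   >>> guess_json_utf('{"k": 1}'.encode("utf-16-le"))
#   'utf-16-le'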


def prepend_scheme_if_needed(url, new_scheme):
    """Given a URL that may or may not have a scheme, prepend the given scheme.
    Does not replace a present scheme with the one provided as an argument.

    :rtype: str
    """
    parsed = parse_url(url)
    scheme, auth, host, port, path, query, fragment = parsed

    # A defect in urlparse determines that there isn't a netloc present in some
    # urls. We previously assumed parsing was overly cautious, and swapped the
    # netloc and path. Due to a lack of tests on the original defect, this is
    # maintained with parse_url for backwards compatibility.
    netloc = parsed.netloc
    if not netloc:
        netloc, path = path, netloc

    if auth:
        # parse_url doesn't provide the netloc with auth
        # so we'll add it ourselves.
        netloc = "@".join([auth, netloc])
    if scheme is None:
        scheme = new_scheme
    if path is None:
        path = ""

    return urlunparse((scheme, netloc, path, "", query, fragment))
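
# Illustrative examples (editor's sketch, not from the upstream module):
# an existing scheme is preserved.
#
#   >>> prepend_scheme_if_needed("example.com/path", "https")
#   'https://example.com/path'
#   >>> prepend_scheme_if_needed("http://example.com/path", "https")
#   'http://example.com/path'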


def get_auth_from_url(url):
    """Given a url with authentication components, extract them into a tuple of
    username,password.

    :rtype: (str,str)
    """
    parsed = urlparse(url)

    try:
        auth = (unquote(parsed.username), unquote(parsed.password))
    except (AttributeError, TypeError):
        auth = ("", "")

    return auth
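
# Illustrative examples (editor's sketch, not from the upstream module):
# credentials are percent-decoded; URLs without credentials yield empty
# strings.
#
#   >>> get_auth_from_url("https://user:p%40ss@example.com/")
#   ('user', 'p@ss')
#   >>> get_auth_from_url("https://example.com/")
#   ('', '')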


def check_header_validity(header):
    """Verifies that header parts don't contain leading whitespace,
    reserved characters, or return characters.

    :param header: tuple, in the format (name, value).
    """
    name, value = header
    _validate_header_part(header, name, 0)
    _validate_header_part(header, value, 1)


def _validate_header_part(header, header_part, header_validator_index):
    if isinstance(header_part, str):
        validator = _HEADER_VALIDATORS_STR[header_validator_index]
    elif isinstance(header_part, bytes):
        validator = _HEADER_VALIDATORS_BYTE[header_validator_index]
    else:
        raise InvalidHeader(
            f"Header part ({header_part!r}) from {header} "
            f"must be of type str or bytes, not {type(header_part)}"
        )

    if not validator.match(header_part):
        header_kind = "name" if header_validator_index == 0 else "value"
        raise InvalidHeader(
            f"Invalid leading whitespace, reserved character(s), or return "
            f"character(s) in header {header_kind}: {header_part!r}"
        )


def urldefragauth(url):
    """
    Given a url remove the fragment and the authentication part.

    :rtype: str
    """
    scheme, netloc, path, params, query, fragment = urlparse(url)

    # see func:`prepend_scheme_if_needed`
    if not netloc:
        netloc, path = path, netloc

    netloc = netloc.rsplit("@", 1)[-1]

    return urlunparse((scheme, netloc, path, params, query, ""))
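
# Illustrative example (editor's sketch, not from the upstream module):
#
#   >>> urldefragauth("https://user:pass@example.com/path#frag")
#   'https://example.com/path'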


def rewind_body(prepared_request):
    """Move file pointer back to its recorded starting position
    so it can be read again on redirect.
    """
    body_seek = getattr(prepared_request.body, "seek", None)
    if body_seek is not None and isinstance(
        prepared_request._body_position, integer_types
    ):
        try:
            body_seek(prepared_request._body_position)
        except OSError:
            raise UnrewindableBodyError(
                "An error occurred when rewinding request body for redirect."
            )
    else:
        raise UnrewindableBodyError("Unable to rewind request body for redirect.")