Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/requests/utils.py: 16%
1"""
2requests.utils
3~~~~~~~~~~~~~~
5This module provides utility functions that are used within Requests
6that are also useful for external consumption.
7"""
import codecs
import contextlib
import io
import os
import re
import socket
import struct
import sys
import tempfile
import warnings
import zipfile
from collections import OrderedDict

from urllib3.util import make_headers, parse_url

from . import certs
from .__version__ import __version__

# to_native_string is unused here, but imported here for backwards compatibility
from ._internal_utils import (  # noqa: F401
    _HEADER_VALIDATORS_BYTE,
    _HEADER_VALIDATORS_STR,
    HEADER_VALIDATORS,
    to_native_string,
)
from .compat import (
    Mapping,
    basestring,
    bytes,
    getproxies,
    getproxies_environment,
    integer_types,
    is_urllib3_1,
)
from .compat import parse_http_list as _parse_list_header
from .compat import (
    proxy_bypass,
    proxy_bypass_environment,
    quote,
    str,
    unquote,
    urlparse,
    urlunparse,
)
from .cookies import cookiejar_from_dict
from .exceptions import (
    FileModeWarning,
    InvalidHeader,
    InvalidURL,
    UnrewindableBodyError,
)
from .structures import CaseInsensitiveDict

NETRC_FILES = (".netrc", "_netrc")

DEFAULT_CA_BUNDLE_PATH = certs.where()

DEFAULT_PORTS = {"http": 80, "https": 443}

# Ensure that ', ' is used to preserve previous delimiter behavior.
DEFAULT_ACCEPT_ENCODING = ", ".join(
    re.split(r",\s*", make_headers(accept_encoding=True)["accept-encoding"])
)


if sys.platform == "win32":
    # provide a proxy_bypass version on Windows without DNS lookups

    def proxy_bypass_registry(host):
        try:
            import winreg
        except ImportError:
            return False

        try:
            internetSettings = winreg.OpenKey(
                winreg.HKEY_CURRENT_USER,
                r"Software\Microsoft\Windows\CurrentVersion\Internet Settings",
            )
            # ProxyEnable could be REG_SZ or REG_DWORD, so normalize it to an int
            proxyEnable = int(winreg.QueryValueEx(internetSettings, "ProxyEnable")[0])
            # ProxyOverride is almost always a string
            proxyOverride = winreg.QueryValueEx(internetSettings, "ProxyOverride")[0]
        except (OSError, ValueError):
            return False
        if not proxyEnable or not proxyOverride:
            return False

        # make a check value list from the registry entry: replace the
        # '<local>' string by the localhost entry and the corresponding
        # canonical entry.
        proxyOverride = proxyOverride.split(";")
        # filter out empty strings so re.match doesn't return true for
        # every host in the loop below.
        proxyOverride = filter(None, proxyOverride)
        # now check if we match one of the registry values.
        for test in proxyOverride:
            if test == "<local>":
                if "." not in host:
                    return True
            test = test.replace(".", r"\.")  # mask dots
            test = test.replace("*", r".*")  # change glob sequence
            test = test.replace("?", r".")  # change glob char
            if re.match(test, host, re.I):
                return True
        return False

    def proxy_bypass(host):  # noqa
        """Return True if the host should be bypassed.

        Checks proxy settings gathered from the environment, if specified,
        or the registry.
        """
        if getproxies_environment():
            return proxy_bypass_environment(host)
        else:
            return proxy_bypass_registry(host)


def dict_to_sequence(d):
    """Return a key/value sequence from ``d`` if it is a mapping;
    otherwise return ``d`` unchanged."""

    if hasattr(d, "items"):
        d = d.items()

    return d


def super_len(o):
    total_length = None
    current_position = 0

    if not is_urllib3_1 and isinstance(o, str):
        # urllib3 2.x+ treats all strings as utf-8 instead
        # of latin-1 (iso-8859-1) like http.client.
        o = o.encode("utf-8")

    if hasattr(o, "__len__"):
        total_length = len(o)

    elif hasattr(o, "len"):
        total_length = o.len

    elif hasattr(o, "fileno"):
        try:
            fileno = o.fileno()
        except (io.UnsupportedOperation, AttributeError):
            # AttributeError is a surprising exception, seeing as how we've just checked
            # that `hasattr(o, 'fileno')`. It happens for objects obtained via
            # `Tarfile.extractfile()`, per issue 5229.
            pass
        else:
            total_length = os.fstat(fileno).st_size

            # Having used fstat to determine the file length, we need to
            # confirm that this file was opened up in binary mode.
            if "b" not in o.mode:
                warnings.warn(
                    (
                        "Requests has determined the content-length for this "
                        "request using the binary size of the file: however, the "
                        "file has been opened in text mode (i.e. without the 'b' "
                        "flag in the mode). This may lead to an incorrect "
                        "content-length. In Requests 3.0, support will be removed "
                        "for files in text mode."
                    ),
                    FileModeWarning,
                )

    if hasattr(o, "tell"):
        try:
            current_position = o.tell()
        except OSError:
            # This can happen in some weird situations, such as when the file
            # is actually a special file descriptor like stdin. In this
            # instance, we don't know what the length is, so set it to zero and
            # let requests chunk it instead.
            if total_length is not None:
                current_position = total_length
        else:
            if hasattr(o, "seek") and total_length is None:
                # StringIO and BytesIO have seek but no usable fileno
                try:
                    # seek to end of file
                    o.seek(0, 2)
                    total_length = o.tell()

                    # seek back to current position to support
                    # partially read file-like objects
                    o.seek(current_position or 0)
                except OSError:
                    total_length = 0

    if total_length is None:
        total_length = 0

    return max(0, total_length - current_position)
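

# For illustration, a minimal sketch of what super_len computes: the number of
# bytes remaining from the current read position, not the total size.
#
#     >>> import io
#     >>> b = io.BytesIO(b"hello")
#     >>> _ = b.read(2)
#     >>> super_len(b)
#     3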


def get_netrc_auth(url, raise_errors=False):
    """Returns the Requests tuple auth for a given url from netrc."""

    netrc_file = os.environ.get("NETRC")
    if netrc_file is not None:
        netrc_locations = (netrc_file,)
    else:
        netrc_locations = (f"~/{f}" for f in NETRC_FILES)

    try:
        from netrc import NetrcParseError, netrc

        netrc_path = None

        for f in netrc_locations:
            loc = os.path.expanduser(f)
            if os.path.exists(loc):
                netrc_path = loc
                break

        # Abort early if there isn't one.
        if netrc_path is None:
            return

        ri = urlparse(url)
        host = ri.hostname

        try:
            _netrc = netrc(netrc_path).authenticators(host)
            if _netrc:
                # Return with login / password
                login_i = 0 if _netrc[0] else 1
                return (_netrc[login_i], _netrc[2])
        except (NetrcParseError, OSError):
            # If there was a parsing error or a permissions issue reading the file,
            # we'll just skip netrc auth unless explicitly asked to raise errors.
            if raise_errors:
                raise

    # App Engine hackiness.
    except (ImportError, AttributeError):
        pass
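

# For illustration: given a netrc file containing a line such as (machine name
# and credentials here are hypothetical)
#
#     machine example.com login alice password s3cret
#
# get_netrc_auth("https://example.com/data") would return ("alice", "s3cret").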


def guess_filename(obj):
    """Tries to guess the filename of the given object."""
    name = getattr(obj, "name", None)
    if name and isinstance(name, basestring) and name[0] != "<" and name[-1] != ">":
        return os.path.basename(name)


def extract_zipped_paths(path):
    """Replace nonexistent paths that look like they refer to a member of a zip
    archive with the location of an extracted copy of the target, or else
    just return the provided path unchanged.
    """
    if os.path.exists(path):
        # this is already a valid path, no need to do anything further
        return path

    # find the first valid part of the provided path and treat that as a zip archive
    # assume the rest of the path is the name of a member in the archive
    archive, member = os.path.split(path)
    while archive and not os.path.exists(archive):
        archive, prefix = os.path.split(archive)
        if not prefix:
            # If the split produced an empty prefix (i.e. `archive` was left
            # unchanged), bail out; continuing would loop forever on a rare
            # corner case.
            break
        member = "/".join([prefix, member])

    if not zipfile.is_zipfile(archive):
        return path

    zip_file = zipfile.ZipFile(archive)
    if member not in zip_file.namelist():
        return path

    # we have a valid zip archive and a valid member of that archive
    tmp = tempfile.gettempdir()
    extracted_path = os.path.join(tmp, member.split("/")[-1])
    if not os.path.exists(extracted_path):
        # use read + write rather than extract() so we don't create nested
        # folders; we only want the file, and this also avoids an mkdir race.
        with atomic_open(extracted_path) as file_handler:
            file_handler.write(zip_file.read(member))
    return extracted_path


@contextlib.contextmanager
def atomic_open(filename):
    """Write a file to the disk in an atomic fashion"""
    tmp_descriptor, tmp_name = tempfile.mkstemp(dir=os.path.dirname(filename))
    try:
        with os.fdopen(tmp_descriptor, "wb") as tmp_handler:
            yield tmp_handler
        os.replace(tmp_name, filename)
    except BaseException:
        os.remove(tmp_name)
        raise
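

# For illustration, a minimal usage sketch (the path is hypothetical); the file
# only appears under its final name once it has been fully written:
#
#     with atomic_open("/tmp/extracted.bin") as f:
#         f.write(b"payload")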


def from_key_val_list(value):
    """Take an object and test to see if it can be represented as a
    dictionary. If it can be, return an OrderedDict, e.g.,

    ::

        >>> from_key_val_list([('key', 'val')])
        OrderedDict([('key', 'val')])
        >>> from_key_val_list('string')
        Traceback (most recent call last):
        ...
        ValueError: cannot encode objects that are not 2-tuples
        >>> from_key_val_list({'key': 'val'})
        OrderedDict([('key', 'val')])

    :rtype: OrderedDict
    """
    if value is None:
        return None

    if isinstance(value, (str, bytes, bool, int)):
        raise ValueError("cannot encode objects that are not 2-tuples")

    return OrderedDict(value)


def to_key_val_list(value):
    """Take an object and test to see if it can be represented as a
    dictionary. If it can be, return a list of tuples, e.g.,

    ::

        >>> to_key_val_list([('key', 'val')])
        [('key', 'val')]
        >>> to_key_val_list({'key': 'val'})
        [('key', 'val')]
        >>> to_key_val_list('string')
        Traceback (most recent call last):
        ...
        ValueError: cannot encode objects that are not 2-tuples

    :rtype: list
    """
    if value is None:
        return None

    if isinstance(value, (str, bytes, bool, int)):
        raise ValueError("cannot encode objects that are not 2-tuples")

    if isinstance(value, Mapping):
        value = value.items()

    return list(value)


# From mitsuhiko/werkzeug (used with permission).
def parse_list_header(value):
    """Parse lists as described by RFC 2068 Section 2.

    In particular, parse comma-separated lists where the elements of
    the list may include quoted-strings. A quoted-string could
    contain a comma. A non-quoted string could have quotes in the
    middle. Quotes are removed automatically after parsing.

    It basically works like :func:`parse_set_header` just that items
    may appear multiple times and case sensitivity is preserved.

    The return value is a standard :class:`list`:

    >>> parse_list_header('token, "quoted value"')
    ['token', 'quoted value']

    To create a header from the :class:`list` again, use the
    :func:`dump_header` function.

    :param value: a string with a list header.
    :return: :class:`list`
    :rtype: list
    """
    result = []
    for item in _parse_list_header(value):
        if item[:1] == item[-1:] == '"':
            item = unquote_header_value(item[1:-1])
        result.append(item)
    return result


# From mitsuhiko/werkzeug (used with permission).
def parse_dict_header(value):
    """Parse lists of key, value pairs as described by RFC 2068 Section 2 and
    convert them into a python dict:

    >>> d = parse_dict_header('foo="is a fish", bar="as well"')
    >>> type(d) is dict
    True
    >>> sorted(d.items())
    [('bar', 'as well'), ('foo', 'is a fish')]

    If there is no value for a key it will be `None`:

    >>> parse_dict_header('key_without_value')
    {'key_without_value': None}

    To create a header from the :class:`dict` again, use the
    :func:`dump_header` function.

    :param value: a string with a dict header.
    :return: :class:`dict`
    :rtype: dict
    """
    result = {}
    for item in _parse_list_header(value):
        if "=" not in item:
            result[item] = None
            continue
        name, value = item.split("=", 1)
        if value[:1] == value[-1:] == '"':
            value = unquote_header_value(value[1:-1])
        result[name] = value
    return result


# From mitsuhiko/werkzeug (used with permission).
def unquote_header_value(value, is_filename=False):
    r"""Unquotes a header value. (Reversal of :func:`quote_header_value`).
    This does not use the real unquoting but what browsers are actually
    using for quoting.

    :param value: the header value to unquote.
    :rtype: str
    """
    if value and value[0] == value[-1] == '"':
        # this is not the real unquoting, but fixing this so that the
        # RFC is met will result in bugs with internet explorer and
        # probably some other browsers as well. IE for example is
        # uploading files with "C:\foo\bar.txt" as filename
        value = value[1:-1]

        # if this is a filename and the starting characters look like
        # a UNC path, then just return the value without quotes. Using the
        # replace sequence below on a UNC path has the effect of turning
        # the leading double slash into a single slash and then
        # _fix_ie_filename() doesn't work correctly. See #458.
        if not is_filename or value[:2] != "\\\\":
            return value.replace("\\\\", "\\").replace('\\"', '"')
    return value


def dict_from_cookiejar(cj):
    """Returns a key/value dictionary from a CookieJar.

    :param cj: CookieJar object to extract cookies from.
    :rtype: dict
    """

    cookie_dict = {cookie.name: cookie.value for cookie in cj}
    return cookie_dict
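

# For illustration, round-tripping a dict through a CookieJar:
#
#     >>> jar = cookiejar_from_dict({"session": "abc123"})
#     >>> dict_from_cookiejar(jar)
#     {'session': 'abc123'}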


def add_dict_to_cookiejar(cj, cookie_dict):
    """Returns a CookieJar from a key/value dictionary.

    :param cj: CookieJar to insert cookies into.
    :param cookie_dict: Dict of key/values to insert into CookieJar.
    :rtype: CookieJar
    """

    return cookiejar_from_dict(cookie_dict, cj)


def get_encodings_from_content(content):
    """Returns encodings from given content string.

    :param content: bytestring to extract encodings from.
    """
    warnings.warn(
        (
            "In requests 3.0, get_encodings_from_content will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )

    charset_re = re.compile(r'<meta.*?charset=["\']*(.+?)["\'>]', flags=re.I)
    pragma_re = re.compile(r'<meta.*?content=["\']*;?charset=(.+?)["\'>]', flags=re.I)
    xml_re = re.compile(r'^<\?xml.*?encoding=["\']*(.+?)["\'>]')

    return (
        charset_re.findall(content)
        + pragma_re.findall(content)
        + xml_re.findall(content)
    )


def _parse_content_type_header(header):
    """Returns content type and parameters from given header

    :param header: string
    :return: tuple containing content type and dictionary of
        parameters
    """

    tokens = header.split(";")
    content_type, params = tokens[0].strip(), tokens[1:]
    params_dict = {}
    items_to_strip = "\"' "

    for param in params:
        param = param.strip()
        if param:
            key, value = param, True
            index_of_equals = param.find("=")
            if index_of_equals != -1:
                key = param[:index_of_equals].strip(items_to_strip)
                value = param[index_of_equals + 1 :].strip(items_to_strip)
            params_dict[key.lower()] = value
    return content_type, params_dict
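

# For illustration, how a header is tokenized (quotes and whitespace are
# stripped from parameter values):
#
#     >>> _parse_content_type_header('text/html; charset="UTF-8"')
#     ('text/html', {'charset': 'UTF-8'})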


def get_encoding_from_headers(headers):
    """Returns encodings from given HTTP Header Dict.

    :param headers: dictionary to extract encoding from.
    :rtype: str
    """

    content_type = headers.get("content-type")

    if not content_type:
        return None

    content_type, params = _parse_content_type_header(content_type)

    if "charset" in params:
        return params["charset"].strip("'\"")

    if "text" in content_type:
        return "ISO-8859-1"

    if "application/json" in content_type:
        # Assume UTF-8 based on RFC 4627: https://www.ietf.org/rfc/rfc4627.txt since the charset was unset
        return "utf-8"


def stream_decode_response_unicode(iterator, r):
    """Stream decodes an iterator."""

    if r.encoding is None:
        yield from iterator
        return

    decoder = codecs.getincrementaldecoder(r.encoding)(errors="replace")
    for chunk in iterator:
        rv = decoder.decode(chunk)
        if rv:
            yield rv
    rv = decoder.decode(b"", final=True)
    if rv:
        yield rv
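

# For illustration (SimpleNamespace stands in for a Response object here);
# note how a multi-byte character split across chunks is decoded correctly
# thanks to the incremental decoder:
#
#     >>> from types import SimpleNamespace
#     >>> r = SimpleNamespace(encoding="utf-8")
#     >>> list(stream_decode_response_unicode([b"caf", b"\xc3\xa9"], r))
#     ['caf', 'é']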


def iter_slices(string, slice_length):
    """Iterate over slices of a string."""
    pos = 0
    if slice_length is None or slice_length <= 0:
        slice_length = len(string)
    while pos < len(string):
        yield string[pos : pos + slice_length]
        pos += slice_length


def get_unicode_from_response(r):
    """Returns the requested content back in unicode.

    :param r: Response object to get unicode content from.

    Tries:

    1. charset from content-type
    2. fall back to decoding with all undecodable characters replaced

    :rtype: str
    """
    warnings.warn(
        (
            "In requests 3.0, get_unicode_from_response will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )

    tried_encodings = []

    # Try charset from content-type
    encoding = get_encoding_from_headers(r.headers)

    if encoding:
        try:
            return str(r.content, encoding)
        except UnicodeError:
            tried_encodings.append(encoding)

    # Fall back:
    try:
        return str(r.content, encoding, errors="replace")
    except TypeError:
        return r.content


# The unreserved URI characters (RFC 3986)
UNRESERVED_SET = frozenset(
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + "0123456789-._~"
)


def unquote_unreserved(uri):
    """Un-escape any percent-escape sequences in a URI that are unreserved
    characters. This leaves all reserved, illegal and non-ASCII bytes encoded.

    :rtype: str
    """
    parts = uri.split("%")
    for i in range(1, len(parts)):
        h = parts[i][0:2]
        if len(h) == 2 and h.isalnum():
            try:
                c = chr(int(h, 16))
            except ValueError:
                raise InvalidURL(f"Invalid percent-escape sequence: '{h}'")

            if c in UNRESERVED_SET:
                parts[i] = c + parts[i][2:]
            else:
                parts[i] = f"%{parts[i]}"
        else:
            parts[i] = f"%{parts[i]}"
    return "".join(parts)


def requote_uri(uri):
    """Re-quote the given URI.

    This function passes the given URI through an unquote/quote cycle to
    ensure that it is fully and consistently quoted.

    :rtype: str
    """
    safe_with_percent = "!#$%&'()*+,/:;=?@[]~"
    safe_without_percent = "!#$&'()*+,/:;=?@[]~"
    try:
        # Unquote only the unreserved characters
        # Then quote only illegal characters (do not quote reserved,
        # unreserved, or '%')
        return quote(unquote_unreserved(uri), safe=safe_with_percent)
    except InvalidURL:
        # We couldn't unquote the given URI, so let's try quoting it, but
        # there may be unquoted '%'s in the URI. We need to make sure they're
        # properly quoted so they do not cause issues elsewhere.
        return quote(uri, safe=safe_without_percent)
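

# For illustration: unquote_unreserved() decodes only unreserved characters
# (here %7E -> '~') while leaving reserved escapes (%2F) intact, and
# requote_uri() quotes any remaining illegal characters such as spaces:
#
#     >>> unquote_unreserved("http://example.com/%7Euser/%2Fpath")
#     'http://example.com/~user/%2Fpath'
#     >>> requote_uri("http://example.com/a b")
#     'http://example.com/a%20b'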


def address_in_network(ip, net):
    """This function allows you to check if an IP belongs to a network subnet

    Example: returns True if ip = 192.168.1.1 and net = 192.168.1.0/24
             returns False if ip = 192.168.1.1 and net = 192.168.100.0/24

    :rtype: bool
    """
    ipaddr = struct.unpack("=L", socket.inet_aton(ip))[0]
    netaddr, bits = net.split("/")
    netmask = struct.unpack("=L", socket.inet_aton(dotted_netmask(int(bits))))[0]
    network = struct.unpack("=L", socket.inet_aton(netaddr))[0] & netmask
    return (ipaddr & netmask) == (network & netmask)


def dotted_netmask(mask):
    """Converts mask from /xx format to xxx.xxx.xxx.xxx

    Example: if mask is 24 function returns 255.255.255.0

    :rtype: str
    """
    bits = 0xFFFFFFFF ^ (1 << 32 - mask) - 1
    return socket.inet_ntoa(struct.pack(">I", bits))


def is_ipv4_address(string_ip):
    """
    :rtype: bool
    """
    try:
        socket.inet_aton(string_ip)
    except OSError:
        return False
    return True


def is_valid_cidr(string_network):
    """
    Very simple check of the cidr format in no_proxy variable.

    :rtype: bool
    """
    if string_network.count("/") == 1:
        try:
            mask = int(string_network.split("/")[1])
        except ValueError:
            return False

        if mask < 1 or mask > 32:
            return False

        try:
            socket.inet_aton(string_network.split("/")[0])
        except OSError:
            return False
    else:
        return False
    return True


@contextlib.contextmanager
def set_environ(env_name, value):
    """Set the environment variable 'env_name' to 'value'

    Save previous value, yield, and then restore the previous value stored in
    the environment variable 'env_name'.

    If 'value' is None, do nothing."""
    value_changed = value is not None
    if value_changed:
        old_value = os.environ.get(env_name)
        os.environ[env_name] = value
    try:
        yield
    finally:
        if value_changed:
            if old_value is None:
                del os.environ[env_name]
            else:
                os.environ[env_name] = old_value
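

# For illustration, a minimal sketch of the intended use (values hypothetical):
#
#     with set_environ("no_proxy", "localhost"):
#         ...  # os.environ["no_proxy"] == "localhost" inside the block
#     # the previous value (or absence) of no_proxy is restored afterwards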


def should_bypass_proxies(url, no_proxy):
    """
    Returns whether we should bypass proxies or not.

    :rtype: bool
    """

    # Prioritize lowercase environment variables over uppercase
    # to keep a consistent behaviour with other http projects (curl, wget).
    def get_proxy(key):
        return os.environ.get(key) or os.environ.get(key.upper())

    # First check whether no_proxy is defined. If it is, check that the URL
    # we're getting isn't in the no_proxy list.
    no_proxy_arg = no_proxy
    if no_proxy is None:
        no_proxy = get_proxy("no_proxy")
    parsed = urlparse(url)

    if parsed.hostname is None:
        # URLs don't always have hostnames, e.g. file:/// urls.
        return True

    if no_proxy:
        # We need to check whether we match here. We need to see if we match
        # the end of the hostname, both with and without the port.
        no_proxy = (host for host in no_proxy.replace(" ", "").split(",") if host)

        if is_ipv4_address(parsed.hostname):
            for proxy_ip in no_proxy:
                if is_valid_cidr(proxy_ip):
                    if address_in_network(parsed.hostname, proxy_ip):
                        return True
                elif parsed.hostname == proxy_ip:
                    # If the no_proxy entry was defined in plain IP notation
                    # instead of cidr notation and matches the URL's IP
                    return True
        else:
            host_with_port = parsed.hostname
            if parsed.port:
                host_with_port += f":{parsed.port}"

            for host in no_proxy:
                if parsed.hostname.endswith(host) or host_with_port.endswith(host):
                    # The URL does match something in no_proxy, so we don't want
                    # to apply the proxies on this URL.
                    return True

    with set_environ("no_proxy", no_proxy_arg):
        # parsed.hostname can be `None` in cases such as a file URI.
        try:
            bypass = proxy_bypass(parsed.hostname)
        except (TypeError, socket.gaierror):
            bypass = False

    if bypass:
        return True

    return False
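

# For illustration: a host matching the no_proxy list is bypassed regardless
# of any proxy environment variables:
#
#     >>> should_bypass_proxies("http://localhost:8080/", no_proxy="localhost")
#     True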


def get_environ_proxies(url, no_proxy=None):
    """
    Return a dict of environment proxies.

    :rtype: dict
    """
    if should_bypass_proxies(url, no_proxy=no_proxy):
        return {}
    else:
        return getproxies()


def select_proxy(url, proxies):
    """Select a proxy for the url, if applicable.

    :param url: The url of the request
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    """
    proxies = proxies or {}
    urlparts = urlparse(url)
    if urlparts.hostname is None:
        return proxies.get(urlparts.scheme, proxies.get("all"))

    proxy_keys = [
        urlparts.scheme + "://" + urlparts.hostname,
        urlparts.scheme,
        "all://" + urlparts.hostname,
        "all",
    ]
    proxy = None
    for proxy_key in proxy_keys:
        if proxy_key in proxies:
            proxy = proxies[proxy_key]
            break

    return proxy
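

# For illustration (proxy URLs are hypothetical): the most specific key,
# scheme://host, wins over the bare scheme:
#
#     >>> proxies = {"http": "http://proxy-b:3128",
#     ...            "http://example.com": "http://proxy-a:3128"}
#     >>> select_proxy("http://example.com/path", proxies)
#     'http://proxy-a:3128'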


def resolve_proxies(request, proxies, trust_env=True):
    """This method takes proxy information from a request and configuration
    input to resolve a mapping of target proxies. This will consider settings
    such as NO_PROXY to strip proxy configurations.

    :param request: Request or PreparedRequest
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    :param trust_env: Boolean declaring whether to trust environment configs

    :rtype: dict
    """
    proxies = proxies if proxies is not None else {}
    url = request.url
    scheme = urlparse(url).scheme
    no_proxy = proxies.get("no_proxy")
    new_proxies = proxies.copy()

    if trust_env and not should_bypass_proxies(url, no_proxy=no_proxy):
        environ_proxies = get_environ_proxies(url, no_proxy=no_proxy)

        proxy = environ_proxies.get(scheme, environ_proxies.get("all"))

        if proxy:
            new_proxies.setdefault(scheme, proxy)
    return new_proxies


def default_user_agent(name="python-requests"):
    """
    Return a string representing the default user agent.

    :rtype: str
    """
    return f"{name}/{__version__}"


def default_headers():
    """
    :rtype: requests.structures.CaseInsensitiveDict
    """
    return CaseInsensitiveDict(
        {
            "User-Agent": default_user_agent(),
            "Accept-Encoding": DEFAULT_ACCEPT_ENCODING,
            "Accept": "*/*",
            "Connection": "keep-alive",
        }
    )


def parse_header_links(value):
    """Return a list of parsed link headers.

    i.e. Link: <http://.../front.jpeg>; rel=front; type="image/jpeg",<http://.../back.jpeg>; rel=back;type="image/jpeg"

    :rtype: list
    """

    links = []

    replace_chars = " '\""

    value = value.strip(replace_chars)
    if not value:
        return links

    for val in re.split(", *<", value):
        try:
            url, params = val.split(";", 1)
        except ValueError:
            url, params = val, ""

        link = {"url": url.strip("<> '\"")}

        for param in params.split(";"):
            try:
                key, value = param.split("=")
            except ValueError:
                break

            link[key.strip(replace_chars)] = value.strip(replace_chars)

        links.append(link)

    return links
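

# For illustration, parsing a single Link header:
#
#     >>> parse_header_links('<http://example.com/page2>; rel="next"')
#     [{'url': 'http://example.com/page2', 'rel': 'next'}]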


# Null bytes; no need to recreate these on each call to guess_json_utf
_null = "\x00".encode("ascii")  # encoding to ASCII for Python 3
_null2 = _null * 2
_null3 = _null * 3


def guess_json_utf(data):
    """
    :rtype: str
    """
    # JSON always starts with two ASCII characters, so detection is as
    # easy as counting the nulls and from their location and count
    # determine the encoding. Also detect a BOM, if present.
    sample = data[:4]
    if sample in (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE):
        return "utf-32"  # BOM included
    if sample[:3] == codecs.BOM_UTF8:
        return "utf-8-sig"  # BOM included, MS style (discouraged)
    if sample[:2] in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE):
        return "utf-16"  # BOM included
    nullcount = sample.count(_null)
    if nullcount == 0:
        return "utf-8"
    if nullcount == 2:
        if sample[::2] == _null2:  # 1st and 3rd are null
            return "utf-16-be"
        if sample[1::2] == _null2:  # 2nd and 4th are null
            return "utf-16-le"
        # Did not detect 2 valid UTF-16 ascii-range characters
    if nullcount == 3:
        if sample[:3] == _null3:
            return "utf-32-be"
        if sample[1:] == _null3:
            return "utf-32-le"
        # Did not detect a valid UTF-32 ascii-range character
    return None
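

# For illustration: a UTF-16 encoding is detected from its BOM, while plain
# ASCII-range JSON with no null bytes is reported as UTF-8:
#
#     >>> guess_json_utf('{"a": 1}'.encode("utf-16"))
#     'utf-16'
#     >>> guess_json_utf(b'{"a": 1}')
#     'utf-8'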


def prepend_scheme_if_needed(url, new_scheme):
    """Given a URL that may or may not have a scheme, prepend the given scheme.
    Does not replace a present scheme with the one provided as an argument.

    :rtype: str
    """
    parsed = parse_url(url)
    scheme, auth, host, port, path, query, fragment = parsed

    # A defect in urlparse determines that there isn't a netloc present in some
    # urls. We previously assumed parsing was overly cautious, and swapped the
    # netloc and path. Due to a lack of tests on the original defect, this is
    # maintained with parse_url for backwards compatibility.
    netloc = parsed.netloc
    if not netloc:
        netloc, path = path, netloc

    if auth:
        # parse_url doesn't provide the netloc with auth
        # so we'll add it ourselves.
        netloc = "@".join([auth, netloc])
    if scheme is None:
        scheme = new_scheme
    if path is None:
        path = ""

    return urlunparse((scheme, netloc, path, "", query, fragment))
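

# For illustration (behavior depends on urllib3's parse_url): a missing
# scheme is added, an existing one is kept:
#
#     >>> prepend_scheme_if_needed("example.com/path", "https")
#     'https://example.com/path'
#     >>> prepend_scheme_if_needed("http://example.com/path", "https")
#     'http://example.com/path'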


def get_auth_from_url(url):
    """Given a url with authentication components, extract them into a tuple of
    (username, password).

    :rtype: (str,str)
    """
    parsed = urlparse(url)

    try:
        auth = (unquote(parsed.username), unquote(parsed.password))
    except (AttributeError, TypeError):
        auth = ("", "")

    return auth
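

# For illustration: credentials are percent-decoded on the way out:
#
#     >>> get_auth_from_url("https://user:pass%40word@example.com/")
#     ('user', 'pass@word')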


def check_header_validity(header):
    """Verifies that header parts don't contain leading whitespace,
    reserved characters, or return characters.

    :param header: tuple, in the format (name, value).
    """
    name, value = header
    _validate_header_part(header, name, 0)
    _validate_header_part(header, value, 1)


def _validate_header_part(header, header_part, header_validator_index):
    if isinstance(header_part, str):
        validator = _HEADER_VALIDATORS_STR[header_validator_index]
    elif isinstance(header_part, bytes):
        validator = _HEADER_VALIDATORS_BYTE[header_validator_index]
    else:
        raise InvalidHeader(
            f"Header part ({header_part!r}) from {header} "
            f"must be of type str or bytes, not {type(header_part)}"
        )

    if not validator.match(header_part):
        header_kind = "name" if header_validator_index == 0 else "value"
        raise InvalidHeader(
            f"Invalid leading whitespace, reserved character(s), or return "
            f"character(s) in header {header_kind}: {header_part!r}"
        )


def urldefragauth(url):
    """
    Given a url, remove the fragment and the authentication part.

    :rtype: str
    """
    scheme, netloc, path, params, query, fragment = urlparse(url)

    # see func:`prepend_scheme_if_needed`
    if not netloc:
        netloc, path = path, netloc

    netloc = netloc.rsplit("@", 1)[-1]

    return urlunparse((scheme, netloc, path, params, query, ""))
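

# For illustration:
#
#     >>> urldefragauth("https://user:pass@example.com/path#frag")
#     'https://example.com/path'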


def rewind_body(prepared_request):
    """Move file pointer back to its recorded starting position
    so it can be read again on redirect.
    """
    body_seek = getattr(prepared_request.body, "seek", None)
    if body_seek is not None and isinstance(
        prepared_request._body_position, integer_types
    ):
        try:
            body_seek(prepared_request._body_position)
        except OSError:
            raise UnrewindableBodyError(
                "An error occurred when rewinding request body for redirect."
            )
    else:
        raise UnrewindableBodyError("Unable to rewind request body for redirect.")