1"""
2requests.utils
3~~~~~~~~~~~~~~
4
5This module provides utility functions that are used within Requests
6that are also useful for external consumption.
7"""
8
9import codecs
10import contextlib
11import io
12import os
13import re
14import socket
15import struct
16import sys
17import tempfile
18import warnings
19import zipfile
20from collections import OrderedDict
21
22from pip._vendor.urllib3.util import make_headers, parse_url
23
24from . import certs
25from .__version__ import __version__
26
27# to_native_string is unused here, but imported here for backwards compatibility
28from ._internal_utils import ( # noqa: F401
29 _HEADER_VALIDATORS_BYTE,
30 _HEADER_VALIDATORS_STR,
31 HEADER_VALIDATORS,
32 to_native_string,
33)
34from .compat import (
35 Mapping,
36 basestring,
37 bytes,
38 getproxies,
39 getproxies_environment,
40 integer_types,
41 is_urllib3_1,
42 proxy_bypass,
43 proxy_bypass_environment,
44 quote,
45 str,
46 unquote,
47 urlparse,
48 urlunparse,
49)
50from .compat import parse_http_list as _parse_list_header
51from .cookies import cookiejar_from_dict
52from .exceptions import (
53 FileModeWarning,
54 InvalidHeader,
55 InvalidURL,
56 UnrewindableBodyError,
57)
58from .structures import CaseInsensitiveDict
59
# Candidate netrc filenames, probed in the user's home directory in order.
NETRC_FILES = (".netrc", "_netrc")

# Certificate is extracted by certifi when needed.
DEFAULT_CA_BUNDLE_PATH = certs.where()

# Default ports per URL scheme, used when a URL omits an explicit port.
DEFAULT_PORTS = {"http": 80, "https": 443}

# Ensure that ', ' is used to preserve previous delimiter behavior.
DEFAULT_ACCEPT_ENCODING = ", ".join(
    re.split(r",\s*", make_headers(accept_encoding=True)["accept-encoding"])
)
71
72
if sys.platform == "win32":
    # provide a proxy_bypass version on Windows without DNS lookups

    def proxy_bypass_registry(host):
        """Check the Windows registry's ProxyOverride list for *host*.

        Reads the current user's Internet Settings hive and returns True
        when *host* matches one of the (glob-style) override entries.
        Returns False when the proxy is not enabled, when there are no
        override entries, or when the registry cannot be read at all.
        """
        try:
            import winreg
        except ImportError:
            return False

        try:
            internetSettings = winreg.OpenKey(
                winreg.HKEY_CURRENT_USER,
                r"Software\Microsoft\Windows\CurrentVersion\Internet Settings",
            )
            # ProxyEnable could be REG_SZ or REG_DWORD, normalizing it
            proxyEnable = int(winreg.QueryValueEx(internetSettings, "ProxyEnable")[0])
            # ProxyOverride is almost always a string
            proxyOverride = winreg.QueryValueEx(internetSettings, "ProxyOverride")[0]
        except (OSError, ValueError):
            return False
        if not proxyEnable or not proxyOverride:
            return False

        # make a check value list from the registry entry: replace the
        # '<local>' string by the localhost entry and the corresponding
        # canonical entry.
        proxyOverride = proxyOverride.split(";")
        # filter out empty strings to avoid re.match return true in the following code.
        proxyOverride = filter(None, proxyOverride)
        # now check if we match one of the registry values.
        for test in proxyOverride:
            if test == "<local>":
                # '<local>' means: bypass for bare hostnames (no dots).
                if "." not in host:
                    return True
            # Translate the glob-style override entry into a regex.
            test = test.replace(".", r"\.")  # mask dots
            test = test.replace("*", r".*")  # change glob sequence
            test = test.replace("?", r".")  # change glob char
            if re.match(test, host, re.I):
                return True
        return False

    def proxy_bypass(host):  # noqa
        """Return True, if the host should be bypassed.

        Checks proxy settings gathered from the environment, if specified,
        or the registry.
        """
        if getproxies_environment():
            return proxy_bypass_environment(host)
        else:
            return proxy_bypass_registry(host)
124
125
def dict_to_sequence(d):
    """Return a sequence of key/value pairs for mapping-like objects.

    Objects without an ``items`` method are returned unchanged.
    """
    items_method = getattr(d, "items", None)
    return items_method() if items_method is not None else d
133
134
def super_len(o):
    """Return a best-effort count of the unread bytes in *o*.

    Tries, in order: ``len(o)``, an ``o.len`` attribute, ``os.fstat`` on
    ``o.fileno()``, and finally seeking to the end of the stream.  The
    current read position (``o.tell()``) is subtracted so partially read
    objects report only what remains; the result is never negative and
    falls back to 0 when no length can be determined.
    """
    total_length = None
    current_position = 0

    if not is_urllib3_1 and isinstance(o, str):
        # urllib3 2.x+ treats all strings as utf-8 instead
        # of latin-1 (iso-8859-1) like http.client.
        o = o.encode("utf-8")

    if hasattr(o, "__len__"):
        total_length = len(o)

    elif hasattr(o, "len"):
        total_length = o.len

    elif hasattr(o, "fileno"):
        try:
            fileno = o.fileno()
        except (io.UnsupportedOperation, AttributeError):
            # AttributeError is a surprising exception, seeing as how we've just checked
            # that `hasattr(o, 'fileno')`. It happens for objects obtained via
            # `Tarfile.extractfile()`, per issue 5229.
            pass
        else:
            total_length = os.fstat(fileno).st_size

            # Having used fstat to determine the file length, we need to
            # confirm that this file was opened up in binary mode.
            if "b" not in o.mode:
                warnings.warn(
                    (
                        "Requests has determined the content-length for this "
                        "request using the binary size of the file: however, the "
                        "file has been opened in text mode (i.e. without the 'b' "
                        "flag in the mode). This may lead to an incorrect "
                        "content-length. In Requests 3.0, support will be removed "
                        "for files in text mode."
                    ),
                    FileModeWarning,
                )

    if hasattr(o, "tell"):
        try:
            current_position = o.tell()
        except OSError:
            # This can happen in some weird situations, such as when the file
            # is actually a special file descriptor like stdin. In this
            # instance, we don't know what the length is, so set it to zero and
            # let requests chunk it instead.
            if total_length is not None:
                current_position = total_length
        else:
            if hasattr(o, "seek") and total_length is None:
                # StringIO and BytesIO have seek but no usable fileno
                try:
                    # seek to end of file
                    o.seek(0, 2)
                    total_length = o.tell()

                    # seek back to current position to support
                    # partially read file-like objects
                    o.seek(current_position or 0)
                except OSError:
                    total_length = 0

    if total_length is None:
        total_length = 0

    return max(0, total_length - current_position)
204
205
def get_netrc_auth(url, raise_errors=False):
    """Returns the Requests tuple auth for a given url from netrc.

    :param url: URL whose hostname is looked up in the netrc file.
    :param raise_errors: when True, re-raise netrc parse errors and read
        failures instead of silently returning no auth.
    :return: ``(login, password)`` tuple, or ``None`` when no netrc file
        or no matching entry is found.
    """

    # An explicit NETRC environment variable takes precedence over the
    # default ~/.netrc and ~/_netrc locations.
    netrc_file = os.environ.get("NETRC")
    if netrc_file is not None:
        netrc_locations = (netrc_file,)
    else:
        netrc_locations = (f"~/{f}" for f in NETRC_FILES)

    try:
        from netrc import NetrcParseError, netrc

        netrc_path = None

        for f in netrc_locations:
            loc = os.path.expanduser(f)
            if os.path.exists(loc):
                netrc_path = loc
                break

        # Abort early if there isn't one.
        if netrc_path is None:
            return

        ri = urlparse(url)
        host = ri.hostname

        try:
            _netrc = netrc(netrc_path).authenticators(host)
            if _netrc and any(_netrc):
                # Return with login / password
                login_i = 0 if _netrc[0] else 1
                return (_netrc[login_i], _netrc[2])
        except (NetrcParseError, OSError):
            # If there was a parsing error or a permissions issue reading the file,
            # we'll just skip netrc auth unless explicitly asked to raise errors.
            if raise_errors:
                raise

    # App Engine hackiness.
    except (ImportError, AttributeError):
        pass
248
249
def guess_filename(obj):
    """Return the basename of ``obj.name`` when it looks like a real path.

    File-like objects whose ``name`` is a pseudo-path such as ``<stdin>``
    (angle-bracketed) yield ``None``.
    """
    name = getattr(obj, "name", None)
    if not name or not isinstance(name, basestring):
        return None
    if name.startswith("<") or name.endswith(">"):
        return None
    return os.path.basename(name)
255
256
def extract_zipped_paths(path):
    """Replace nonexistent paths that look like they refer to a member of a zip
    archive with the location of an extracted copy of the target, or else
    just return the provided path unchanged.

    :param path: filesystem path, possibly of the form ``<archive>/<member>``.
    :rtype: str
    """
    if os.path.exists(path):
        # this is already a valid path, no need to do anything further
        return path

    # find the first valid part of the provided path and treat that as a zip archive
    # assume the rest of the path is the name of a member in the archive
    archive, member = os.path.split(path)
    while archive and not os.path.exists(archive):
        archive, prefix = os.path.split(archive)
        if not prefix:
            # If we don't check for an empty prefix after the split (in other
            # words, archive remains unchanged after the split), we _can_ end
            # up in an infinite loop on a rare corner case affecting a small
            # number of users
            break
        member = "/".join([prefix, member])

    if not zipfile.is_zipfile(archive):
        return path

    # Use a context manager so the archive's file handle is closed promptly
    # instead of being leaked until garbage collection.
    with zipfile.ZipFile(archive) as zip_file:
        if member not in zip_file.namelist():
            return path

        # we have a valid zip archive and a valid member of that archive
        suffix = os.path.splitext(member.split("/")[-1])[-1]
        fd, extracted_path = tempfile.mkstemp(suffix=suffix)
        try:
            os.write(fd, zip_file.read(member))
        finally:
            os.close(fd)

    return extracted_path
293
294
@contextlib.contextmanager
def atomic_open(filename):
    """Context manager that writes *filename* atomically.

    Data is first written to a temporary file in the destination's
    directory and only renamed over the target once the ``with`` block
    completes; on any error the temporary file is removed and the
    exception re-raised.
    """
    descriptor, temp_path = tempfile.mkstemp(dir=os.path.dirname(filename))
    try:
        with os.fdopen(descriptor, "wb") as temp_file:
            yield temp_file
        # Rename is atomic on POSIX, replacing any existing target.
        os.replace(temp_path, filename)
    except BaseException:
        os.remove(temp_path)
        raise
306
307
def from_key_val_list(value):
    """Build an :class:`OrderedDict` from *value* when possible.

    ::

        >>> from_key_val_list([('key', 'val')])
        OrderedDict([('key', 'val')])
        >>> from_key_val_list('string')
        Traceback (most recent call last):
        ...
        ValueError: cannot encode objects that are not 2-tuples
        >>> from_key_val_list({'key': 'val'})
        OrderedDict([('key', 'val')])

    :rtype: OrderedDict
    """
    if isinstance(value, (str, bytes, bool, int)):
        # Scalars cannot be interpreted as key/value pairs.
        raise ValueError("cannot encode objects that are not 2-tuples")
    if value is None:
        return None
    return OrderedDict(value)
333
334
def to_key_val_list(value):
    """Return *value* as a list of key/value tuples when possible.

    ::

        >>> to_key_val_list([('key', 'val')])
        [('key', 'val')]
        >>> to_key_val_list({'key': 'val'})
        [('key', 'val')]
        >>> to_key_val_list('string')
        Traceback (most recent call last):
        ...
        ValueError: cannot encode objects that are not 2-tuples

    :rtype: list
    """
    if value is None:
        return None
    if isinstance(value, (str, bytes, bool, int)):
        # Scalars cannot be interpreted as key/value pairs.
        raise ValueError("cannot encode objects that are not 2-tuples")
    pairs = value.items() if isinstance(value, Mapping) else value
    return list(pairs)
362
363
364# From mitsuhiko/werkzeug (used with permission).
# From mitsuhiko/werkzeug (used with permission).
def parse_list_header(value):
    """Parse a comma-separated list header as described by RFC 2068 Section 2.

    Elements may be quoted-strings (which can themselves contain commas);
    surrounding quotes are removed automatically after parsing.  Unlike
    :func:`parse_set_header`, duplicate items and case are preserved.

    >>> parse_list_header('token, "quoted value"')
    ['token', 'quoted value']

    To create a header from the :class:`list` again, use the
    :func:`dump_header` function.

    :param value: a string with a list header.
    :return: :class:`list`
    :rtype: list
    """

    def _dequote(item):
        # A fully quoted element: strip the quotes and unescape the body.
        if item[:1] == item[-1:] == '"':
            return unquote_header_value(item[1:-1])
        return item

    return [_dequote(item) for item in _parse_list_header(value)]
394
395
396# From mitsuhiko/werkzeug (used with permission).
# From mitsuhiko/werkzeug (used with permission).
def parse_dict_header(value):
    """Parse a ``key=value`` list header (RFC 2068 Section 2) into a dict.

    >>> d = parse_dict_header('foo="is a fish", bar="as well"')
    >>> type(d) is dict
    True
    >>> sorted(d.items())
    [('bar', 'as well'), ('foo', 'is a fish')]

    Keys without a value map to `None`:

    >>> parse_dict_header('key_without_value')
    {'key_without_value': None}

    To create a header from the :class:`dict` again, use the
    :func:`dump_header` function.

    :param value: a string with a dict header.
    :return: :class:`dict`
    :rtype: dict
    """
    parsed = {}
    for item in _parse_list_header(value):
        name, sep, raw = item.partition("=")
        if not sep:
            # Bare token with no '=' at all.
            parsed[item] = None
            continue
        if raw[:1] == raw[-1:] == '"':
            raw = unquote_header_value(raw[1:-1])
        parsed[name] = raw
    return parsed
429
430
431# From mitsuhiko/werkzeug (used with permission).
# From mitsuhiko/werkzeug (used with permission).
def unquote_header_value(value, is_filename=False):
    r"""Unquotes a header value. (Reversal of :func:`quote_header_value`).

    This mirrors what browsers actually do rather than the strict RFC
    unquoting; IE, for example, uploads files with ``"C:\foo\bar.txt"``
    as the filename.

    :param value: the header value to unquote.
    :param is_filename: when True, leave UNC paths (``\\server\share``)
        untouched so the leading double backslash survives (see #458).
    :rtype: str
    """
    if not (value and value[0] == value[-1] == '"'):
        # Not a quoted-string: return unchanged.
        return value
    value = value[1:-1]
    if is_filename and value[:2] == "\\\\":
        # UNC path: collapsing the leading double slash would corrupt it.
        return value
    return value.replace("\\\\", "\\").replace('\\"', '"')
455
456
def dict_from_cookiejar(cj):
    """Returns a key/value dictionary from a CookieJar.

    :param cj: CookieJar object to extract cookies from.
    :rtype: dict
    """
    return {cookie.name: cookie.value for cookie in cj}
466
467
def add_dict_to_cookiejar(cj, cookie_dict):
    """Insert every key/value pair from *cookie_dict* into *cj*.

    :param cj: CookieJar to insert cookies into.
    :param cookie_dict: Dict of key/values to insert into CookieJar.
    :rtype: CookieJar
    """
    updated_jar = cookiejar_from_dict(cookie_dict, cj)
    return updated_jar
477
478
def get_encodings_from_content(content):
    """Returns encodings from given content string.

    Scans ``<meta charset>``, ``<meta http-equiv>`` pragma, and XML
    declaration markers, in that order.  Deprecated; see issue #2266.

    :param content: bytestring to extract encodings from.
    """
    warnings.warn(
        (
            "In requests 3.0, get_encodings_from_content will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )

    patterns = (
        re.compile(r'<meta.*?charset=["\']*(.+?)["\'>]', flags=re.I),
        re.compile(r'<meta.*?content=["\']*;?charset=(.+?)["\'>]', flags=re.I),
        re.compile(r'^<\?xml.*?encoding=["\']*(.+?)["\'>]'),
    )
    found = []
    for pattern in patterns:
        found.extend(pattern.findall(content))
    return found
502
503
504def _parse_content_type_header(header):
505 """Returns content type and parameters from given header.
506
507 :param header: string
508 :return: tuple containing content type and dictionary of
509 parameters.
510 """
511
512 tokens = header.split(";")
513 content_type, params = tokens[0].strip(), tokens[1:]
514 params_dict = {}
515 strip_chars = "\"' "
516
517 for param in params:
518 param = param.strip()
519 if param and (idx := param.find("=")) != -1:
520 key = param[:idx].strip(strip_chars)
521 value = param[idx + 1 :].strip(strip_chars)
522 params_dict[key.lower()] = value
523 return content_type, params_dict
524
525
def get_encoding_from_headers(headers):
    """Returns encodings from given HTTP Header Dict.

    :param headers: dictionary to extract encoding from.
    :rtype: str
    """
    raw = headers.get("content-type")
    if not raw:
        return None

    content_type, params = _parse_content_type_header(raw)

    charset = params.get("charset")
    if charset is not None:
        return charset.strip("'\"")
    if "text" in content_type:
        # HTTP/1.1 default for text/* when no charset is given.
        return "ISO-8859-1"
    if "application/json" in content_type:
        # Assume UTF-8 based on RFC 4627: https://www.ietf.org/rfc/rfc4627.txt since the charset was unset
        return "utf-8"
    return None
549
550
def stream_decode_response_unicode(iterator, r):
    """Incrementally decode byte chunks from *iterator* using ``r.encoding``.

    When the response declares no encoding the chunks are passed through
    unchanged; otherwise each chunk is fed to an incremental decoder
    (with ``errors="replace"``) and any final buffered bytes are flushed
    at the end.
    """
    encoding = r.encoding
    if encoding is None:
        yield from iterator
        return

    decoder = codecs.getincrementaldecoder(encoding)(errors="replace")
    for piece in iterator:
        decoded = decoder.decode(piece)
        if decoded:
            yield decoded
    # Flush whatever the decoder is still holding (e.g. a split multibyte
    # sequence at the very end of the stream).
    tail = decoder.decode(b"", final=True)
    if tail:
        yield tail
566
567
def iter_slices(string, slice_length):
    """Yield successive substrings of *string* of length *slice_length*.

    A ``None``, zero, or negative length yields the whole string as a
    single slice.
    """
    if slice_length is None or slice_length <= 0:
        # Fall back to one slice covering everything; max() keeps the
        # range step positive when the string is empty.
        slice_length = max(len(string), 1)
    for start in range(0, len(string), slice_length):
        yield string[start : start + slice_length]
576
577
def get_unicode_from_response(r):
    """Returns the requested content back in unicode.

    :param r: Response object to get unicode content from.

    Tried:

    1. charset from content-type
    2. fall back and replace all unicode characters

    :rtype: str
    """
    warnings.warn(
        (
            "In requests 3.0, get_unicode_from_response will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )

    tried_encodings = []

    # Try charset from content-type
    encoding = get_encoding_from_headers(r.headers)

    if encoding:
        try:
            return str(r.content, encoding)
        except UnicodeError:
            tried_encodings.append(encoding)

    # Fall back:
    try:
        return str(r.content, encoding, errors="replace")
    except TypeError:
        # encoding is None here (headers carried no usable charset), which
        # makes str() raise TypeError — return the raw bytes unchanged.
        return r.content
615
616
# The unreserved URI characters (RFC 3986)
# A frozenset gives cheap membership tests in unquote_unreserved below.
UNRESERVED_SET = frozenset(
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + "0123456789-._~"
)
621
622
def unquote_unreserved(uri):
    """Un-escape any percent-escape sequences in a URI that are unreserved
    characters. This leaves all reserved, illegal and non-ASCII bytes encoded.

    :raises InvalidURL: when a percent sign is followed by a malformed
        hex pair.
    :rtype: str
    """
    segments = uri.split("%")
    # The first segment precedes any '%' and is kept verbatim.
    rebuilt = [segments[0]]
    for segment in segments[1:]:
        hex_pair = segment[:2]
        if len(hex_pair) == 2 and hex_pair.isalnum():
            try:
                char = chr(int(hex_pair, 16))
            except ValueError:
                raise InvalidURL(f"Invalid percent-escape sequence: '{hex_pair}'")
            if char in UNRESERVED_SET:
                rebuilt.append(char + segment[2:])
                continue
        # Reserved/illegal escape: keep the original '%' sequence.
        rebuilt.append(f"%{segment}")
    return "".join(rebuilt)
645
646
def requote_uri(uri):
    """Re-quote the given URI.

    This function passes the given URI through an unquote/quote cycle to
    ensure that it is fully and consistently quoted.

    :rtype: str
    """
    try:
        # Unquote only the unreserved characters, then re-quote any illegal
        # characters while leaving reserved characters and '%' untouched.
        return quote(unquote_unreserved(uri), safe="!#$%&'()*+,/:;=?@[]~")
    except InvalidURL:
        # The URI contained an invalid percent-escape, so there may be bare
        # '%' characters; re-quote with '%' removed from the safe set so
        # they get escaped too.
        return quote(uri, safe="!#$&'()*+,/:;=?@[]~")
667
668
def address_in_network(ip, net):
    """This function allows you to check if an IP belongs to a network subnet

    Example: returns True if ip = 192.168.1.1 and net = 192.168.1.0/24
             returns False if ip = 192.168.1.1 and net = 192.168.100.0/24

    :rtype: bool
    """
    net_address, bits = net.split("/")
    # All three values are native-endian 32-bit integers from inet_aton.
    netmask = struct.unpack("=L", socket.inet_aton(dotted_netmask(int(bits))))[0]
    network = struct.unpack("=L", socket.inet_aton(net_address))[0] & netmask
    ip_value = struct.unpack("=L", socket.inet_aton(ip))[0]
    return (ip_value & netmask) == (network & netmask)
682
683
def dotted_netmask(mask):
    """Converts mask from /xx format to xxx.xxx.xxx.xxx

    Example: if mask is 24 function returns 255.255.255.0

    :rtype: str
    """
    # Set the top `mask` bits of a 32-bit value, clear the rest.
    bits = (0xFFFFFFFF << (32 - mask)) & 0xFFFFFFFF
    return socket.inet_ntoa(struct.pack(">I", bits))
693
694
def is_ipv4_address(string_ip):
    """Return True when *string_ip* parses as an IPv4 address.

    :rtype: bool
    """
    try:
        socket.inet_aton(string_ip)
        return True
    except OSError:
        return False
704
705
def is_valid_cidr(string_network):
    """
    Very simple check of the cidr format in no_proxy variable.

    :rtype: bool
    """
    if string_network.count("/") != 1:
        return False

    address, _, mask_text = string_network.partition("/")
    try:
        mask = int(mask_text)
    except ValueError:
        return False
    # Only prefix lengths 1..32 are accepted.
    if not 1 <= mask <= 32:
        return False

    try:
        socket.inet_aton(address)
    except OSError:
        return False
    return True
728
729
@contextlib.contextmanager
def set_environ(env_name, value):
    """Set the environment variable 'env_name' to 'value'

    Save previous value, yield, and then restore the previous value stored in
    the environment variable 'env_name'.

    If 'value' is None, do nothing"""
    if value is None:
        # Nothing to change; just run the body.
        yield
        return

    previous = os.environ.get(env_name)
    os.environ[env_name] = value
    try:
        yield
    finally:
        if previous is None:
            del os.environ[env_name]
        else:
            os.environ[env_name] = previous
750
751
def should_bypass_proxies(url, no_proxy):
    """
    Returns whether we should bypass proxies or not.

    :param url: URL being requested.
    :param no_proxy: explicit NO_PROXY-style string, or None to read the
        ``no_proxy``/``NO_PROXY`` environment variables instead.
    :rtype: bool
    """

    # Prioritize lowercase environment variables over uppercase
    # to keep a consistent behaviour with other http projects (curl, wget).
    def get_proxy(key):
        return os.environ.get(key) or os.environ.get(key.upper())

    # First check whether no_proxy is defined. If it is, check that the URL
    # we're getting isn't in the no_proxy list.
    no_proxy_arg = no_proxy
    if no_proxy is None:
        no_proxy = get_proxy("no_proxy")
    parsed = urlparse(url)

    if parsed.hostname is None:
        # URLs don't always have hostnames, e.g. file:/// urls.
        return True

    if no_proxy:
        # We need to check whether we match here. We need to see if we match
        # the end of the hostname, both with and without the port.
        no_proxy = (host for host in no_proxy.replace(" ", "").split(",") if host)

        if is_ipv4_address(parsed.hostname):
            for proxy_ip in no_proxy:
                if is_valid_cidr(proxy_ip):
                    if address_in_network(parsed.hostname, proxy_ip):
                        return True
                elif parsed.hostname == proxy_ip:
                    # If no_proxy ip was defined in plain IP notation instead of cidr notation &
                    # matches the IP of the index
                    return True
        else:
            host_with_port = parsed.hostname
            if parsed.port:
                host_with_port += f":{parsed.port}"

            for host in no_proxy:
                if parsed.hostname.endswith(host) or host_with_port.endswith(host):
                    # The URL does match something in no_proxy, so we don't want
                    # to apply the proxies on this URL.
                    return True

    # Finally defer to the platform-level proxy_bypass (environment, or on
    # Windows the registry), temporarily restoring the caller-supplied
    # no_proxy value so that check sees the same settings.
    with set_environ("no_proxy", no_proxy_arg):
        # parsed.hostname can be `None` in cases such as a file URI.
        try:
            bypass = proxy_bypass(parsed.hostname)
        except (TypeError, socket.gaierror):
            bypass = False

    if bypass:
        return True

    return False
811
812
def get_environ_proxies(url, no_proxy=None):
    """
    Return a dict of environment proxies.

    :rtype: dict
    """
    # No proxies at all when the URL is covered by no_proxy rules.
    if should_bypass_proxies(url, no_proxy=no_proxy):
        return {}
    return getproxies()
823
824
def select_proxy(url, proxies):
    """Select a proxy for the url, if applicable.

    :param url: The url being for the request
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    """
    proxies = proxies or {}
    parts = urlparse(url)

    if parts.hostname is None:
        return proxies.get(parts.scheme, proxies.get("all"))

    # Most specific key first: scheme+host, then scheme, then all+host,
    # then the catch-all.
    candidates = (
        f"{parts.scheme}://{parts.hostname}",
        parts.scheme,
        f"all://{parts.hostname}",
        "all",
    )
    for key in candidates:
        if key in proxies:
            return proxies[key]
    return None
849
850
def resolve_proxies(request, proxies, trust_env=True):
    """This method takes proxy information from a request and configuration
    input to resolve a mapping of target proxies. This will consider settings
    such as NO_PROXY to strip proxy configurations.

    :param request: Request or PreparedRequest
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    :param trust_env: Boolean declaring whether to trust environment configs

    :rtype: dict
    """
    proxies = {} if proxies is None else proxies
    target_url = request.url
    scheme = urlparse(target_url).scheme
    no_proxy = proxies.get("no_proxy")
    new_proxies = proxies.copy()

    if trust_env and not should_bypass_proxies(target_url, no_proxy=no_proxy):
        environ_proxies = get_environ_proxies(target_url, no_proxy=no_proxy)
        fallback = environ_proxies.get(scheme, environ_proxies.get("all"))
        if fallback:
            # Only fill in the scheme if the caller didn't configure it.
            new_proxies.setdefault(scheme, fallback)
    return new_proxies
876
877
def default_user_agent(name="python-requests"):
    """
    Return a string representing the default user agent.

    :rtype: str
    """
    return "{}/{}".format(name, __version__)
885
886
def default_headers():
    """
    :rtype: requests.structures.CaseInsensitiveDict
    """
    headers = CaseInsensitiveDict()
    headers["User-Agent"] = default_user_agent()
    headers["Accept-Encoding"] = DEFAULT_ACCEPT_ENCODING
    headers["Accept"] = "*/*"
    headers["Connection"] = "keep-alive"
    return headers
899
900
def parse_header_links(value):
    """Return a list of parsed link headers.

    i.e. Link: <http:/.../front.jpeg>; rel=front; type="image/jpeg",<http://.../back.jpeg>; rel=back;type="image/jpeg"

    :param value: the raw Link header value.
    :return: list of dicts, one per link, each with a ``url`` key plus any
        parameters (``rel``, ``type``, ...).
    :rtype: list
    """

    links = []

    replace_chars = " '\""

    value = value.strip(replace_chars)
    if not value:
        return links

    for val in re.split(", *<", value):
        try:
            url, params = val.split(";", 1)
        except ValueError:
            url, params = val, ""

        link = {"url": url.strip("<> '\"")}

        for param in params.split(";"):
            try:
                # Split on the first '=' only, so values that themselves
                # contain '=' (e.g. title="a=b") are kept intact instead of
                # being silently dropped along with every later parameter.
                key, content = param.split("=", 1)
            except ValueError:
                break

            link[key.strip(replace_chars)] = content.strip(replace_chars)

        links.append(link)

    return links
936
937
# Null bytes; no need to recreate these on each call to guess_json_utf
_null = "\x00".encode("ascii")  # encoding to ASCII for Python 3
_null2 = _null * 2  # two NULs: UTF-16 detection pattern
_null3 = _null * 3  # three NULs: UTF-32 detection pattern
942
943
def guess_json_utf(data):
    """Guess the UTF codec of a JSON byte string from its first four bytes.

    JSON always starts with two ASCII characters, so the placement and
    count of NUL bytes in the first four bytes (or an explicit BOM)
    identifies the UTF flavour.  Returns a codec name, or None when no
    flavour can be determined.

    :rtype: str
    """
    sample = data[:4]
    # BOM checks first; UTF-32 before UTF-16 since its LE BOM starts with
    # the UTF-16 LE BOM bytes.
    if sample in (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE):
        return "utf-32"  # BOM included
    if sample[:3] == codecs.BOM_UTF8:
        return "utf-8-sig"  # BOM included, MS style (discouraged)
    if sample[:2] in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE):
        return "utf-16"  # BOM included
    null = b"\x00"
    nullcount = sample.count(null)
    if nullcount == 0:
        return "utf-8"
    if nullcount == 2:
        if sample[::2] == null * 2:  # 1st and 3rd are null
            return "utf-16-be"
        if sample[1::2] == null * 2:  # 2nd and 4th are null
            return "utf-16-le"
        # Did not detect 2 valid UTF-16 ascii-range characters
    if nullcount == 3:
        if sample[:3] == null * 3:
            return "utf-32-be"
        if sample[1:] == null * 3:
            return "utf-32-le"
        # Did not detect a valid UTF-32 ascii-range character
    return None
974
975
def prepend_scheme_if_needed(url, new_scheme):
    """Given a URL that may or may not have a scheme, prepend the given scheme.
    Does not replace a present scheme with the one provided as an argument.

    :rtype: str
    """
    parsed = parse_url(url)
    scheme, auth, host, port, path, query, fragment = parsed

    # A defect in urlparse determines that there isn't a netloc present in some
    # urls. We previously assumed parsing was overly cautious, and swapped the
    # netloc and path. Due to a lack of tests on the original defect, this is
    # maintained with parse_url for backwards compatibility.
    netloc = parsed.netloc
    if not netloc:
        netloc, path = path, netloc

    if auth:
        # parse_url doesn't provide the netloc with auth
        # so we'll add it ourselves.
        netloc = f"{auth}@{netloc}"

    scheme = new_scheme if scheme is None else scheme
    path = "" if path is None else path

    return urlunparse((scheme, netloc, path, "", query, fragment))
1003
1004
def get_auth_from_url(url):
    """Given a url with authentication components, extract them into a tuple of
    username,password.

    :rtype: (str,str)
    """
    parts = urlparse(url)
    try:
        return (unquote(parts.username), unquote(parts.password))
    except (AttributeError, TypeError):
        # No auth components present: username/password are None.
        return ("", "")
1019
1020
def check_header_validity(header):
    """Verifies that header parts don't contain leading whitespace
    reserved characters, or return characters.

    :param header: tuple, in the format (name, value).
    """
    name, value = header
    for index, part in enumerate((name, value)):
        _validate_header_part(header, part, index)
1030
1031
def _validate_header_part(header, header_part, header_validator_index):
    """Validate one element of a (name, value) header tuple.

    :param header: the full header tuple, used only for error messages.
    :param header_part: the name (index 0) or value (index 1) being checked.
    :param header_validator_index: 0 for the name, 1 for the value.
    :raises InvalidHeader: if the part has the wrong type or contains an
        illegal character.
    """
    if isinstance(header_part, str):
        validator = _HEADER_VALIDATORS_STR[header_validator_index]
    elif isinstance(header_part, bytes):
        validator = _HEADER_VALIDATORS_BYTE[header_validator_index]
    else:
        raise InvalidHeader(
            f"Header part ({header_part!r}) from {header} "
            f"must be of type str or bytes, not {type(header_part)}"
        )

    if validator.match(header_part):
        return
    header_kind = "name" if header_validator_index == 0 else "value"
    raise InvalidHeader(
        f"Invalid leading whitespace, reserved character(s), or return "
        f"character(s) in header {header_kind}: {header_part!r}"
    )
1049
1050
def urldefragauth(url):
    """
    Given a url remove the fragment and the authentication part.

    :rtype: str
    """
    parts = urlparse(url)
    netloc, path = parts.netloc, parts.path

    # see func:`prepend_scheme_if_needed`
    if not netloc:
        netloc, path = path, netloc

    # Drop any user:password@ prefix from the network location.
    netloc = netloc.rpartition("@")[2]

    return urlunparse((parts.scheme, netloc, path, parts.params, parts.query, ""))
1066
1067
def rewind_body(prepared_request):
    """Move file pointer back to its recorded starting position
    so it can be read again on redirect.

    :raises UnrewindableBodyError: when the body has no ``seek`` method,
        no recorded integer start position, or seeking fails.
    """
    body_seek = getattr(prepared_request.body, "seek", None)
    start = prepared_request._body_position
    if body_seek is None or not isinstance(start, integer_types):
        raise UnrewindableBodyError("Unable to rewind request body for redirect.")
    try:
        body_seek(start)
    except OSError:
        raise UnrewindableBodyError(
            "An error occurred when rewinding request body for redirect."
        )