1""" 

2requests.utils 

3~~~~~~~~~~~~~~ 

4 

5This module provides utility functions that are used within Requests 

6that are also useful for external consumption. 

7""" 

8 

9import codecs 

10import contextlib 

11import io 

12import os 

13import re 

14import socket 

15import struct 

16import sys 

17import tempfile 

18import warnings 

19import zipfile 

20from collections import OrderedDict 

21 

22from urllib3.util import make_headers, parse_url 

23 

24from . import certs 

25from .__version__ import __version__ 

26 

27# to_native_string is unused here, but imported here for backwards compatibility 

28from ._internal_utils import ( # noqa: F401 

29 _HEADER_VALIDATORS_BYTE, 

30 _HEADER_VALIDATORS_STR, 

31 HEADER_VALIDATORS, 

32 to_native_string, 

33) 

34from .compat import ( 

35 Mapping, 

36 basestring, 

37 bytes, 

38 getproxies, 

39 getproxies_environment, 

40 integer_types, 

41 is_urllib3_1, 

42 proxy_bypass, 

43 proxy_bypass_environment, 

44 quote, 

45 str, 

46 unquote, 

47 urlparse, 

48 urlunparse, 

49) 

50from .compat import parse_http_list as _parse_list_header 

51from .cookies import cookiejar_from_dict 

52from .exceptions import ( 

53 FileModeWarning, 

54 InvalidHeader, 

55 InvalidURL, 

56 UnrewindableBodyError, 

57) 

58from .structures import CaseInsensitiveDict 

59 

60NETRC_FILES = (".netrc", "_netrc") 

61 

62DEFAULT_CA_BUNDLE_PATH = certs.where() 

63 

64DEFAULT_PORTS = {"http": 80, "https": 443} 

65 

66# Ensure that ', ' is used to preserve previous delimiter behavior. 

67DEFAULT_ACCEPT_ENCODING = ", ".join( 

68 re.split(r",\s*", make_headers(accept_encoding=True)["accept-encoding"]) 

69) 
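
# Illustrative note (editor's sketch, not part of the upstream module): with a
# typical urllib3 install the value above resolves to something like
# "gzip, deflate", or "gzip, deflate, br" when Brotli support is installed;
# the exact string depends on the urllib3 build.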


if sys.platform == "win32":
    # provide a proxy_bypass version on Windows without DNS lookups

    def proxy_bypass_registry(host):
        try:
            import winreg
        except ImportError:
            return False

        try:
            internetSettings = winreg.OpenKey(
                winreg.HKEY_CURRENT_USER,
                r"Software\Microsoft\Windows\CurrentVersion\Internet Settings",
            )
            # ProxyEnable could be REG_SZ or REG_DWORD, normalizing it
            proxyEnable = int(winreg.QueryValueEx(internetSettings, "ProxyEnable")[0])
            # ProxyOverride is almost always a string
            proxyOverride = winreg.QueryValueEx(internetSettings, "ProxyOverride")[0]
        except (OSError, ValueError):
            return False
        if not proxyEnable or not proxyOverride:
            return False

        # make a check value list from the registry entry: replace the
        # '<local>' string by the localhost entry and the corresponding
        # canonical entry.
        proxyOverride = proxyOverride.split(";")
        # filter out empty strings so re.match doesn't return true in the loop below.
        proxyOverride = filter(None, proxyOverride)
        # now check if we match one of the registry values.
        for test in proxyOverride:
            if test == "<local>":
                if "." not in host:
                    return True
            test = test.replace(".", r"\.")  # mask dots
            test = test.replace("*", r".*")  # change glob sequence
            test = test.replace("?", r".")  # change glob char
            if re.match(test, host, re.I):
                return True
        return False

    def proxy_bypass(host):  # noqa
        """Return True, if the host should be bypassed.

        Checks proxy settings gathered from the environment, if specified,
        or the registry.
        """
        if getproxies_environment():
            return proxy_bypass_environment(host)
        else:
            return proxy_bypass_registry(host)
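
    # Illustrative (editor's sketch): the glob-to-regex translation above turns
    # a registry override entry such as "192.168.*" into the pattern
    # r"192\.168\..*", which re.match then applies case-insensitively to the host.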


def dict_to_sequence(d):
    """Returns an internal sequence dictionary update."""

    if hasattr(d, "items"):
        d = d.items()

    return d


def super_len(o):
    total_length = None
    current_position = 0

    if not is_urllib3_1 and isinstance(o, str):
        # urllib3 2.x+ treats all strings as utf-8 instead
        # of latin-1 (iso-8859-1) like http.client.
        o = o.encode("utf-8")

    if hasattr(o, "__len__"):
        total_length = len(o)

    elif hasattr(o, "len"):
        total_length = o.len

    elif hasattr(o, "fileno"):
        try:
            fileno = o.fileno()
        except (io.UnsupportedOperation, AttributeError):
            # AttributeError is a surprising exception, seeing as how we've just checked
            # that `hasattr(o, 'fileno')`. It happens for objects obtained via
            # `Tarfile.extractfile()`, per issue 5229.
            pass
        else:
            total_length = os.fstat(fileno).st_size

            # Having used fstat to determine the file length, we need to
            # confirm that this file was opened up in binary mode.
            if "b" not in o.mode:
                warnings.warn(
                    (
                        "Requests has determined the content-length for this "
                        "request using the binary size of the file: however, the "
                        "file has been opened in text mode (i.e. without the 'b' "
                        "flag in the mode). This may lead to an incorrect "
                        "content-length. In Requests 3.0, support will be removed "
                        "for files in text mode."
                    ),
                    FileModeWarning,
                )

    if hasattr(o, "tell"):
        try:
            current_position = o.tell()
        except OSError:
            # This can happen in some weird situations, such as when the file
            # is actually a special file descriptor like stdin. In this
            # instance, we don't know what the length is, so set it to zero and
            # let requests chunk it instead.
            if total_length is not None:
                current_position = total_length
        else:
            if hasattr(o, "seek") and total_length is None:
                # StringIO and BytesIO have seek but no usable fileno
                try:
                    # seek to end of file
                    o.seek(0, 2)
                    total_length = o.tell()

                    # seek back to current position to support
                    # partially read file-like objects
                    o.seek(current_position or 0)
                except OSError:
                    total_length = 0

    if total_length is None:
        total_length = 0

    return max(0, total_length - current_position)
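
# Illustrative usage (editor's sketch, not part of the upstream module):
#
#   >>> b = io.BytesIO(b"hello")
#   >>> super_len(b)
#   5
#   >>> _ = b.read(2)          # partially consume the stream
#   >>> super_len(b)           # only the unread bytes are counted
#   3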


def get_netrc_auth(url, raise_errors=False):
    """Returns the Requests tuple auth for a given url from netrc."""

    netrc_file = os.environ.get("NETRC")
    if netrc_file is not None:
        netrc_locations = (netrc_file,)
    else:
        netrc_locations = (f"~/{f}" for f in NETRC_FILES)

    try:
        from netrc import NetrcParseError, netrc

        netrc_path = None

        for f in netrc_locations:
            loc = os.path.expanduser(f)
            if os.path.exists(loc):
                netrc_path = loc
                break

        # Abort early if there isn't one.
        if netrc_path is None:
            return

        ri = urlparse(url)
        host = ri.hostname

        try:
            _netrc = netrc(netrc_path).authenticators(host)
            if _netrc and any(_netrc):
                # Return with login / password
                login_i = 0 if _netrc[0] else 1
                return (_netrc[login_i], _netrc[2])
        except (NetrcParseError, OSError):
            # If there was a parsing error or a permissions issue reading the file,
            # we'll just skip netrc auth unless explicitly asked to raise errors.
            if raise_errors:
                raise

    # App Engine hackiness.
    except (ImportError, AttributeError):
        pass
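
# Illustrative (editor's sketch): with NETRC pointing at a hypothetical file
# containing "machine example.com login alice password s3cret", one would get:
#
#   >>> get_netrc_auth("https://example.com/data")
#   ('alice', 's3cret')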


def guess_filename(obj):
    """Tries to guess the filename of the given object."""
    name = getattr(obj, "name", None)
    if name and isinstance(name, basestring) and name[0] != "<" and name[-1] != ">":
        return os.path.basename(name)


def extract_zipped_paths(path):
    """Replace nonexistent paths that look like they refer to a member of a zip
    archive with the location of an extracted copy of the target, or else
    just return the provided path unchanged.
    """
    if os.path.exists(path):
        # this is already a valid path, no need to do anything further
        return path

    # find the first valid part of the provided path and treat that as a zip archive
    # assume the rest of the path is the name of a member in the archive
    archive, member = os.path.split(path)
    while archive and not os.path.exists(archive):
        archive, prefix = os.path.split(archive)
        if not prefix:
            # If the split produced an empty prefix (i.e. `archive` was left
            # unchanged), bail out: continuing would loop forever on a rare
            # corner case affecting a small number of users.
            break
        member = "/".join([prefix, member])

    if not zipfile.is_zipfile(archive):
        return path

    zip_file = zipfile.ZipFile(archive)
    if member not in zip_file.namelist():
        return path

    # we have a valid zip archive and a valid member of that archive
    tmp = tempfile.gettempdir()
    extracted_path = os.path.join(tmp, member.split("/")[-1])
    if not os.path.exists(extracted_path):
        # use read + write rather than extract() so we don't create nested
        # folders; we only want the file, and this also avoids an mkdir race.
        with atomic_open(extracted_path) as file_handler:
            file_handler.write(zip_file.read(member))
    return extracted_path


@contextlib.contextmanager
def atomic_open(filename):
    """Write a file to the disk in an atomic fashion"""
    tmp_descriptor, tmp_name = tempfile.mkstemp(dir=os.path.dirname(filename))
    try:
        with os.fdopen(tmp_descriptor, "wb") as tmp_handler:
            yield tmp_handler
        os.replace(tmp_name, filename)
    except BaseException:
        os.remove(tmp_name)
        raise
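
# Illustrative usage (editor's sketch): write-then-rename means readers never
# observe a half-written file; the path below is hypothetical.
#
#   >>> with atomic_open("/tmp/settings.json") as f:
#   ...     _ = f.write(b'{"ok": true}')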


def from_key_val_list(value):
    """Take an object and test to see if it can be represented as a
    dictionary. If it can be, return an OrderedDict, e.g.,

    ::

        >>> from_key_val_list([('key', 'val')])
        OrderedDict([('key', 'val')])
        >>> from_key_val_list('string')
        Traceback (most recent call last):
        ...
        ValueError: cannot encode objects that are not 2-tuples
        >>> from_key_val_list({'key': 'val'})
        OrderedDict([('key', 'val')])

    :rtype: OrderedDict
    """
    if value is None:
        return None

    if isinstance(value, (str, bytes, bool, int)):
        raise ValueError("cannot encode objects that are not 2-tuples")

    return OrderedDict(value)


def to_key_val_list(value):
    """Take an object and test to see if it can be represented as a
    dictionary. If it can be, return a list of tuples, e.g.,

    ::

        >>> to_key_val_list([('key', 'val')])
        [('key', 'val')]
        >>> to_key_val_list({'key': 'val'})
        [('key', 'val')]
        >>> to_key_val_list('string')
        Traceback (most recent call last):
        ...
        ValueError: cannot encode objects that are not 2-tuples

    :rtype: list
    """
    if value is None:
        return None

    if isinstance(value, (str, bytes, bool, int)):
        raise ValueError("cannot encode objects that are not 2-tuples")

    if isinstance(value, Mapping):
        value = value.items()

    return list(value)


# From mitsuhiko/werkzeug (used with permission).
def parse_list_header(value):
    """Parse lists as described by RFC 2068 Section 2.

    In particular, parse comma-separated lists where the elements of
    the list may include quoted-strings. A quoted-string could
    contain a comma. A non-quoted string could have quotes in the
    middle. Quotes are removed automatically after parsing.

    It basically works like :func:`parse_set_header` just that items
    may appear multiple times and case sensitivity is preserved.

    The return value is a standard :class:`list`:

    >>> parse_list_header('token, "quoted value"')
    ['token', 'quoted value']

    To create a header from the :class:`list` again, use the
    :func:`dump_header` function.

    :param value: a string with a list header.
    :return: :class:`list`
    :rtype: list
    """
    result = []
    for item in _parse_list_header(value):
        if item[:1] == item[-1:] == '"':
            item = unquote_header_value(item[1:-1])
        result.append(item)
    return result


# From mitsuhiko/werkzeug (used with permission).
def parse_dict_header(value):
    """Parse lists of key, value pairs as described by RFC 2068 Section 2 and
    convert them into a python dict:

    >>> d = parse_dict_header('foo="is a fish", bar="as well"')
    >>> type(d) is dict
    True
    >>> sorted(d.items())
    [('bar', 'as well'), ('foo', 'is a fish')]

    If there is no value for a key it will be `None`:

    >>> parse_dict_header('key_without_value')
    {'key_without_value': None}

    To create a header from the :class:`dict` again, use the
    :func:`dump_header` function.

    :param value: a string with a dict header.
    :return: :class:`dict`
    :rtype: dict
    """
    result = {}
    for item in _parse_list_header(value):
        if "=" not in item:
            result[item] = None
            continue
        name, value = item.split("=", 1)
        if value[:1] == value[-1:] == '"':
            value = unquote_header_value(value[1:-1])
        result[name] = value
    return result


# From mitsuhiko/werkzeug (used with permission).
def unquote_header_value(value, is_filename=False):
    r"""Unquotes a header value. (Reversal of :func:`quote_header_value`).
    This does not use the real unquoting but what browsers are actually
    using for quoting.

    :param value: the header value to unquote.
    :rtype: str
    """
    if value and value[0] == value[-1] == '"':
        # this is not the real unquoting, but fixing this so that the
        # RFC is met will result in bugs with internet explorer and
        # probably some other browsers as well. IE for example is
        # uploading files with "C:\foo\bar.txt" as filename
        value = value[1:-1]

        # if this is a filename and the starting characters look like
        # a UNC path, then just return the value without quotes. Using the
        # replace sequence below on a UNC path has the effect of turning
        # the leading double slash into a single slash and then
        # _fix_ie_filename() doesn't work correctly. See #458.
        if not is_filename or value[:2] != "\\\\":
            return value.replace("\\\\", "\\").replace('\\"', '"')
    return value
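
# Illustrative (editor's sketch):
#
#   >>> unquote_header_value('"token"')
#   'token'
#   >>> unquote_header_value('"\\\\server\\share"', is_filename=True)  # UNC path kept intact
#   '\\\\server\\share'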


def dict_from_cookiejar(cj):
    """Returns a key/value dictionary from a CookieJar.

    :param cj: CookieJar object to extract cookies from.
    :rtype: dict
    """

    cookie_dict = {cookie.name: cookie.value for cookie in cj}
    return cookie_dict


def add_dict_to_cookiejar(cj, cookie_dict):
    """Returns a CookieJar from a key/value dictionary.

    :param cj: CookieJar to insert cookies into.
    :param cookie_dict: Dict of key/values to insert into CookieJar.
    :rtype: CookieJar
    """

    return cookiejar_from_dict(cookie_dict, cj)


def get_encodings_from_content(content):
    """Returns encodings from given content string.

    :param content: bytestring to extract encodings from.
    """
    warnings.warn(
        (
            "In requests 3.0, get_encodings_from_content will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )

    charset_re = re.compile(r'<meta.*?charset=["\']*(.+?)["\'>]', flags=re.I)
    pragma_re = re.compile(r'<meta.*?content=["\']*;?charset=(.+?)["\'>]', flags=re.I)
    xml_re = re.compile(r'^<\?xml.*?encoding=["\']*(.+?)["\'>]')

    return (
        charset_re.findall(content)
        + pragma_re.findall(content)
        + xml_re.findall(content)
    )


def _parse_content_type_header(header):
    """Returns content type and parameters from given header

    :param header: string
    :return: tuple containing content type and dictionary of
        parameters
    """

    tokens = header.split(";")
    content_type, params = tokens[0].strip(), tokens[1:]
    params_dict = {}
    items_to_strip = "\"' "

    for param in params:
        param = param.strip()
        if param:
            key, value = param, True
            index_of_equals = param.find("=")
            if index_of_equals != -1:
                key = param[:index_of_equals].strip(items_to_strip)
                value = param[index_of_equals + 1 :].strip(items_to_strip)
            params_dict[key.lower()] = value
    return content_type, params_dict
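
# Illustrative (editor's sketch):
#
#   >>> _parse_content_type_header("text/html; charset=utf-8")
#   ('text/html', {'charset': 'utf-8'})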


def get_encoding_from_headers(headers):
    """Returns encodings from given HTTP Header Dict.

    :param headers: dictionary to extract encoding from.
    :rtype: str
    """

    content_type = headers.get("content-type")

    if not content_type:
        return None

    content_type, params = _parse_content_type_header(content_type)

    if "charset" in params:
        return params["charset"].strip("'\"")

    if "text" in content_type:
        return "ISO-8859-1"

    if "application/json" in content_type:
        # Assume UTF-8 based on RFC 4627: https://www.ietf.org/rfc/rfc4627.txt since the charset was unset
        return "utf-8"
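
# Illustrative (editor's sketch):
#
#   >>> get_encoding_from_headers({"content-type": "text/html; charset=UTF-8"})
#   'UTF-8'
#   >>> get_encoding_from_headers({"content-type": "application/json"})
#   'utf-8'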


def stream_decode_response_unicode(iterator, r):
    """Stream decodes an iterator."""

    if r.encoding is None:
        yield from iterator
        return

    decoder = codecs.getincrementaldecoder(r.encoding)(errors="replace")
    for chunk in iterator:
        rv = decoder.decode(chunk)
        if rv:
            yield rv
    rv = decoder.decode(b"", final=True)
    if rv:
        yield rv


def iter_slices(string, slice_length):
    """Iterate over slices of a string."""
    pos = 0
    if slice_length is None or slice_length <= 0:
        slice_length = len(string)
    while pos < len(string):
        yield string[pos : pos + slice_length]
        pos += slice_length
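
# Illustrative (editor's sketch):
#
#   >>> list(iter_slices("abcdef", 2))
#   ['ab', 'cd', 'ef']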


def get_unicode_from_response(r):
    """Returns the requested content back in unicode.

    :param r: Response object to get unicode content from.

    Tried:

    1. charset from content-type
    2. fall back and replace all unicode characters

    :rtype: str
    """
    warnings.warn(
        (
            "In requests 3.0, get_unicode_from_response will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )

    tried_encodings = []

    # Try charset from content-type
    encoding = get_encoding_from_headers(r.headers)

    if encoding:
        try:
            return str(r.content, encoding)
        except UnicodeError:
            tried_encodings.append(encoding)

    # Fall back:
    try:
        return str(r.content, encoding, errors="replace")
    except TypeError:
        return r.content


# The unreserved URI characters (RFC 3986)
UNRESERVED_SET = frozenset(
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + "0123456789-._~"
)


def unquote_unreserved(uri):
    """Un-escape any percent-escape sequences in a URI that are unreserved
    characters. This leaves all reserved, illegal and non-ASCII bytes encoded.

    :rtype: str
    """
    parts = uri.split("%")
    for i in range(1, len(parts)):
        h = parts[i][0:2]
        if len(h) == 2 and h.isalnum():
            try:
                c = chr(int(h, 16))
            except ValueError:
                raise InvalidURL(f"Invalid percent-escape sequence: '{h}'")

            if c in UNRESERVED_SET:
                parts[i] = c + parts[i][2:]
            else:
                parts[i] = f"%{parts[i]}"
        else:
            parts[i] = f"%{parts[i]}"
    return "".join(parts)
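
# Illustrative (editor's sketch): '~' (%7E) is unreserved and gets decoded,
# while '/' (%2F) is reserved and stays encoded:
#
#   >>> unquote_unreserved("http://example.com/%7Euser%2Fdocs")
#   'http://example.com/~user%2Fdocs'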


def requote_uri(uri):
    """Re-quote the given URI.

    This function passes the given URI through an unquote/quote cycle to
    ensure that it is fully and consistently quoted.

    :rtype: str
    """
    safe_with_percent = "!#$%&'()*+,/:;=?@[]~"
    safe_without_percent = "!#$&'()*+,/:;=?@[]~"
    try:
        # Unquote only the unreserved characters
        # Then quote only illegal characters (do not quote reserved,
        # unreserved, or '%')
        return quote(unquote_unreserved(uri), safe=safe_with_percent)
    except InvalidURL:
        # We couldn't unquote the given URI, so let's try quoting it, but
        # there may be unquoted '%'s in the URI. We need to make sure they're
        # properly quoted so they do not cause issues elsewhere.
        return quote(uri, safe=safe_without_percent)
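
# Illustrative (editor's sketch): already-quoted input passes through
# unchanged, which makes the cycle idempotent:
#
#   >>> requote_uri("http://example.com/a b")
#   'http://example.com/a%20b'
#   >>> requote_uri("http://example.com/a%20b")
#   'http://example.com/a%20b'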


def address_in_network(ip, net):
    """This function allows you to check if an IP belongs to a network subnet

    Example: returns True if ip = 192.168.1.1 and net = 192.168.1.0/24
             returns False if ip = 192.168.1.1 and net = 192.168.100.0/24

    :rtype: bool
    """
    ipaddr = struct.unpack("=L", socket.inet_aton(ip))[0]
    netaddr, bits = net.split("/")
    netmask = struct.unpack("=L", socket.inet_aton(dotted_netmask(int(bits))))[0]
    network = struct.unpack("=L", socket.inet_aton(netaddr))[0] & netmask
    return (ipaddr & netmask) == (network & netmask)


def dotted_netmask(mask):
    """Converts mask from /xx format to xxx.xxx.xxx.xxx

    Example: if mask is 24 function returns 255.255.255.0

    :rtype: str
    """
    bits = 0xFFFFFFFF ^ (1 << 32 - mask) - 1
    return socket.inet_ntoa(struct.pack(">I", bits))
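
# Illustrative (editor's sketch):
#
#   >>> dotted_netmask(24)
#   '255.255.255.0'
#   >>> address_in_network("192.168.1.1", "192.168.1.0/24")
#   True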


def is_ipv4_address(string_ip):
    """
    :rtype: bool
    """
    try:
        socket.inet_aton(string_ip)
    except OSError:
        return False
    return True


def is_valid_cidr(string_network):
    """
    Very simple check of the cidr format in no_proxy variable.

    :rtype: bool
    """
    if string_network.count("/") == 1:
        try:
            mask = int(string_network.split("/")[1])
        except ValueError:
            return False

        if mask < 1 or mask > 32:
            return False

        try:
            socket.inet_aton(string_network.split("/")[0])
        except OSError:
            return False
    else:
        return False
    return True
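
# Illustrative (editor's sketch):
#
#   >>> is_valid_cidr("192.168.1.0/24")
#   True
#   >>> is_valid_cidr("192.168.1.0")    # a bare address is not CIDR notation
#   False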


@contextlib.contextmanager
def set_environ(env_name, value):
    """Set the environment variable 'env_name' to 'value'

    Save previous value, yield, and then restore the previous value stored in
    the environment variable 'env_name'.

    If 'value' is None, do nothing"""
    value_changed = value is not None
    if value_changed:
        old_value = os.environ.get(env_name)
        os.environ[env_name] = value
    try:
        yield
    finally:
        if value_changed:
            if old_value is None:
                del os.environ[env_name]
            else:
                os.environ[env_name] = old_value
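
# Illustrative usage (editor's sketch; the variable name is hypothetical):
#
#   >>> with set_environ("NO_PROXY_DEMO", "1"):
#   ...     os.environ["NO_PROXY_DEMO"]
#   '1'
#   >>> "NO_PROXY_DEMO" in os.environ   # previous (unset) state is restored
#   False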


def should_bypass_proxies(url, no_proxy):
    """
    Returns whether we should bypass proxies or not.

    :rtype: bool
    """

    # Prioritize lowercase environment variables over uppercase
    # to keep a consistent behaviour with other http projects (curl, wget).
    def get_proxy(key):
        return os.environ.get(key) or os.environ.get(key.upper())

    # First check whether no_proxy is defined. If it is, check that the URL
    # we're getting isn't in the no_proxy list.
    no_proxy_arg = no_proxy
    if no_proxy is None:
        no_proxy = get_proxy("no_proxy")
    parsed = urlparse(url)

    if parsed.hostname is None:
        # URLs don't always have hostnames, e.g. file:/// urls.
        return True

    if no_proxy:
        # We need to check whether we match here. We need to see if we match
        # the end of the hostname, both with and without the port.
        no_proxy = (host for host in no_proxy.replace(" ", "").split(",") if host)

        if is_ipv4_address(parsed.hostname):
            for proxy_ip in no_proxy:
                if is_valid_cidr(proxy_ip):
                    if address_in_network(parsed.hostname, proxy_ip):
                        return True
                elif parsed.hostname == proxy_ip:
                    # The no_proxy entry was defined in plain IP notation
                    # instead of CIDR notation and matches the URL's IP exactly
                    return True
        else:
            host_with_port = parsed.hostname
            if parsed.port:
                host_with_port += f":{parsed.port}"

            for host in no_proxy:
                if parsed.hostname.endswith(host) or host_with_port.endswith(host):
                    # The URL does match something in no_proxy, so we don't want
                    # to apply the proxies on this URL.
                    return True

    with set_environ("no_proxy", no_proxy_arg):
        # parsed.hostname can be `None` in cases such as a file URI.
        try:
            bypass = proxy_bypass(parsed.hostname)
        except (TypeError, socket.gaierror):
            bypass = False

    if bypass:
        return True

    return False
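
# Illustrative (editor's sketch): a host listed in no_proxy is matched with or
# without its port:
#
#   >>> should_bypass_proxies("http://localhost:8080", no_proxy="localhost")
#   True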


def get_environ_proxies(url, no_proxy=None):
    """
    Return a dict of environment proxies.

    :rtype: dict
    """
    if should_bypass_proxies(url, no_proxy=no_proxy):
        return {}
    else:
        return getproxies()


def select_proxy(url, proxies):
    """Select a proxy for the url, if applicable.

    :param url: The URL being requested
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    """
    proxies = proxies or {}
    urlparts = urlparse(url)
    if urlparts.hostname is None:
        return proxies.get(urlparts.scheme, proxies.get("all"))

    proxy_keys = [
        urlparts.scheme + "://" + urlparts.hostname,
        urlparts.scheme,
        "all://" + urlparts.hostname,
        "all",
    ]
    proxy = None
    for proxy_key in proxy_keys:
        if proxy_key in proxies:
            proxy = proxies[proxy_key]
            break

    return proxy
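
# Illustrative (editor's sketch): a scheme+host key takes precedence over a
# bare scheme key:
#
#   >>> proxies = {"http://example.com": "http://special:3128", "http": "http://general:3128"}
#   >>> select_proxy("http://example.com/path", proxies)
#   'http://special:3128'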


def resolve_proxies(request, proxies, trust_env=True):
    """This method takes proxy information from a request and configuration
    input to resolve a mapping of target proxies. This will consider settings
    such as NO_PROXY to strip proxy configurations.

    :param request: Request or PreparedRequest
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    :param trust_env: Boolean declaring whether to trust environment configs

    :rtype: dict
    """
    proxies = proxies if proxies is not None else {}
    url = request.url
    scheme = urlparse(url).scheme
    no_proxy = proxies.get("no_proxy")
    new_proxies = proxies.copy()

    if trust_env and not should_bypass_proxies(url, no_proxy=no_proxy):
        environ_proxies = get_environ_proxies(url, no_proxy=no_proxy)

        proxy = environ_proxies.get(scheme, environ_proxies.get("all"))

        if proxy:
            new_proxies.setdefault(scheme, proxy)
    return new_proxies


def default_user_agent(name="python-requests"):
    """
    Return a string representing the default user agent.

    :rtype: str
    """
    return f"{name}/{__version__}"


def default_headers():
    """
    :rtype: requests.structures.CaseInsensitiveDict
    """
    return CaseInsensitiveDict(
        {
            "User-Agent": default_user_agent(),
            "Accept-Encoding": DEFAULT_ACCEPT_ENCODING,
            "Accept": "*/*",
            "Connection": "keep-alive",
        }
    )


def parse_header_links(value):
    """Return a list of parsed link headers, e.g.

    Link: <http:/.../front.jpeg>; rel=front; type="image/jpeg",<http://.../back.jpeg>; rel=back;type="image/jpeg"

    :rtype: list
    """

    links = []

    replace_chars = " '\""

    value = value.strip(replace_chars)
    if not value:
        return links

    for val in re.split(", *<", value):
        try:
            url, params = val.split(";", 1)
        except ValueError:
            url, params = val, ""

        link = {"url": url.strip("<> '\"")}

        for param in params.split(";"):
            try:
                key, value = param.split("=")
            except ValueError:
                break

            link[key.strip(replace_chars)] = value.strip(replace_chars)

        links.append(link)

    return links
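
# Illustrative (editor's sketch):
#
#   >>> parse_header_links('<http://example.com/page2>; rel="next"')
#   [{'url': 'http://example.com/page2', 'rel': 'next'}]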


# Null bytes; no need to recreate these on each call to guess_json_utf
_null = "\x00".encode("ascii")  # encoding to ASCII for Python 3
_null2 = _null * 2
_null3 = _null * 3


def guess_json_utf(data):
    """
    :rtype: str
    """
    # JSON always starts with two ASCII characters, so detection is as
    # easy as counting the nulls and from their location and count
    # determine the encoding. Also detect a BOM, if present.
    sample = data[:4]
    if sample in (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE):
        return "utf-32"  # BOM included
    if sample[:3] == codecs.BOM_UTF8:
        return "utf-8-sig"  # BOM included, MS style (discouraged)
    if sample[:2] in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE):
        return "utf-16"  # BOM included
    nullcount = sample.count(_null)
    if nullcount == 0:
        return "utf-8"
    if nullcount == 2:
        if sample[::2] == _null2:  # 1st and 3rd are null
            return "utf-16-be"
        if sample[1::2] == _null2:  # 2nd and 4th are null
            return "utf-16-le"
        # Did not detect 2 valid UTF-16 ascii-range characters
    if nullcount == 3:
        if sample[:3] == _null3:
            return "utf-32-be"
        if sample[1:] == _null3:
            return "utf-32-le"
        # Did not detect a valid UTF-32 ascii-range character
    return None
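
# Illustrative (editor's sketch): detection works from the null-byte pattern
# alone, no BOM required:
#
#   >>> guess_json_utf(b'{"a": 1}')
#   'utf-8'
#   >>> guess_json_utf('{"a": 1}'.encode("utf-16-le"))
#   'utf-16-le'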


def prepend_scheme_if_needed(url, new_scheme):
    """Given a URL that may or may not have a scheme, prepend the given scheme.
    Does not replace a present scheme with the one provided as an argument.

    :rtype: str
    """
    parsed = parse_url(url)
    scheme, auth, host, port, path, query, fragment = parsed

    # A defect in urlparse determines that there isn't a netloc present in some
    # urls. We previously assumed parsing was overly cautious, and swapped the
    # netloc and path. Due to a lack of tests on the original defect, this is
    # maintained with parse_url for backwards compatibility.
    netloc = parsed.netloc
    if not netloc:
        netloc, path = path, netloc

    if auth:
        # parse_url doesn't provide the netloc with auth
        # so we'll add it ourselves.
        netloc = "@".join([auth, netloc])
    if scheme is None:
        scheme = new_scheme
    if path is None:
        path = ""

    return urlunparse((scheme, netloc, path, "", query, fragment))
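
# Illustrative (editor's sketch):
#
#   >>> prepend_scheme_if_needed("example.com/path", "https")
#   'https://example.com/path'
#   >>> prepend_scheme_if_needed("http://example.com", "https")  # existing scheme wins
#   'http://example.com'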


def get_auth_from_url(url):
    """Given a url with authentication components, extract them into a tuple of
    username,password.

    :rtype: (str,str)
    """
    parsed = urlparse(url)

    try:
        auth = (unquote(parsed.username), unquote(parsed.password))
    except (AttributeError, TypeError):
        auth = ("", "")

    return auth
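
# Illustrative (editor's sketch): credentials are percent-decoded, and a URL
# without credentials yields an empty pair:
#
#   >>> get_auth_from_url("https://user:p%40ss@example.com/path")
#   ('user', 'p@ss')
#   >>> get_auth_from_url("https://example.com/path")
#   ('', '')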


def check_header_validity(header):
    """Verifies that header parts don't contain leading whitespace,
    reserved characters, or return characters.

    :param header: tuple, in the format (name, value).
    """
    name, value = header
    _validate_header_part(header, name, 0)
    _validate_header_part(header, value, 1)


def _validate_header_part(header, header_part, header_validator_index):
    if isinstance(header_part, str):
        validator = _HEADER_VALIDATORS_STR[header_validator_index]
    elif isinstance(header_part, bytes):
        validator = _HEADER_VALIDATORS_BYTE[header_validator_index]
    else:
        raise InvalidHeader(
            f"Header part ({header_part!r}) from {header} "
            f"must be of type str or bytes, not {type(header_part)}"
        )

    if not validator.match(header_part):
        header_kind = "name" if header_validator_index == 0 else "value"
        raise InvalidHeader(
            f"Invalid leading whitespace, reserved character(s), or return "
            f"character(s) in header {header_kind}: {header_part!r}"
        )
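
# Illustrative (editor's sketch): a CRLF in a header value is rejected, which
# blocks header-injection attempts:
#
#   >>> check_header_validity(("X-Token", "abc"))       # passes silently
#   >>> check_header_validity(("X-Token", "a\r\nb"))    # raises InvalidHeader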


def urldefragauth(url):
    """
    Given a url remove the fragment and the authentication part.

    :rtype: str
    """
    scheme, netloc, path, params, query, fragment = urlparse(url)

    # see func:`prepend_scheme_if_needed`
    if not netloc:
        netloc, path = path, netloc

    netloc = netloc.rsplit("@", 1)[-1]

    return urlunparse((scheme, netloc, path, params, query, ""))
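
# Illustrative (editor's sketch):
#
#   >>> urldefragauth("https://user:pass@example.com/path#frag")
#   'https://example.com/path'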


def rewind_body(prepared_request):
    """Move file pointer back to its recorded starting position
    so it can be read again on redirect.
    """
    body_seek = getattr(prepared_request.body, "seek", None)
    if body_seek is not None and isinstance(
        prepared_request._body_position, integer_types
    ):
        try:
            body_seek(prepared_request._body_position)
        except OSError:
            raise UnrewindableBodyError(
                "An error occurred when rewinding request body for redirect."
            )
    else:
        raise UnrewindableBodyError("Unable to rewind request body for redirect.")