1""" 

2requests.utils 

3~~~~~~~~~~~~~~ 

4 

5This module provides utility functions that are used within Requests 

6that are also useful for external consumption. 

7""" 

8 

9import codecs 

10import contextlib 

11import io 

12import os 

13import re 

14import socket 

15import struct 

16import sys 

17import tempfile 

18import warnings 

19import zipfile 

20from collections import OrderedDict 

21 

22from urllib3.util import make_headers, parse_url 

23 

24from . import certs 

25from .__version__ import __version__ 

26 

27# to_native_string is unused here, but imported here for backwards compatibility 

28from ._internal_utils import ( # noqa: F401 

29 _HEADER_VALIDATORS_BYTE, 

30 _HEADER_VALIDATORS_STR, 

31 HEADER_VALIDATORS, 

32 to_native_string, 

33) 

34from .compat import ( 

35 Mapping, 

36 basestring, 

37 bytes, 

38 getproxies, 

39 getproxies_environment, 

40 integer_types, 

41 is_urllib3_1, 

42) 

43from .compat import parse_http_list as _parse_list_header 

44from .compat import ( 

45 proxy_bypass, 

46 proxy_bypass_environment, 

47 quote, 

48 str, 

49 unquote, 

50 urlparse, 

51 urlunparse, 

52) 

53from .cookies import cookiejar_from_dict 

54from .exceptions import ( 

55 FileModeWarning, 

56 InvalidHeader, 

57 InvalidURL, 

58 UnrewindableBodyError, 

59) 

60from .structures import CaseInsensitiveDict 

61 

NETRC_FILES = (".netrc", "_netrc")

DEFAULT_CA_BUNDLE_PATH = certs.where()

DEFAULT_PORTS = {"http": 80, "https": 443}

# Ensure that ', ' is used to preserve previous delimiter behavior.
DEFAULT_ACCEPT_ENCODING = ", ".join(
    re.split(r",\s*", make_headers(accept_encoding=True)["accept-encoding"])
)


if sys.platform == "win32":
    # provide a proxy_bypass version on Windows without DNS lookups

    def proxy_bypass_registry(host):
        try:
            import winreg
        except ImportError:
            return False

        try:
            internetSettings = winreg.OpenKey(
                winreg.HKEY_CURRENT_USER,
                r"Software\Microsoft\Windows\CurrentVersion\Internet Settings",
            )
            # ProxyEnable could be REG_SZ or REG_DWORD, normalizing it
            proxyEnable = int(winreg.QueryValueEx(internetSettings, "ProxyEnable")[0])
            # ProxyOverride is almost always a string
            proxyOverride = winreg.QueryValueEx(internetSettings, "ProxyOverride")[0]
        except (OSError, ValueError):
            return False
        if not proxyEnable or not proxyOverride:
            return False

        # make a check value list from the registry entry: replace the
        # '<local>' string with the localhost entry and the corresponding
        # canonical entry.
        proxyOverride = proxyOverride.split(";")
        # filter out empty strings to avoid re.match returning True in the
        # following code.
        proxyOverride = filter(None, proxyOverride)
        # now check if we match one of the registry values.
        for test in proxyOverride:
            if test == "<local>":
                if "." not in host:
                    return True
            test = test.replace(".", r"\.")  # mask dots
            test = test.replace("*", r".*")  # change glob sequence
            test = test.replace("?", r".")  # change glob char
            if re.match(test, host, re.I):
                return True
        return False

    def proxy_bypass(host):  # noqa
        """Return True, if the host should be bypassed.

        Checks proxy settings gathered from the environment, if specified,
        or the registry.
        """
        if getproxies_environment():
            return proxy_bypass_environment(host)
        else:
            return proxy_bypass_registry(host)


def dict_to_sequence(d):
    """Returns an internal sequence dictionary update."""

    if hasattr(d, "items"):
        d = d.items()

    return d


def super_len(o):
    total_length = None
    current_position = 0

    if not is_urllib3_1 and isinstance(o, str):
        # urllib3 2.x+ treats all strings as utf-8 instead
        # of latin-1 (iso-8859-1) like http.client.
        o = o.encode("utf-8")

    if hasattr(o, "__len__"):
        total_length = len(o)

    elif hasattr(o, "len"):
        total_length = o.len

    elif hasattr(o, "fileno"):
        try:
            fileno = o.fileno()
        except (io.UnsupportedOperation, AttributeError):
            # AttributeError is a surprising exception, seeing as how we've just
            # checked that `hasattr(o, 'fileno')`. It happens for objects obtained
            # via `Tarfile.extractfile()`, per issue 5229.
            pass
        else:
            total_length = os.fstat(fileno).st_size

            # Having used fstat to determine the file length, we need to
            # confirm that this file was opened up in binary mode.
            if "b" not in o.mode:
                warnings.warn(
                    (
                        "Requests has determined the content-length for this "
                        "request using the binary size of the file: however, the "
                        "file has been opened in text mode (i.e. without the 'b' "
                        "flag in the mode). This may lead to an incorrect "
                        "content-length. In Requests 3.0, support will be removed "
                        "for files in text mode."
                    ),
                    FileModeWarning,
                )

    if hasattr(o, "tell"):
        try:
            current_position = o.tell()
        except OSError:
            # This can happen in some weird situations, such as when the file
            # is actually a special file descriptor like stdin. In this
            # instance, we don't know what the length is, so set it to zero and
            # let requests chunk it instead.
            if total_length is not None:
                current_position = total_length
        else:
            if hasattr(o, "seek") and total_length is None:
                # StringIO and BytesIO have seek but no usable fileno
                try:
                    # seek to end of file
                    o.seek(0, 2)
                    total_length = o.tell()

                    # seek back to current position to support
                    # partially read file-like objects
                    o.seek(current_position or 0)
                except OSError:
                    total_length = 0

    if total_length is None:
        total_length = 0

    return max(0, total_length - current_position)


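# Illustrative sketch (not part of the original module): super_len subtracts
# the current read position from the total length, so a partially consumed
# file-like object reports only the bytes that remain to be sent. Assuming an
# in-memory buffer:
#
#   >>> import io
#   >>> buf = io.BytesIO(b"abcdef")
#   >>> _ = buf.read(2)
#   >>> super_len(buf)
#   4
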

def get_netrc_auth(url, raise_errors=False):
    """Returns the Requests tuple auth for a given url from netrc."""

    netrc_file = os.environ.get("NETRC")
    if netrc_file is not None:
        netrc_locations = (netrc_file,)
    else:
        netrc_locations = (f"~/{f}" for f in NETRC_FILES)

    try:
        from netrc import NetrcParseError, netrc

        netrc_path = None

        for f in netrc_locations:
            loc = os.path.expanduser(f)
            if os.path.exists(loc):
                netrc_path = loc
                break

        # Abort early if there isn't one.
        if netrc_path is None:
            return

        ri = urlparse(url)
        host = ri.hostname

        try:
            _netrc = netrc(netrc_path).authenticators(host)
            if _netrc:
                # Return with login / password
                login_i = 0 if _netrc[0] else 1
                return (_netrc[login_i], _netrc[2])
        except (NetrcParseError, OSError):
            # If there was a parsing error or a permissions issue reading the file,
            # we'll just skip netrc auth unless explicitly asked to raise errors.
            if raise_errors:
                raise

    # App Engine hackiness.
    except (ImportError, AttributeError):
        pass


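# Illustrative example (not part of the original module), assuming a
# hypothetical netrc file at /tmp/example-netrc containing the line
# "machine example.com login user password s3cret", with NETRC pointing at it:
#
#   >>> os.environ["NETRC"] = "/tmp/example-netrc"  # hypothetical file
#   >>> get_netrc_auth("https://example.com/path")  # doctest: +SKIP
#   ('user', 's3cret')
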

def guess_filename(obj):
    """Tries to guess the filename of the given object."""
    name = getattr(obj, "name", None)
    if name and isinstance(name, basestring) and name[0] != "<" and name[-1] != ">":
        return os.path.basename(name)


def extract_zipped_paths(path):
    """Replace nonexistent paths that look like they refer to a member of a zip
    archive with the location of an extracted copy of the target, or else
    just return the provided path unchanged.
    """
    if os.path.exists(path):
        # this is already a valid path, no need to do anything further
        return path

    # find the first valid part of the provided path and treat that as a zip archive
    # assume the rest of the path is the name of a member in the archive
    archive, member = os.path.split(path)
    while archive and not os.path.exists(archive):
        archive, prefix = os.path.split(archive)
        if not prefix:
            # If we don't check for an empty prefix after the split (in other
            # words, archive remains unchanged after the split), we _can_ end up
            # in an infinite loop on a rare corner case affecting a small number
            # of users
            break
        member = "/".join([prefix, member])

    if not zipfile.is_zipfile(archive):
        return path

    zip_file = zipfile.ZipFile(archive)
    if member not in zip_file.namelist():
        return path

    # we have a valid zip archive and a valid member of that archive
    tmp = tempfile.gettempdir()
    extracted_path = os.path.join(tmp, member.split("/")[-1])
    if not os.path.exists(extracted_path):
        # use read + write to avoid creating nested folders; we only want the
        # file, and this also avoids an mkdir race condition
        with atomic_open(extracted_path) as file_handler:
            file_handler.write(zip_file.read(member))
    return extracted_path


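# Illustrative example (not part of the original module): a nonexistent path
# that points *into* a zip archive gets extracted to the temp directory.
# Assuming a hypothetical archive /tmp/bundle.zip containing certs/ca.pem and
# a temp directory of /tmp:
#
#   >>> extract_zipped_paths("/tmp/bundle.zip/certs/ca.pem")  # doctest: +SKIP
#   '/tmp/ca.pem'
#
# An ordinary existing path is returned unchanged.
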

@contextlib.contextmanager
def atomic_open(filename):
    """Write a file to the disk in an atomic fashion"""
    tmp_descriptor, tmp_name = tempfile.mkstemp(dir=os.path.dirname(filename))
    try:
        with os.fdopen(tmp_descriptor, "wb") as tmp_handler:
            yield tmp_handler
        os.replace(tmp_name, filename)
    except BaseException:
        os.remove(tmp_name)
        raise


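# Illustrative usage (not part of the original module): the temporary file is
# renamed into place only after the `with` block completes, so readers never
# observe a half-written file. Assuming a writable hypothetical target path:
#
#   >>> with atomic_open("/tmp/example.bin") as f:  # doctest: +SKIP
#   ...     f.write(b"payload")
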

def from_key_val_list(value):
    """Take an object and test to see if it can be represented as a
    dictionary. If it cannot, raise a ValueError; otherwise return an
    OrderedDict, e.g.,

    ::

        >>> from_key_val_list([('key', 'val')])
        OrderedDict([('key', 'val')])
        >>> from_key_val_list('string')
        Traceback (most recent call last):
        ...
        ValueError: cannot encode objects that are not 2-tuples
        >>> from_key_val_list({'key': 'val'})
        OrderedDict([('key', 'val')])

    :rtype: OrderedDict
    """
    if value is None:
        return None

    if isinstance(value, (str, bytes, bool, int)):
        raise ValueError("cannot encode objects that are not 2-tuples")

    return OrderedDict(value)


def to_key_val_list(value):
    """Take an object and test to see if it can be represented as a
    dictionary. If it can be, return a list of tuples, e.g.,

    ::

        >>> to_key_val_list([('key', 'val')])
        [('key', 'val')]
        >>> to_key_val_list({'key': 'val'})
        [('key', 'val')]
        >>> to_key_val_list('string')
        Traceback (most recent call last):
        ...
        ValueError: cannot encode objects that are not 2-tuples

    :rtype: list
    """
    if value is None:
        return None

    if isinstance(value, (str, bytes, bool, int)):
        raise ValueError("cannot encode objects that are not 2-tuples")

    if isinstance(value, Mapping):
        value = value.items()

    return list(value)


# From mitsuhiko/werkzeug (used with permission).
def parse_list_header(value):
    """Parse lists as described by RFC 2068 Section 2.

    In particular, parse comma-separated lists where the elements of
    the list may include quoted-strings. A quoted-string could
    contain a comma. A non-quoted string could have quotes in the
    middle. Quotes are removed automatically after parsing.

    It basically works like :func:`parse_set_header` just that items
    may appear multiple times and case sensitivity is preserved.

    The return value is a standard :class:`list`:

    >>> parse_list_header('token, "quoted value"')
    ['token', 'quoted value']

    To create a header from the :class:`list` again, use the
    :func:`dump_header` function.

    :param value: a string with a list header.
    :return: :class:`list`
    :rtype: list
    """
    result = []
    for item in _parse_list_header(value):
        if item[:1] == item[-1:] == '"':
            item = unquote_header_value(item[1:-1])
        result.append(item)
    return result


# From mitsuhiko/werkzeug (used with permission).
def parse_dict_header(value):
    """Parse lists of key, value pairs as described by RFC 2068 Section 2 and
    convert them into a python dict:

    >>> d = parse_dict_header('foo="is a fish", bar="as well"')
    >>> type(d) is dict
    True
    >>> sorted(d.items())
    [('bar', 'as well'), ('foo', 'is a fish')]

    If there is no value for a key it will be `None`:

    >>> parse_dict_header('key_without_value')
    {'key_without_value': None}

    To create a header from the :class:`dict` again, use the
    :func:`dump_header` function.

    :param value: a string with a dict header.
    :return: :class:`dict`
    :rtype: dict
    """
    result = {}
    for item in _parse_list_header(value):
        if "=" not in item:
            result[item] = None
            continue
        name, value = item.split("=", 1)
        if value[:1] == value[-1:] == '"':
            value = unquote_header_value(value[1:-1])
        result[name] = value
    return result


# From mitsuhiko/werkzeug (used with permission).
def unquote_header_value(value, is_filename=False):
    r"""Unquotes a header value. (Reversal of :func:`quote_header_value`).
    This does not use the real unquoting but what browsers are actually
    using for quoting.

    :param value: the header value to unquote.
    :rtype: str
    """
    if value and value[0] == value[-1] == '"':
        # this is not the real unquoting, but fixing this so that the
        # RFC is met will result in bugs with internet explorer and
        # probably some other browsers as well. IE for example is
        # uploading files with "C:\foo\bar.txt" as filename
        value = value[1:-1]

        # if this is a filename and the starting characters look like
        # a UNC path, then just return the value without quotes. Using the
        # replace sequence below on a UNC path has the effect of turning
        # the leading double slash into a single slash and then
        # _fix_ie_filename() doesn't work correctly. See #458.
        if not is_filename or value[:2] != "\\\\":
            return value.replace("\\\\", "\\").replace('\\"', '"')
    return value


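# Illustrative examples (not part of the original module): quotes are stripped
# and backslash escapes undone, except that a quoted UNC filename passes
# through untouched:
#
#   >>> unquote_header_value('"attachment.txt"')
#   'attachment.txt'
#   >>> unquote_header_value(r'"\\machine\share"', is_filename=True)
#   '\\\\machine\\share'
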

def dict_from_cookiejar(cj):
    """Returns a key/value dictionary from a CookieJar.

    :param cj: CookieJar object to extract cookies from.
    :rtype: dict
    """

    cookie_dict = {cookie.name: cookie.value for cookie in cj}
    return cookie_dict


def add_dict_to_cookiejar(cj, cookie_dict):
    """Returns a CookieJar from a key/value dictionary.

    :param cj: CookieJar to insert cookies into.
    :param cookie_dict: Dict of key/values to insert into CookieJar.
    :rtype: CookieJar
    """

    return cookiejar_from_dict(cookie_dict, cj)


def get_encodings_from_content(content):
    """Returns encodings from given content string.

    :param content: bytestring to extract encodings from.
    """
    warnings.warn(
        (
            "In requests 3.0, get_encodings_from_content will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )

    charset_re = re.compile(r'<meta.*?charset=["\']*(.+?)["\'>]', flags=re.I)
    pragma_re = re.compile(r'<meta.*?content=["\']*;?charset=(.+?)["\'>]', flags=re.I)
    xml_re = re.compile(r'^<\?xml.*?encoding=["\']*(.+?)["\'>]')

    return (
        charset_re.findall(content)
        + pragma_re.findall(content)
        + xml_re.findall(content)
    )


def _parse_content_type_header(header):
    """Returns content type and parameters from given header

    :param header: string
    :return: tuple containing content type and dictionary of
        parameters
    """

    tokens = header.split(";")
    content_type, params = tokens[0].strip(), tokens[1:]
    params_dict = {}
    items_to_strip = "\"' "

    for param in params:
        param = param.strip()
        if param:
            key, value = param, True
            index_of_equals = param.find("=")
            if index_of_equals != -1:
                key = param[:index_of_equals].strip(items_to_strip)
                value = param[index_of_equals + 1 :].strip(items_to_strip)
            params_dict[key.lower()] = value
    return content_type, params_dict


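# Illustrative example (not part of the original module): the header is split
# on ";", and each parameter key is lower-cased and stripped of quotes:
#
#   >>> _parse_content_type_header('text/html; Charset="UTF-8"')
#   ('text/html', {'charset': 'UTF-8'})
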

def get_encoding_from_headers(headers):
    """Returns encodings from given HTTP Header Dict.

    :param headers: dictionary to extract encoding from.
    :rtype: str
    """

    content_type = headers.get("content-type")

    if not content_type:
        return None

    content_type, params = _parse_content_type_header(content_type)

    if "charset" in params:
        return params["charset"].strip("'\"")

    if "text" in content_type:
        return "ISO-8859-1"

    if "application/json" in content_type:
        # Assume UTF-8 based on RFC 4627: https://www.ietf.org/rfc/rfc4627.txt
        # since the charset was unset
        return "utf-8"


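# Illustrative examples (not part of the original module): an explicit charset
# wins, text/* defaults to ISO-8859-1, and JSON defaults to UTF-8:
#
#   >>> get_encoding_from_headers({"content-type": "text/html; charset=utf-8"})
#   'utf-8'
#   >>> get_encoding_from_headers({"content-type": "text/plain"})
#   'ISO-8859-1'
#   >>> get_encoding_from_headers({"content-type": "application/json"})
#   'utf-8'
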

def stream_decode_response_unicode(iterator, r):
    """Stream decodes an iterator."""

    if r.encoding is None:
        yield from iterator
        return

    decoder = codecs.getincrementaldecoder(r.encoding)(errors="replace")
    for chunk in iterator:
        rv = decoder.decode(chunk)
        if rv:
            yield rv
    rv = decoder.decode(b"", final=True)
    if rv:
        yield rv


def iter_slices(string, slice_length):
    """Iterate over slices of a string."""
    pos = 0
    if slice_length is None or slice_length <= 0:
        slice_length = len(string)
    while pos < len(string):
        yield string[pos : pos + slice_length]
        pos += slice_length


def get_unicode_from_response(r):
    """Returns the requested content back in unicode.

    :param r: Response object to get unicode content from.

    Tries:

    1. charset from content-type
    2. fall back and replace all unicode characters

    :rtype: str
    """
    warnings.warn(
        (
            "In requests 3.0, get_unicode_from_response will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )

    tried_encodings = []

    # Try charset from content-type
    encoding = get_encoding_from_headers(r.headers)

    if encoding:
        try:
            return str(r.content, encoding)
        except UnicodeError:
            tried_encodings.append(encoding)

    # Fall back:
    try:
        return str(r.content, encoding, errors="replace")
    except TypeError:
        return r.content


# The unreserved URI characters (RFC 3986)
UNRESERVED_SET = frozenset(
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + "0123456789-._~"
)


def unquote_unreserved(uri):
    """Un-escape any percent-escape sequences in a URI that are unreserved
    characters. This leaves all reserved, illegal and non-ASCII bytes encoded.

    :rtype: str
    """
    parts = uri.split("%")
    for i in range(1, len(parts)):
        h = parts[i][0:2]
        if len(h) == 2 and h.isalnum():
            try:
                c = chr(int(h, 16))
            except ValueError:
                raise InvalidURL(f"Invalid percent-escape sequence: '{h}'")

            if c in UNRESERVED_SET:
                parts[i] = c + parts[i][2:]
            else:
                parts[i] = f"%{parts[i]}"
        else:
            parts[i] = f"%{parts[i]}"
    return "".join(parts)


def requote_uri(uri):
    """Re-quote the given URI.

    This function passes the given URI through an unquote/quote cycle to
    ensure that it is fully and consistently quoted.

    :rtype: str
    """
    safe_with_percent = "!#$%&'()*+,/:;=?@[]~"
    safe_without_percent = "!#$&'()*+,/:;=?@[]~"
    try:
        # Unquote only the unreserved characters
        # Then quote only illegal characters (do not quote reserved,
        # unreserved, or '%')
        return quote(unquote_unreserved(uri), safe=safe_with_percent)
    except InvalidURL:
        # We couldn't unquote the given URI, so let's try quoting it, but
        # there may be unquoted '%'s in the URI. We need to make sure they're
        # properly quoted so they do not cause issues elsewhere.
        return quote(uri, safe=safe_without_percent)


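# Illustrative examples (not part of the original module): requote_uri is
# idempotent, so already-quoted URLs pass through unchanged while raw unsafe
# characters get escaped:
#
#   >>> requote_uri("http://example.com/a b")
#   'http://example.com/a%20b'
#   >>> requote_uri("http://example.com/a%20b")
#   'http://example.com/a%20b'
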

def address_in_network(ip, net):
    """This function allows you to check if an IP belongs to a network subnet

    Example: returns True if ip = 192.168.1.1 and net = 192.168.1.0/24
             returns False if ip = 192.168.1.1 and net = 192.168.100.0/24

    :rtype: bool
    """
    ipaddr = struct.unpack("=L", socket.inet_aton(ip))[0]
    netaddr, bits = net.split("/")
    netmask = struct.unpack("=L", socket.inet_aton(dotted_netmask(int(bits))))[0]
    network = struct.unpack("=L", socket.inet_aton(netaddr))[0] & netmask
    return (ipaddr & netmask) == (network & netmask)


def dotted_netmask(mask):
    """Converts mask from /xx format to xxx.xxx.xxx.xxx

    Example: if mask is 24 function returns 255.255.255.0

    :rtype: str
    """
    bits = 0xFFFFFFFF ^ (1 << 32 - mask) - 1
    return socket.inet_ntoa(struct.pack(">I", bits))


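# Illustrative examples (not part of the original module): a /24 prefix keeps
# the top 24 bits set (0xFFFFFF00, i.e. 255.255.255.0), which is then used by
# address_in_network to compare network prefixes:
#
#   >>> dotted_netmask(24)
#   '255.255.255.0'
#   >>> address_in_network("192.168.1.1", "192.168.1.0/24")
#   True
#   >>> address_in_network("192.168.1.1", "192.168.100.0/24")
#   False
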

def is_ipv4_address(string_ip):
    """
    :rtype: bool
    """
    try:
        socket.inet_aton(string_ip)
    except OSError:
        return False
    return True


def is_valid_cidr(string_network):
    """
    Very simple check of the cidr format in no_proxy variable.

    :rtype: bool
    """
    if string_network.count("/") == 1:
        try:
            mask = int(string_network.split("/")[1])
        except ValueError:
            return False

        if mask < 1 or mask > 32:
            return False

        try:
            socket.inet_aton(string_network.split("/")[0])
        except OSError:
            return False
    else:
        return False
    return True


@contextlib.contextmanager
def set_environ(env_name, value):
    """Set the environment variable 'env_name' to 'value'

    Save previous value, yield, and then restore the previous value stored in
    the environment variable 'env_name'.

    If 'value' is None, do nothing"""
    value_changed = value is not None
    if value_changed:
        old_value = os.environ.get(env_name)
        os.environ[env_name] = value
    try:
        yield
    finally:
        if value_changed:
            if old_value is None:
                del os.environ[env_name]
            else:
                os.environ[env_name] = old_value


def should_bypass_proxies(url, no_proxy):
    """
    Returns whether we should bypass proxies or not.

    :rtype: bool
    """

    # Prioritize lowercase environment variables over uppercase
    # to keep a consistent behaviour with other http projects (curl, wget).
    def get_proxy(key):
        return os.environ.get(key) or os.environ.get(key.upper())

    # First check whether no_proxy is defined. If it is, check that the URL
    # we're getting isn't in the no_proxy list.
    no_proxy_arg = no_proxy
    if no_proxy is None:
        no_proxy = get_proxy("no_proxy")
    parsed = urlparse(url)

    if parsed.hostname is None:
        # URLs don't always have hostnames, e.g. file:/// urls.
        return True

    if no_proxy:
        # We need to check whether we match here. We need to see if we match
        # the end of the hostname, both with and without the port.
        no_proxy = (host for host in no_proxy.replace(" ", "").split(",") if host)

        if is_ipv4_address(parsed.hostname):
            for proxy_ip in no_proxy:
                if is_valid_cidr(proxy_ip):
                    if address_in_network(parsed.hostname, proxy_ip):
                        return True
                elif parsed.hostname == proxy_ip:
                    # If the no_proxy entry was defined in plain IP notation
                    # instead of cidr notation and matches the URL's host IP
                    return True
        else:
            host_with_port = parsed.hostname
            if parsed.port:
                host_with_port += f":{parsed.port}"

            for host in no_proxy:
                if parsed.hostname.endswith(host) or host_with_port.endswith(host):
                    # The URL does match something in no_proxy, so we don't want
                    # to apply the proxies on this URL.
                    return True

    with set_environ("no_proxy", no_proxy_arg):
        # parsed.hostname can be `None` in cases such as a file URI.
        try:
            bypass = proxy_bypass(parsed.hostname)
        except (TypeError, socket.gaierror):
            bypass = False

    if bypass:
        return True

    return False


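# Illustrative examples (not part of the original module), assuming no proxy
# environment variables are otherwise set. no_proxy entries match CIDR ranges
# for IP hosts and hostname suffixes otherwise:
#
#   >>> should_bypass_proxies("http://192.168.0.5/", no_proxy="192.168.0.0/16")
#   True
#   >>> should_bypass_proxies("http://internal.example.com/", no_proxy="example.com")
#   True
#   >>> should_bypass_proxies("http://example.org/", no_proxy="example.com")
#   False
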

def get_environ_proxies(url, no_proxy=None):
    """
    Return a dict of environment proxies.

    :rtype: dict
    """
    if should_bypass_proxies(url, no_proxy=no_proxy):
        return {}
    else:
        return getproxies()


def select_proxy(url, proxies):
    """Select a proxy for the url, if applicable.

    :param url: The url of the request
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    """
    proxies = proxies or {}
    urlparts = urlparse(url)
    if urlparts.hostname is None:
        return proxies.get(urlparts.scheme, proxies.get("all"))

    proxy_keys = [
        urlparts.scheme + "://" + urlparts.hostname,
        urlparts.scheme,
        "all://" + urlparts.hostname,
        "all",
    ]
    proxy = None
    for proxy_key in proxy_keys:
        if proxy_key in proxies:
            proxy = proxies[proxy_key]
            break

    return proxy


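# Illustrative example (not part of the original module): more specific keys
# win, so "scheme://host" beats bare "scheme", which beats "all":
#
#   >>> proxies = {
#   ...     "http://example.com": "http://proxy-a:3128",
#   ...     "http": "http://proxy-b:3128",
#   ...     "all": "http://proxy-c:3128",
#   ... }
#   >>> select_proxy("http://example.com/path", proxies)
#   'http://proxy-a:3128'
#   >>> select_proxy("http://other.org/", proxies)
#   'http://proxy-b:3128'
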

def resolve_proxies(request, proxies, trust_env=True):
    """This method takes proxy information from a request and configuration
    input to resolve a mapping of target proxies. This will consider settings
    such as NO_PROXY to strip proxy configurations.

    :param request: Request or PreparedRequest
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    :param trust_env: Boolean declaring whether to trust environment configs

    :rtype: dict
    """
    proxies = proxies if proxies is not None else {}
    url = request.url
    scheme = urlparse(url).scheme
    no_proxy = proxies.get("no_proxy")
    new_proxies = proxies.copy()

    if trust_env and not should_bypass_proxies(url, no_proxy=no_proxy):
        environ_proxies = get_environ_proxies(url, no_proxy=no_proxy)

        proxy = environ_proxies.get(scheme, environ_proxies.get("all"))

        if proxy:
            new_proxies.setdefault(scheme, proxy)
    return new_proxies


def default_user_agent(name="python-requests"):
    """
    Return a string representing the default user agent.

    :rtype: str
    """
    return f"{name}/{__version__}"


def default_headers():
    """
    :rtype: requests.structures.CaseInsensitiveDict
    """
    return CaseInsensitiveDict(
        {
            "User-Agent": default_user_agent(),
            "Accept-Encoding": DEFAULT_ACCEPT_ENCODING,
            "Accept": "*/*",
            "Connection": "keep-alive",
        }
    )


def parse_header_links(value):
    """Return a list of parsed link headers.

    i.e. Link: <http:/.../front.jpeg>; rel=front; type="image/jpeg",<http://.../back.jpeg>; rel=back;type="image/jpeg"

    :rtype: list
    """

    links = []

    replace_chars = " '\""

    value = value.strip(replace_chars)
    if not value:
        return links

    for val in re.split(", *<", value):
        try:
            url, params = val.split(";", 1)
        except ValueError:
            url, params = val, ""

        link = {"url": url.strip("<> '\"")}

        for param in params.split(";"):
            try:
                key, value = param.split("=")
            except ValueError:
                break

            link[key.strip(replace_chars)] = value.strip(replace_chars)

        links.append(link)

    return links


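# Illustrative example (not part of the original module), using an
# RFC 5988-style Link header:
#
#   >>> parse_header_links('<https://example.com/?page=2>; rel="next"')
#   [{'url': 'https://example.com/?page=2', 'rel': 'next'}]
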

# Null bytes; no need to recreate these on each call to guess_json_utf
_null = "\x00".encode("ascii")  # encoding to ASCII for Python 3
_null2 = _null * 2
_null3 = _null * 3


def guess_json_utf(data):
    """
    :rtype: str
    """
    # JSON always starts with two ASCII characters, so detection is as
    # easy as counting the nulls and from their location and count
    # determine the encoding. Also detect a BOM, if present.
    sample = data[:4]
    if sample in (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE):
        return "utf-32"  # BOM included
    if sample[:3] == codecs.BOM_UTF8:
        return "utf-8-sig"  # BOM included, MS style (discouraged)
    if sample[:2] in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE):
        return "utf-16"  # BOM included
    nullcount = sample.count(_null)
    if nullcount == 0:
        return "utf-8"
    if nullcount == 2:
        if sample[::2] == _null2:  # 1st and 3rd are null
            return "utf-16-be"
        if sample[1::2] == _null2:  # 2nd and 4th are null
            return "utf-16-le"
        # Did not detect 2 valid UTF-16 ascii-range characters
    if nullcount == 3:
        if sample[:3] == _null3:
            return "utf-32-be"
        if sample[1:] == _null3:
            return "utf-32-le"
        # Did not detect a valid UTF-32 ascii-range character
    return None


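# Illustrative examples (not part of the original module): the position of the
# null bytes in the first four bytes reveals the encoding:
#
#   >>> guess_json_utf(b'{"a": 1}')
#   'utf-8'
#   >>> guess_json_utf('{"a": 1}'.encode("utf-16-le"))
#   'utf-16-le'
#   >>> guess_json_utf('{"a": 1}'.encode("utf-32-be"))
#   'utf-32-be'
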

def prepend_scheme_if_needed(url, new_scheme):
    """Given a URL that may or may not have a scheme, prepend the given scheme.
    Does not replace a present scheme with the one provided as an argument.

    :rtype: str
    """
    parsed = parse_url(url)
    scheme, auth, host, port, path, query, fragment = parsed

    # A defect in urlparse determines that there isn't a netloc present in some
    # urls. We previously assumed parsing was overly cautious, and swapped the
    # netloc and path. Due to a lack of tests on the original defect, this is
    # maintained with parse_url for backwards compatibility.
    netloc = parsed.netloc
    if not netloc:
        netloc, path = path, netloc

    if auth:
        # parse_url doesn't provide the netloc with auth
        # so we'll add it ourselves.
        netloc = "@".join([auth, netloc])
    if scheme is None:
        scheme = new_scheme
    if path is None:
        path = ""

    return urlunparse((scheme, netloc, path, "", query, fragment))


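# Illustrative examples (not part of the original module):
#
#   >>> prepend_scheme_if_needed("example.com/path", "https")
#   'https://example.com/path'
#   >>> prepend_scheme_if_needed("http://example.com", "https")
#   'http://example.com'
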

def get_auth_from_url(url):
    """Given a url with authentication components, extract them into a tuple of
    username,password.

    :rtype: (str,str)
    """
    parsed = urlparse(url)

    try:
        auth = (unquote(parsed.username), unquote(parsed.password))
    except (AttributeError, TypeError):
        auth = ("", "")

    return auth


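# Illustrative examples (not part of the original module): credentials are
# percent-decoded, and URLs without credentials yield empty strings:
#
#   >>> get_auth_from_url("https://user:p%40ss@example.com/")
#   ('user', 'p@ss')
#   >>> get_auth_from_url("https://example.com/")
#   ('', '')
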

def check_header_validity(header):
    """Verifies that header parts don't contain leading whitespace,
    reserved characters, or return characters.

    :param header: tuple, in the format (name, value).
    """
    name, value = header
    _validate_header_part(header, name, 0)
    _validate_header_part(header, value, 1)


def _validate_header_part(header, header_part, header_validator_index):
    if isinstance(header_part, str):
        validator = _HEADER_VALIDATORS_STR[header_validator_index]
    elif isinstance(header_part, bytes):
        validator = _HEADER_VALIDATORS_BYTE[header_validator_index]
    else:
        raise InvalidHeader(
            f"Header part ({header_part!r}) from {header} "
            f"must be of type str or bytes, not {type(header_part)}"
        )

    if not validator.match(header_part):
        header_kind = "name" if header_validator_index == 0 else "value"
        raise InvalidHeader(
            f"Invalid leading whitespace, reserved character(s), or return "
            f"character(s) in header {header_kind}: {header_part!r}"
        )


def urldefragauth(url):
    """
    Given a url remove the fragment and the authentication part.

    :rtype: str
    """
    scheme, netloc, path, params, query, fragment = urlparse(url)

    # see func:`prepend_scheme_if_needed`
    if not netloc:
        netloc, path = path, netloc

    netloc = netloc.rsplit("@", 1)[-1]

    return urlunparse((scheme, netloc, path, params, query, ""))


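# Illustrative example (not part of the original module):
#
#   >>> urldefragauth("https://user:pass@example.com/path#section")
#   'https://example.com/path'
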

def rewind_body(prepared_request):
    """Move file pointer back to its recorded starting position
    so it can be read again on redirect.
    """
    body_seek = getattr(prepared_request.body, "seek", None)
    if body_seek is not None and isinstance(
        prepared_request._body_position, integer_types
    ):
        try:
            body_seek(prepared_request._body_position)
        except OSError:
            raise UnrewindableBodyError(
                "An error occurred when rewinding request body for redirect."
            )
    else:
        raise UnrewindableBodyError("Unable to rewind request body for redirect.")