1""" 

2requests.utils 

3~~~~~~~~~~~~~~ 

4 

5This module provides utility functions that are used within Requests 

6that are also useful for external consumption. 

7""" 

8 

9import codecs 

10import contextlib 

11import io 

12import os 

13import re 

14import socket 

15import struct 

16import sys 

17import tempfile 

18import warnings 

19import zipfile 

20from collections import OrderedDict 

21 

22from urllib3.util import make_headers, parse_url 

23 

24from . import certs 

25from .__version__ import __version__ 

26 

27# to_native_string is unused here, but imported here for backwards compatibility 

28from ._internal_utils import ( # noqa: F401 

29 _HEADER_VALIDATORS_BYTE, 

30 _HEADER_VALIDATORS_STR, 

31 HEADER_VALIDATORS, 

32 to_native_string, 

33) 

34from .compat import ( 

35 Mapping, 

36 basestring, 

37 bytes, 

38 getproxies, 

39 getproxies_environment, 

40 integer_types, 

41) 

42from .compat import parse_http_list as _parse_list_header 

43from .compat import ( 

44 proxy_bypass, 

45 proxy_bypass_environment, 

46 quote, 

47 str, 

48 unquote, 

49 urlparse, 

50 urlunparse, 

51) 

52from .cookies import cookiejar_from_dict 

53from .exceptions import ( 

54 FileModeWarning, 

55 InvalidHeader, 

56 InvalidURL, 

57 UnrewindableBodyError, 

58) 

59from .structures import CaseInsensitiveDict 

60 

NETRC_FILES = (".netrc", "_netrc")

DEFAULT_CA_BUNDLE_PATH = certs.where()

DEFAULT_PORTS = {"http": 80, "https": 443}

# Ensure that ', ' is used to preserve previous delimiter behavior.
DEFAULT_ACCEPT_ENCODING = ", ".join(
    re.split(r",\s*", make_headers(accept_encoding=True)["accept-encoding"])
)


if sys.platform == "win32":
    # provide a proxy_bypass version on Windows without DNS lookups

    def proxy_bypass_registry(host):
        try:
            import winreg
        except ImportError:
            return False

        try:
            internetSettings = winreg.OpenKey(
                winreg.HKEY_CURRENT_USER,
                r"Software\Microsoft\Windows\CurrentVersion\Internet Settings",
            )
            # ProxyEnable could be REG_SZ or REG_DWORD, normalizing it
            proxyEnable = int(winreg.QueryValueEx(internetSettings, "ProxyEnable")[0])
            # ProxyOverride is almost always a string
            proxyOverride = winreg.QueryValueEx(internetSettings, "ProxyOverride")[0]
        except (OSError, ValueError):
            return False
        if not proxyEnable or not proxyOverride:
            return False

        # make a check value list from the registry entry: replace the
        # '<local>' string by the localhost entry and the corresponding
        # canonical entry.
        proxyOverride = proxyOverride.split(";")
        # filter out empty strings so re.match doesn't return true for them below.
        proxyOverride = filter(None, proxyOverride)
        # now check if we match one of the registry values.
        for test in proxyOverride:
            if test == "<local>":
                if "." not in host:
                    return True
            test = test.replace(".", r"\.")  # mask dots
            test = test.replace("*", r".*")  # change glob sequence
            test = test.replace("?", r".")  # change glob char
            if re.match(test, host, re.I):
                return True
        return False

    def proxy_bypass(host):  # noqa
        """Return True if the host should be bypassed.

        Checks proxy settings gathered from the environment, if specified,
        or the registry.
        """
        if getproxies_environment():
            return proxy_bypass_environment(host)
        else:
            return proxy_bypass_registry(host)


def dict_to_sequence(d):
    """Returns an internal sequence dictionary update."""

    if hasattr(d, "items"):
        d = d.items()

    return d


def super_len(o):
    total_length = None
    current_position = 0

    if isinstance(o, str):
        o = o.encode("utf-8")

    if hasattr(o, "__len__"):
        total_length = len(o)

    elif hasattr(o, "len"):
        total_length = o.len

    elif hasattr(o, "fileno"):
        try:
            fileno = o.fileno()
        except (io.UnsupportedOperation, AttributeError):
            # AttributeError is a surprising exception, seeing as how we've just checked
            # that `hasattr(o, 'fileno')`. It happens for objects obtained via
            # `Tarfile.extractfile()`, per issue 5229.
            pass
        else:
            total_length = os.fstat(fileno).st_size

            # Having used fstat to determine the file length, we need to
            # confirm that this file was opened up in binary mode.
            if "b" not in o.mode:
                warnings.warn(
                    (
                        "Requests has determined the content-length for this "
                        "request using the binary size of the file: however, the "
                        "file has been opened in text mode (i.e. without the 'b' "
                        "flag in the mode). This may lead to an incorrect "
                        "content-length. In Requests 3.0, support will be removed "
                        "for files in text mode."
                    ),
                    FileModeWarning,
                )

    if hasattr(o, "tell"):
        try:
            current_position = o.tell()
        except OSError:
            # This can happen in some weird situations, such as when the file
            # is actually a special file descriptor like stdin. In this
            # instance, we don't know what the length is, so set it to zero and
            # let requests chunk it instead.
            if total_length is not None:
                current_position = total_length
        else:
            if hasattr(o, "seek") and total_length is None:
                # StringIO and BytesIO have seek but no usable fileno
                try:
                    # seek to end of file
                    o.seek(0, 2)
                    total_length = o.tell()

                    # seek back to current position to support
                    # partially read file-like objects
                    o.seek(current_position or 0)
                except OSError:
                    total_length = 0

    if total_length is None:
        total_length = 0

    return max(0, total_length - current_position)
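
# Illustrative sketch, not part of the original module: super_len() reports the
# bytes *remaining* from the current read position, which is why tell() is
# subtracted above.
#
#     >>> import io
#     >>> buf = io.BytesIO(b"hello world")
#     >>> super_len(buf)
#     11
#     >>> _ = buf.read(6)
#     >>> super_len(buf)  # only the unread tail counts toward content-length
#     5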


def get_netrc_auth(url, raise_errors=False):
    """Returns the Requests tuple auth for a given url from netrc."""

    netrc_file = os.environ.get("NETRC")
    if netrc_file is not None:
        netrc_locations = (netrc_file,)
    else:
        netrc_locations = (f"~/{f}" for f in NETRC_FILES)

    try:
        from netrc import NetrcParseError, netrc

        netrc_path = None

        for f in netrc_locations:
            try:
                loc = os.path.expanduser(f)
            except KeyError:
                # os.path.expanduser can fail when $HOME is undefined and
                # getpwuid fails. See https://bugs.python.org/issue20164 &
                # https://github.com/psf/requests/issues/1846
                return

            if os.path.exists(loc):
                netrc_path = loc
                break

        # Abort early if there isn't one.
        if netrc_path is None:
            return

        ri = urlparse(url)

        # Strip port numbers from netloc. This weird `if...encode` dance is
        # used for Python 3.2, which doesn't support unicode literals.
        splitstr = b":"
        if isinstance(url, str):
            splitstr = splitstr.decode("ascii")
        host = ri.netloc.split(splitstr)[0]

        try:
            _netrc = netrc(netrc_path).authenticators(host)
            if _netrc:
                # Return with login / password
                login_i = 0 if _netrc[0] else 1
                return (_netrc[login_i], _netrc[2])
        except (NetrcParseError, OSError):
            # If there was a parsing error or a permissions issue reading the file,
            # we'll just skip netrc auth unless explicitly asked to raise errors.
            if raise_errors:
                raise

    # App Engine hackiness.
    except (ImportError, AttributeError):
        pass
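
# Illustrative sketch, not part of the original module: given a hypothetical
# ~/.netrc containing the line
#
#     machine example.com login alice password s3cret
#
# the lookup is keyed on the URL's hostname:
#
#     >>> get_netrc_auth("https://example.com/resource")
#     ('alice', 's3cret')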


def guess_filename(obj):
    """Tries to guess the filename of the given object."""
    name = getattr(obj, "name", None)
    if name and isinstance(name, basestring) and name[0] != "<" and name[-1] != ">":
        return os.path.basename(name)


def extract_zipped_paths(path):
    """Replace nonexistent paths that look like they refer to a member of a zip
    archive with the location of an extracted copy of the target, or else
    just return the provided path unchanged.
    """
    if os.path.exists(path):
        # this is already a valid path, no need to do anything further
        return path

    # find the first valid part of the provided path and treat that as a zip archive
    # assume the rest of the path is the name of a member in the archive
    archive, member = os.path.split(path)
    while archive and not os.path.exists(archive):
        archive, prefix = os.path.split(archive)
        if not prefix:
            # If the split produced an empty prefix (i.e. `archive` was left
            # unchanged), bail out: continuing would loop forever on a rare
            # corner case.
            break
        member = "/".join([prefix, member])

    if not zipfile.is_zipfile(archive):
        return path

    zip_file = zipfile.ZipFile(archive)
    if member not in zip_file.namelist():
        return path

    # we have a valid zip archive and a valid member of that archive
    tmp = tempfile.gettempdir()
    extracted_path = os.path.join(tmp, member.split("/")[-1])
    if not os.path.exists(extracted_path):
        # use read + write to avoid creating nested folders; we only want the
        # file, and writing through atomic_open avoids an mkdir race condition
        with atomic_open(extracted_path) as file_handler:
            file_handler.write(zip_file.read(member))
    return extracted_path
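
# Illustrative sketch, not part of the original module; the paths are
# hypothetical. If "/tmp/bundle.zip" is a real archive containing
# "certs/ca.pem", a nonexistent path that points *into* it is resolved by
# extracting the member to the temp directory:
#
#     >>> extract_zipped_paths("/tmp/bundle.zip/certs/ca.pem")
#     '/tmp/ca.pem'  # extracted copy; exact location is tempfile.gettempdir()
#
# Any path that exists, or that names no valid archive/member, is returned
# unchanged.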


@contextlib.contextmanager
def atomic_open(filename):
    """Write a file to the disk in an atomic fashion"""
    tmp_descriptor, tmp_name = tempfile.mkstemp(dir=os.path.dirname(filename))
    try:
        with os.fdopen(tmp_descriptor, "wb") as tmp_handler:
            yield tmp_handler
        os.replace(tmp_name, filename)
    except BaseException:
        os.remove(tmp_name)
        raise
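
# Illustrative sketch, not part of the original module: the caller writes to a
# temporary file, which only replaces the destination once the block exits
# cleanly, so readers never observe a half-written file.
#
#     >>> with atomic_open("settings.bin") as fh:  # hypothetical filename
#     ...     fh.write(b"payload")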


def from_key_val_list(value):
    """Take an object and test to see if it can be represented as a
    dictionary. If it can be, return an OrderedDict, e.g.,

    ::

        >>> from_key_val_list([('key', 'val')])
        OrderedDict([('key', 'val')])
        >>> from_key_val_list('string')
        Traceback (most recent call last):
        ...
        ValueError: cannot encode objects that are not 2-tuples
        >>> from_key_val_list({'key': 'val'})
        OrderedDict([('key', 'val')])

    :rtype: OrderedDict
    """
    if value is None:
        return None

    if isinstance(value, (str, bytes, bool, int)):
        raise ValueError("cannot encode objects that are not 2-tuples")

    return OrderedDict(value)


def to_key_val_list(value):
    """Take an object and test to see if it can be represented as a
    dictionary. If it can be, return a list of tuples, e.g.,

    ::

        >>> to_key_val_list([('key', 'val')])
        [('key', 'val')]
        >>> to_key_val_list({'key': 'val'})
        [('key', 'val')]
        >>> to_key_val_list('string')
        Traceback (most recent call last):
        ...
        ValueError: cannot encode objects that are not 2-tuples

    :rtype: list
    """
    if value is None:
        return None

    if isinstance(value, (str, bytes, bool, int)):
        raise ValueError("cannot encode objects that are not 2-tuples")

    if isinstance(value, Mapping):
        value = value.items()

    return list(value)


# From mitsuhiko/werkzeug (used with permission).
def parse_list_header(value):
    """Parse lists as described by RFC 2068 Section 2.

    In particular, parse comma-separated lists where the elements of
    the list may include quoted-strings. A quoted-string could
    contain a comma. A non-quoted string could have quotes in the
    middle. Quotes are removed automatically after parsing.

    It basically works like :func:`parse_set_header` just that items
    may appear multiple times and case sensitivity is preserved.

    The return value is a standard :class:`list`:

    >>> parse_list_header('token, "quoted value"')
    ['token', 'quoted value']

    To create a header from the :class:`list` again, use the
    :func:`dump_header` function.

    :param value: a string with a list header.
    :return: :class:`list`
    :rtype: list
    """
    result = []
    for item in _parse_list_header(value):
        if item[:1] == item[-1:] == '"':
            item = unquote_header_value(item[1:-1])
        result.append(item)
    return result


# From mitsuhiko/werkzeug (used with permission).
def parse_dict_header(value):
    """Parse lists of key, value pairs as described by RFC 2068 Section 2 and
    convert them into a python dict:

    >>> d = parse_dict_header('foo="is a fish", bar="as well"')
    >>> type(d) is dict
    True
    >>> sorted(d.items())
    [('bar', 'as well'), ('foo', 'is a fish')]

    If there is no value for a key it will be `None`:

    >>> parse_dict_header('key_without_value')
    {'key_without_value': None}

    To create a header from the :class:`dict` again, use the
    :func:`dump_header` function.

    :param value: a string with a dict header.
    :return: :class:`dict`
    :rtype: dict
    """
    result = {}
    for item in _parse_list_header(value):
        if "=" not in item:
            result[item] = None
            continue
        name, value = item.split("=", 1)
        if value[:1] == value[-1:] == '"':
            value = unquote_header_value(value[1:-1])
        result[name] = value
    return result


# From mitsuhiko/werkzeug (used with permission).
def unquote_header_value(value, is_filename=False):
    r"""Unquotes a header value. (Reversal of :func:`quote_header_value`).
    This does not use the real unquoting but what browsers are actually
    using for quoting.

    :param value: the header value to unquote.
    :rtype: str
    """
    if value and value[0] == value[-1] == '"':
        # this is not the real unquoting, but fixing this so that the
        # RFC is met will result in bugs with internet explorer and
        # probably some other browsers as well. IE for example is
        # uploading files with "C:\foo\bar.txt" as filename
        value = value[1:-1]

        # if this is a filename and the starting characters look like
        # a UNC path, then just return the value without quotes. Using the
        # replace sequence below on a UNC path has the effect of turning
        # the leading double slash into a single slash and then
        # _fix_ie_filename() doesn't work correctly. See #458.
        if not is_filename or value[:2] != "\\\\":
            return value.replace("\\\\", "\\").replace('\\"', '"')
    return value


def dict_from_cookiejar(cj):
    """Returns a key/value dictionary from a CookieJar.

    :param cj: CookieJar object to extract cookies from.
    :rtype: dict
    """

    cookie_dict = {cookie.name: cookie.value for cookie in cj}
    return cookie_dict


def add_dict_to_cookiejar(cj, cookie_dict):
    """Returns a CookieJar from a key/value dictionary.

    :param cj: CookieJar to insert cookies into.
    :param cookie_dict: Dict of key/values to insert into CookieJar.
    :rtype: CookieJar
    """

    return cookiejar_from_dict(cookie_dict, cj)


def get_encodings_from_content(content):
    """Returns encodings from given content string.

    :param content: bytestring to extract encodings from.
    """
    warnings.warn(
        (
            "In requests 3.0, get_encodings_from_content will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )

    charset_re = re.compile(r'<meta.*?charset=["\']*(.+?)["\'>]', flags=re.I)
    pragma_re = re.compile(r'<meta.*?content=["\']*;?charset=(.+?)["\'>]', flags=re.I)
    xml_re = re.compile(r'^<\?xml.*?encoding=["\']*(.+?)["\'>]')

    return (
        charset_re.findall(content)
        + pragma_re.findall(content)
        + xml_re.findall(content)
    )


def _parse_content_type_header(header):
    """Returns content type and parameters from given header

    :param header: string
    :return: tuple containing content type and dictionary of
         parameters
    """

    tokens = header.split(";")
    content_type, params = tokens[0].strip(), tokens[1:]
    params_dict = {}
    items_to_strip = "\"' "

    for param in params:
        param = param.strip()
        if param:
            key, value = param, True
            index_of_equals = param.find("=")
            if index_of_equals != -1:
                key = param[:index_of_equals].strip(items_to_strip)
                value = param[index_of_equals + 1 :].strip(items_to_strip)
            params_dict[key.lower()] = value
    return content_type, params_dict
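
# Illustrative sketch, not part of the original module: the header is split on
# ";", and each "key=value" parameter is stripped of quotes and whitespace.
#
#     >>> _parse_content_type_header("text/html; charset=UTF-8")
#     ('text/html', {'charset': 'UTF-8'})
#     >>> _parse_content_type_header("multipart/form-data; boundary=abc")
#     ('multipart/form-data', {'boundary': 'abc'})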


def get_encoding_from_headers(headers):
    """Returns encodings from given HTTP Header Dict.

    :param headers: dictionary to extract encoding from.
    :rtype: str
    """

    content_type = headers.get("content-type")

    if not content_type:
        return None

    content_type, params = _parse_content_type_header(content_type)

    if "charset" in params:
        return params["charset"].strip("'\"")

    if "text" in content_type:
        return "ISO-8859-1"

    if "application/json" in content_type:
        # Assume UTF-8 based on RFC 4627: https://www.ietf.org/rfc/rfc4627.txt since the charset was unset
        return "utf-8"
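
# Illustrative sketch, not part of the original module:
#
#     >>> get_encoding_from_headers({"content-type": "text/html; charset=UTF-8"})
#     'UTF-8'
#     >>> get_encoding_from_headers({"content-type": "text/plain"})
#     'ISO-8859-1'
#     >>> get_encoding_from_headers({"content-type": "application/json"})
#     'utf-8'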


def stream_decode_response_unicode(iterator, r):
    """Stream decodes an iterator."""

    if r.encoding is None:
        yield from iterator
        return

    decoder = codecs.getincrementaldecoder(r.encoding)(errors="replace")
    for chunk in iterator:
        rv = decoder.decode(chunk)
        if rv:
            yield rv
    rv = decoder.decode(b"", final=True)
    if rv:
        yield rv


def iter_slices(string, slice_length):
    """Iterate over slices of a string."""
    pos = 0
    if slice_length is None or slice_length <= 0:
        slice_length = len(string)
    while pos < len(string):
        yield string[pos : pos + slice_length]
        pos += slice_length


def get_unicode_from_response(r):
    """Returns the requested content back in unicode.

    :param r: Response object to get unicode content from.

    Tried:

    1. charset from content-type
    2. fall back and replace all unicode characters

    :rtype: str
    """
    warnings.warn(
        (
            "In requests 3.0, get_unicode_from_response will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )

    tried_encodings = []

    # Try charset from content-type
    encoding = get_encoding_from_headers(r.headers)

    if encoding:
        try:
            return str(r.content, encoding)
        except UnicodeError:
            tried_encodings.append(encoding)

    # Fall back:
    try:
        return str(r.content, encoding, errors="replace")
    except TypeError:
        return r.content


# The unreserved URI characters (RFC 3986)
UNRESERVED_SET = frozenset(
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + "0123456789-._~"
)


def unquote_unreserved(uri):
    """Un-escape any percent-escape sequences in a URI that are unreserved
    characters. This leaves all reserved, illegal and non-ASCII bytes encoded.

    :rtype: str
    """
    parts = uri.split("%")
    for i in range(1, len(parts)):
        h = parts[i][0:2]
        if len(h) == 2 and h.isalnum():
            try:
                c = chr(int(h, 16))
            except ValueError:
                raise InvalidURL(f"Invalid percent-escape sequence: '{h}'")

            if c in UNRESERVED_SET:
                parts[i] = c + parts[i][2:]
            else:
                parts[i] = f"%{parts[i]}"
        else:
            parts[i] = f"%{parts[i]}"
    return "".join(parts)


def requote_uri(uri):
    """Re-quote the given URI.

    This function passes the given URI through an unquote/quote cycle to
    ensure that it is fully and consistently quoted.

    :rtype: str
    """
    safe_with_percent = "!#$%&'()*+,/:;=?@[]~"
    safe_without_percent = "!#$&'()*+,/:;=?@[]~"
    try:
        # Unquote only the unreserved characters
        # Then quote only illegal characters (do not quote reserved,
        # unreserved, or '%')
        return quote(unquote_unreserved(uri), safe=safe_with_percent)
    except InvalidURL:
        # We couldn't unquote the given URI, so let's try quoting it, but
        # there may be unquoted '%'s in the URI. We need to make sure they're
        # properly quoted so they do not cause issues elsewhere.
        return quote(uri, safe=safe_without_percent)
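
# Illustrative sketch, not part of the original module: %7E decodes to the
# unreserved "~" and is un-escaped, while a space is an illegal character
# and gets quoted.
#
#     >>> unquote_unreserved("http://example.com/%7Euser")
#     'http://example.com/~user'
#     >>> requote_uri("http://example.com/a b")
#     'http://example.com/a%20b'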


def address_in_network(ip, net):
    """This function allows you to check if an IP belongs to a network subnet

    Example: returns True if ip = 192.168.1.1 and net = 192.168.1.0/24
             returns False if ip = 192.168.1.1 and net = 192.168.100.0/24

    :rtype: bool
    """
    ipaddr = struct.unpack("=L", socket.inet_aton(ip))[0]
    netaddr, bits = net.split("/")
    netmask = struct.unpack("=L", socket.inet_aton(dotted_netmask(int(bits))))[0]
    network = struct.unpack("=L", socket.inet_aton(netaddr))[0] & netmask
    return (ipaddr & netmask) == (network & netmask)


def dotted_netmask(mask):
    """Converts mask from /xx format to xxx.xxx.xxx.xxx

    Example: if mask is 24 function returns 255.255.255.0

    :rtype: str
    """
    bits = 0xFFFFFFFF ^ (1 << 32 - mask) - 1
    return socket.inet_ntoa(struct.pack(">I", bits))
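
# Illustrative sketch, not part of the original module: for mask=24 the
# expression above is 0xFFFFFFFF ^ ((1 << 8) - 1) = 0xFFFFFF00, i.e.
# 255.255.255.0.
#
#     >>> dotted_netmask(24)
#     '255.255.255.0'
#     >>> address_in_network("192.168.1.1", "192.168.1.0/24")
#     True
#     >>> address_in_network("192.168.1.1", "192.168.100.0/24")
#     False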


def is_ipv4_address(string_ip):
    """
    :rtype: bool
    """
    try:
        socket.inet_aton(string_ip)
    except OSError:
        return False
    return True


def is_valid_cidr(string_network):
    """
    Very simple check of the cidr format in no_proxy variable.

    :rtype: bool
    """
    if string_network.count("/") == 1:
        try:
            mask = int(string_network.split("/")[1])
        except ValueError:
            return False

        if mask < 1 or mask > 32:
            return False

        try:
            socket.inet_aton(string_network.split("/")[0])
        except OSError:
            return False
    else:
        return False
    return True


@contextlib.contextmanager
def set_environ(env_name, value):
    """Set the environment variable 'env_name' to 'value'

    Save previous value, yield, and then restore the previous value stored in
    the environment variable 'env_name'.

    If 'value' is None, do nothing"""
    value_changed = value is not None
    if value_changed:
        old_value = os.environ.get(env_name)
        os.environ[env_name] = value
    try:
        yield
    finally:
        if value_changed:
            if old_value is None:
                del os.environ[env_name]
            else:
                os.environ[env_name] = old_value
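
# Illustrative sketch, not part of the original module: temporarily override a
# variable, restoring (or re-deleting) it afterwards even on error.
#
#     >>> with set_environ("no_proxy", "localhost,127.0.0.1"):
#     ...     ...  # code here sees the temporary value
#     >>> with set_environ("no_proxy", None):
#     ...     ...  # value is None, so the environment is left untouched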


def should_bypass_proxies(url, no_proxy):
    """
    Returns whether we should bypass proxies or not.

    :rtype: bool
    """

    # Prioritize lowercase environment variables over uppercase
    # to keep a consistent behaviour with other http projects (curl, wget).
    def get_proxy(key):
        return os.environ.get(key) or os.environ.get(key.upper())

    # First check whether no_proxy is defined. If it is, check that the URL
    # we're getting isn't in the no_proxy list.
    no_proxy_arg = no_proxy
    if no_proxy is None:
        no_proxy = get_proxy("no_proxy")
    parsed = urlparse(url)

    if parsed.hostname is None:
        # URLs don't always have hostnames, e.g. file:/// urls.
        return True

    if no_proxy:
        # We need to check whether we match here. We need to see if we match
        # the end of the hostname, both with and without the port.
        no_proxy = (host for host in no_proxy.replace(" ", "").split(",") if host)

        if is_ipv4_address(parsed.hostname):
            for proxy_ip in no_proxy:
                if is_valid_cidr(proxy_ip):
                    if address_in_network(parsed.hostname, proxy_ip):
                        return True
                elif parsed.hostname == proxy_ip:
                    # If no_proxy ip was defined in plain IP notation instead of cidr notation &
                    # matches the IP of the index
                    return True
        else:
            host_with_port = parsed.hostname
            if parsed.port:
                host_with_port += f":{parsed.port}"

            for host in no_proxy:
                if parsed.hostname.endswith(host) or host_with_port.endswith(host):
                    # The URL does match something in no_proxy, so we don't want
                    # to apply the proxies on this URL.
                    return True

    with set_environ("no_proxy", no_proxy_arg):
        # parsed.hostname can be `None` in cases such as a file URI.
        try:
            bypass = proxy_bypass(parsed.hostname)
        except (TypeError, socket.gaierror):
            bypass = False

    if bypass:
        return True

    return False


def get_environ_proxies(url, no_proxy=None):
    """
    Return a dict of environment proxies.

    :rtype: dict
    """
    if should_bypass_proxies(url, no_proxy=no_proxy):
        return {}
    else:
        return getproxies()


def select_proxy(url, proxies):
    """Select a proxy for the url, if applicable.

    :param url: The url of the request
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    """
    proxies = proxies or {}
    urlparts = urlparse(url)
    if urlparts.hostname is None:
        return proxies.get(urlparts.scheme, proxies.get("all"))

    proxy_keys = [
        urlparts.scheme + "://" + urlparts.hostname,
        urlparts.scheme,
        "all://" + urlparts.hostname,
        "all",
    ]
    proxy = None
    for proxy_key in proxy_keys:
        if proxy_key in proxies:
            proxy = proxies[proxy_key]
            break

    return proxy
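
# Illustrative sketch, not part of the original module: keys are consulted
# most-specific first (scheme://host, then scheme, then all://host, then all).
#
#     >>> proxies = {
#     ...     "http://example.com": "http://special:8080",
#     ...     "http": "http://proxy:3128",
#     ... }
#     >>> select_proxy("http://example.com/path", proxies)
#     'http://special:8080'
#     >>> select_proxy("http://other.org/", proxies)
#     'http://proxy:3128'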


def resolve_proxies(request, proxies, trust_env=True):
    """This method takes proxy information from a request and configuration
    input to resolve a mapping of target proxies. This will consider settings
    such as NO_PROXY to strip proxy configurations.

    :param request: Request or PreparedRequest
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    :param trust_env: Boolean declaring whether to trust environment configs

    :rtype: dict
    """
    proxies = proxies if proxies is not None else {}
    url = request.url
    scheme = urlparse(url).scheme
    no_proxy = proxies.get("no_proxy")
    new_proxies = proxies.copy()

    if trust_env and not should_bypass_proxies(url, no_proxy=no_proxy):
        environ_proxies = get_environ_proxies(url, no_proxy=no_proxy)

        proxy = environ_proxies.get(scheme, environ_proxies.get("all"))

        if proxy:
            new_proxies.setdefault(scheme, proxy)
    return new_proxies


def default_user_agent(name="python-requests"):
    """
    Return a string representing the default user agent.

    :rtype: str
    """
    return f"{name}/{__version__}"


def default_headers():
    """
    :rtype: requests.structures.CaseInsensitiveDict
    """
    return CaseInsensitiveDict(
        {
            "User-Agent": default_user_agent(),
            "Accept-Encoding": DEFAULT_ACCEPT_ENCODING,
            "Accept": "*/*",
            "Connection": "keep-alive",
        }
    )


def parse_header_links(value):
    """Return a list of parsed link headers, e.g.

    Link: <http:/.../front.jpeg>; rel=front; type="image/jpeg",<http://.../back.jpeg>; rel=back;type="image/jpeg"

    :rtype: list
    """

    links = []

    replace_chars = " '\""

    value = value.strip(replace_chars)
    if not value:
        return links

    for val in re.split(", *<", value):
        try:
            url, params = val.split(";", 1)
        except ValueError:
            url, params = val, ""

        link = {"url": url.strip("<> '\"")}

        for param in params.split(";"):
            try:
                key, value = param.split("=")
            except ValueError:
                break

            link[key.strip(replace_chars)] = value.strip(replace_chars)

        links.append(link)

    return links
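
# Illustrative sketch, not part of the original module:
#
#     >>> parse_header_links('<http://example.com/page2>; rel="next"')
#     [{'url': 'http://example.com/page2', 'rel': 'next'}]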


# Null bytes; no need to recreate these on each call to guess_json_utf
_null = "\x00".encode("ascii")  # encoding to ASCII for Python 3
_null2 = _null * 2
_null3 = _null * 3


def guess_json_utf(data):
    """
    :rtype: str
    """
    # JSON always starts with two ASCII characters, so detection is as
    # easy as counting the nulls and from their location and count
    # determine the encoding. Also detect a BOM, if present.
    sample = data[:4]
    if sample in (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE):
        return "utf-32"  # BOM included
    if sample[:3] == codecs.BOM_UTF8:
        return "utf-8-sig"  # BOM included, MS style (discouraged)
    if sample[:2] in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE):
        return "utf-16"  # BOM included
    nullcount = sample.count(_null)
    if nullcount == 0:
        return "utf-8"
    if nullcount == 2:
        if sample[::2] == _null2:  # 1st and 3rd are null
            return "utf-16-be"
        if sample[1::2] == _null2:  # 2nd and 4th are null
            return "utf-16-le"
        # Did not detect 2 valid UTF-16 ascii-range characters
    if nullcount == 3:
        if sample[:3] == _null3:
            return "utf-32-be"
        if sample[1:] == _null3:
            return "utf-32-le"
        # Did not detect a valid UTF-32 ascii-range character
    return None
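
# Illustrative sketch, not part of the original module: the first two
# characters of any JSON text are ASCII, so the null-byte pattern pins down
# the UTF flavour.
#
#     >>> guess_json_utf(b'{"a": 1}')
#     'utf-8'
#     >>> guess_json_utf('{"a": 1}'.encode("utf-16-le"))
#     'utf-16-le'
#     >>> guess_json_utf('{"a": 1}'.encode("utf-16"))  # BOM present
#     'utf-16'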


def prepend_scheme_if_needed(url, new_scheme):
    """Given a URL that may or may not have a scheme, prepend the given scheme.
    Does not replace a present scheme with the one provided as an argument.

    :rtype: str
    """
    parsed = parse_url(url)
    scheme, auth, host, port, path, query, fragment = parsed

    # A defect in urlparse determines that there isn't a netloc present in some
    # urls. We previously assumed parsing was overly cautious, and swapped the
    # netloc and path. Due to a lack of tests on the original defect, this is
    # maintained with parse_url for backwards compatibility.
    netloc = parsed.netloc
    if not netloc:
        netloc, path = path, netloc

    if auth:
        # parse_url doesn't provide the netloc with auth
        # so we'll add it ourselves.
        netloc = "@".join([auth, netloc])
    if scheme is None:
        scheme = new_scheme
    if path is None:
        path = ""

    return urlunparse((scheme, netloc, path, "", query, fragment))
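
# Illustrative sketch, not part of the original module: a scheme is only
# added when missing, never substituted.
#
#     >>> prepend_scheme_if_needed("example.com/path", "http")
#     'http://example.com/path'
#     >>> prepend_scheme_if_needed("https://example.com/path", "http")
#     'https://example.com/path'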


def get_auth_from_url(url):
    """Given a url with authentication components, extract them into a tuple of
    username,password.

    :rtype: (str,str)
    """
    parsed = urlparse(url)

    try:
        auth = (unquote(parsed.username), unquote(parsed.password))
    except (AttributeError, TypeError):
        auth = ("", "")

    return auth
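
# Illustrative sketch, not part of the original module: credentials are
# percent-decoded, and URLs without them yield empty strings.
#
#     >>> get_auth_from_url("https://user:p%40ss@example.com/")
#     ('user', 'p@ss')
#     >>> get_auth_from_url("https://example.com/")
#     ('', '')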


def check_header_validity(header):
    """Verifies that header parts don't contain leading whitespace,
    reserved characters, or return characters.

    :param header: tuple, in the format (name, value).
    """
    name, value = header
    _validate_header_part(header, name, 0)
    _validate_header_part(header, value, 1)
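
# Illustrative sketch, not part of the original module: a well-formed pair
# passes silently, while a CRLF in the value raises InvalidHeader.
#
#     >>> check_header_validity(("X-Token", "abc123"))
#     >>> check_header_validity(("X-Token", "abc\r\n"))
#     Traceback (most recent call last):
#     ...
#     requests.exceptions.InvalidHeader: ...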


def _validate_header_part(header, header_part, header_validator_index):
    if isinstance(header_part, str):
        validator = _HEADER_VALIDATORS_STR[header_validator_index]
    elif isinstance(header_part, bytes):
        validator = _HEADER_VALIDATORS_BYTE[header_validator_index]
    else:
        raise InvalidHeader(
            f"Header part ({header_part!r}) from {header} "
            f"must be of type str or bytes, not {type(header_part)}"
        )

    if not validator.match(header_part):
        header_kind = "name" if header_validator_index == 0 else "value"
        raise InvalidHeader(
            f"Invalid leading whitespace, reserved character(s), or return "
            f"character(s) in header {header_kind}: {header_part!r}"
        )


def urldefragauth(url):
    """
    Given a url remove the fragment and the authentication part.

    :rtype: str
    """
    scheme, netloc, path, params, query, fragment = urlparse(url)

    # see func:`prepend_scheme_if_needed`
    if not netloc:
        netloc, path = path, netloc

    netloc = netloc.rsplit("@", 1)[-1]

    return urlunparse((scheme, netloc, path, params, query, ""))


def rewind_body(prepared_request):
    """Move file pointer back to its recorded starting position
    so it can be read again on redirect.
    """
    body_seek = getattr(prepared_request.body, "seek", None)
    if body_seek is not None and isinstance(
        prepared_request._body_position, integer_types
    ):
        try:
            body_seek(prepared_request._body_position)
        except OSError:
            raise UnrewindableBodyError(
                "An error occurred when rewinding request body for redirect."
            )
    else:
        raise UnrewindableBodyError("Unable to rewind request body for redirect.")