Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/requests/utils.py: 15%

485 statements  

coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

1""" 

2requests.utils 

3~~~~~~~~~~~~~~ 

4 

5This module provides utility functions that are used within Requests 

6that are also useful for external consumption. 

7""" 

8 

9import codecs 

10import contextlib 

11import io 

12import os 

13import re 

14import socket 

15import struct 

16import sys 

17import tempfile 

18import warnings 

19import zipfile 

20from collections import OrderedDict 

21 

22from urllib3.util import make_headers, parse_url 

23 

24from . import certs 

25from .__version__ import __version__ 

26 

27# to_native_string is unused here, but imported here for backwards compatibility 

28from ._internal_utils import ( # noqa: F401 

29 _HEADER_VALIDATORS_BYTE, 

30 _HEADER_VALIDATORS_STR, 

31 HEADER_VALIDATORS, 

32 to_native_string, 

33) 

34from .compat import ( 

35 Mapping, 

36 basestring, 

37 bytes, 

38 getproxies, 

39 getproxies_environment, 

40 integer_types, 

41) 

42from .compat import parse_http_list as _parse_list_header 

43from .compat import ( 

44 proxy_bypass, 

45 proxy_bypass_environment, 

46 quote, 

47 str, 

48 unquote, 

49 urlparse, 

50 urlunparse, 

51) 

52from .cookies import cookiejar_from_dict 

53from .exceptions import ( 

54 FileModeWarning, 

55 InvalidHeader, 

56 InvalidURL, 

57 UnrewindableBodyError, 

58) 

59from .structures import CaseInsensitiveDict 

60 

61NETRC_FILES = (".netrc", "_netrc") 

62 

63DEFAULT_CA_BUNDLE_PATH = certs.where() 

64 

65DEFAULT_PORTS = {"http": 80, "https": 443} 

66 

67# Ensure that ', ' is used to preserve previous delimiter behavior. 

68DEFAULT_ACCEPT_ENCODING = ", ".join( 

69 re.split(r",\s*", make_headers(accept_encoding=True)["accept-encoding"]) 

70) 

71 

72 

if sys.platform == "win32":
    # provide a proxy_bypass version on Windows without DNS lookups

    def proxy_bypass_registry(host):
        try:
            import winreg
        except ImportError:
            return False

        try:
            internetSettings = winreg.OpenKey(
                winreg.HKEY_CURRENT_USER,
                r"Software\Microsoft\Windows\CurrentVersion\Internet Settings",
            )
            # ProxyEnable could be REG_SZ or REG_DWORD, normalizing it
            proxyEnable = int(winreg.QueryValueEx(internetSettings, "ProxyEnable")[0])
            # ProxyOverride is almost always a string
            proxyOverride = winreg.QueryValueEx(internetSettings, "ProxyOverride")[0]
        except (OSError, ValueError):
            return False
        if not proxyEnable or not proxyOverride:
            return False

        # make a check value list from the registry entry: replace the
        # '<local>' string by the localhost entry and the corresponding
        # canonical entry.
        proxyOverride = proxyOverride.split(";")
        # now check if we match one of the registry values.
        for test in proxyOverride:
            if test == "<local>":
                if "." not in host:
                    return True
            test = test.replace(".", r"\.")  # mask dots
            test = test.replace("*", r".*")  # change glob sequence
            test = test.replace("?", r".")  # change glob char
            if re.match(test, host, re.I):
                return True
        return False

    def proxy_bypass(host):  # noqa
        """Return True, if the host should be bypassed.

        Checks proxy settings gathered from the environment, if specified,
        or the registry.
        """
        if getproxies_environment():
            return proxy_bypass_environment(host)
        else:
            return proxy_bypass_registry(host)


def dict_to_sequence(d):
    """Return the items of a mapping so it can be consumed as a sequence
    of key/value pairs; other objects are returned unchanged."""

    if hasattr(d, "items"):
        d = d.items()

    return d


def super_len(o):
    total_length = None
    current_position = 0

    if hasattr(o, "__len__"):
        total_length = len(o)

    elif hasattr(o, "len"):
        total_length = o.len

    elif hasattr(o, "fileno"):
        try:
            fileno = o.fileno()
        except (io.UnsupportedOperation, AttributeError):
            # AttributeError is a surprising exception, seeing as how we've just checked
            # that `hasattr(o, 'fileno')`. It happens for objects obtained via
            # `TarFile.extractfile()`, per issue 5229.
            pass
        else:
            total_length = os.fstat(fileno).st_size

            # Having used fstat to determine the file length, we need to
            # confirm that this file was opened up in binary mode.
            if "b" not in o.mode:
                warnings.warn(
                    (
                        "Requests has determined the content-length for this "
                        "request using the binary size of the file: however, the "
                        "file has been opened in text mode (i.e. without the 'b' "
                        "flag in the mode). This may lead to an incorrect "
                        "content-length. In Requests 3.0, support will be removed "
                        "for files in text mode."
                    ),
                    FileModeWarning,
                )

    if hasattr(o, "tell"):
        try:
            current_position = o.tell()
        except OSError:
            # This can happen in some weird situations, such as when the file
            # is actually a special file descriptor like stdin. In this
            # instance, we don't know what the length is, so set it to zero and
            # let requests chunk it instead.
            if total_length is not None:
                current_position = total_length
        else:
            if hasattr(o, "seek") and total_length is None:
                # StringIO and BytesIO have seek but no usable fileno
                try:
                    # seek to end of file
                    o.seek(0, 2)
                    total_length = o.tell()

                    # seek back to current position to support
                    # partially read file-like objects
                    o.seek(current_position or 0)
                except OSError:
                    total_length = 0

    if total_length is None:
        total_length = 0

    return max(0, total_length - current_position)

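# Editor's illustrative doctest sketch (not part of the requests source):
# super_len() reports the bytes remaining from the current read position,
# here shown on an io.BytesIO object (`io` is imported at module top).
#
#     >>> buf = io.BytesIO(b"hello world")
#     >>> super_len(buf)
#     11
#     >>> _ = buf.read(6)
#     >>> super_len(buf)  # only the unread tail counts
#     5
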

def get_netrc_auth(url, raise_errors=False):
    """Returns the Requests tuple auth for a given url from netrc."""

    netrc_file = os.environ.get("NETRC")
    if netrc_file is not None:
        netrc_locations = (netrc_file,)
    else:
        netrc_locations = (f"~/{f}" for f in NETRC_FILES)

    try:
        from netrc import NetrcParseError, netrc

        netrc_path = None

        for f in netrc_locations:
            try:
                loc = os.path.expanduser(f)
            except KeyError:
                # os.path.expanduser can fail when $HOME is undefined and
                # getpwuid fails. See https://bugs.python.org/issue20164 &
                # https://github.com/psf/requests/issues/1846
                return

            if os.path.exists(loc):
                netrc_path = loc
                break

        # Abort early if there isn't one.
        if netrc_path is None:
            return

        ri = urlparse(url)

        # Strip port numbers from netloc. This weird `if...encode` dance is
        # used for Python 3.2, which doesn't support unicode literals.
        splitstr = b":"
        if isinstance(url, str):
            splitstr = splitstr.decode("ascii")
        host = ri.netloc.split(splitstr)[0]

        try:
            _netrc = netrc(netrc_path).authenticators(host)
            if _netrc:
                # Return with login / password
                login_i = 0 if _netrc[0] else 1
                return (_netrc[login_i], _netrc[2])
        except (NetrcParseError, OSError):
            # If there was a parsing error or a permissions issue reading the file,
            # we'll just skip netrc auth unless explicitly asked to raise errors.
            if raise_errors:
                raise

    # App Engine hackiness.
    except (ImportError, AttributeError):
        pass


def guess_filename(obj):
    """Tries to guess the filename of the given object."""
    name = getattr(obj, "name", None)
    if name and isinstance(name, basestring) and name[0] != "<" and name[-1] != ">":
        return os.path.basename(name)


def extract_zipped_paths(path):
    """Replace nonexistent paths that look like they refer to a member of a zip
    archive with the location of an extracted copy of the target, or else
    just return the provided path unchanged.
    """
    if os.path.exists(path):
        # this is already a valid path, no need to do anything further
        return path

    # find the first valid part of the provided path and treat that as a zip archive
    # assume the rest of the path is the name of a member in the archive
    archive, member = os.path.split(path)
    while archive and not os.path.exists(archive):
        archive, prefix = os.path.split(archive)
        if not prefix:
            # If the split consumed nothing (i.e. `archive` is unchanged),
            # stop: continuing would loop forever on a rare corner case.
            break
        member = "/".join([prefix, member])

    if not zipfile.is_zipfile(archive):
        return path

    zip_file = zipfile.ZipFile(archive)
    if member not in zip_file.namelist():
        return path

    # we have a valid zip archive and a valid member of that archive
    tmp = tempfile.gettempdir()
    extracted_path = os.path.join(tmp, member.split("/")[-1])
    if not os.path.exists(extracted_path):
        # use read + write instead of extract() so no nested folders are
        # created; we only want the file, and this avoids an mkdir race.
        with atomic_open(extracted_path) as file_handler:
            file_handler.write(zip_file.read(member))
    return extracted_path


@contextlib.contextmanager
def atomic_open(filename):
    """Write a file to the disk in an atomic fashion"""
    tmp_descriptor, tmp_name = tempfile.mkstemp(dir=os.path.dirname(filename))
    try:
        with os.fdopen(tmp_descriptor, "wb") as tmp_handler:
            yield tmp_handler
        os.replace(tmp_name, filename)
    except BaseException:
        os.remove(tmp_name)
        raise

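# Editor's illustrative sketch (not part of the requests source; the target
# path below is hypothetical). Content lands in a temp file that is
# os.replace()d into place, so readers never observe a half-written file.
#
#     >>> with atomic_open("/tmp/settings.json") as fh:  # doctest: +SKIP
#     ...     _ = fh.write(b'{"retries": 3}')
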

def from_key_val_list(value):
    """Take an object and test to see if it can be represented as a
    dictionary. If it can be, return an OrderedDict, e.g.,

    ::

        >>> from_key_val_list([('key', 'val')])
        OrderedDict([('key', 'val')])
        >>> from_key_val_list('string')
        Traceback (most recent call last):
        ...
        ValueError: cannot encode objects that are not 2-tuples
        >>> from_key_val_list({'key': 'val'})
        OrderedDict([('key', 'val')])

    :rtype: OrderedDict
    """
    if value is None:
        return None

    if isinstance(value, (str, bytes, bool, int)):
        raise ValueError("cannot encode objects that are not 2-tuples")

    return OrderedDict(value)


def to_key_val_list(value):
    """Take an object and test to see if it can be represented as a
    dictionary. If it can be, return a list of tuples, e.g.,

    ::

        >>> to_key_val_list([('key', 'val')])
        [('key', 'val')]
        >>> to_key_val_list({'key': 'val'})
        [('key', 'val')]
        >>> to_key_val_list('string')
        Traceback (most recent call last):
        ...
        ValueError: cannot encode objects that are not 2-tuples

    :rtype: list
    """
    if value is None:
        return None

    if isinstance(value, (str, bytes, bool, int)):
        raise ValueError("cannot encode objects that are not 2-tuples")

    if isinstance(value, Mapping):
        value = value.items()

    return list(value)


# From mitsuhiko/werkzeug (used with permission).
def parse_list_header(value):
    """Parse lists as described by RFC 2068 Section 2.

    In particular, parse comma-separated lists where the elements of
    the list may include quoted-strings. A quoted-string could
    contain a comma. A non-quoted string could have quotes in the
    middle. Quotes are removed automatically after parsing.

    It basically works like :func:`parse_set_header` just that items
    may appear multiple times and case sensitivity is preserved.

    The return value is a standard :class:`list`:

    >>> parse_list_header('token, "quoted value"')
    ['token', 'quoted value']

    To create a header from the :class:`list` again, use the
    :func:`dump_header` function.

    :param value: a string with a list header.
    :return: :class:`list`
    :rtype: list
    """
    result = []
    for item in _parse_list_header(value):
        if item[:1] == item[-1:] == '"':
            item = unquote_header_value(item[1:-1])
        result.append(item)
    return result


# From mitsuhiko/werkzeug (used with permission).
def parse_dict_header(value):
    """Parse lists of key, value pairs as described by RFC 2068 Section 2 and
    convert them into a python dict:

    >>> d = parse_dict_header('foo="is a fish", bar="as well"')
    >>> type(d) is dict
    True
    >>> sorted(d.items())
    [('bar', 'as well'), ('foo', 'is a fish')]

    If there is no value for a key it will be `None`:

    >>> parse_dict_header('key_without_value')
    {'key_without_value': None}

    To create a header from the :class:`dict` again, use the
    :func:`dump_header` function.

    :param value: a string with a dict header.
    :return: :class:`dict`
    :rtype: dict
    """
    result = {}
    for item in _parse_list_header(value):
        if "=" not in item:
            result[item] = None
            continue
        name, value = item.split("=", 1)
        if value[:1] == value[-1:] == '"':
            value = unquote_header_value(value[1:-1])
        result[name] = value
    return result


# From mitsuhiko/werkzeug (used with permission).
def unquote_header_value(value, is_filename=False):
    r"""Unquotes a header value. (Reversal of :func:`quote_header_value`).
    This does not use the real unquoting but what browsers are actually
    using for quoting.

    :param value: the header value to unquote.
    :rtype: str
    """
    if value and value[0] == value[-1] == '"':
        # this is not the real unquoting, but fixing this so that the
        # RFC is met will result in bugs with internet explorer and
        # probably some other browsers as well. IE for example is
        # uploading files with "C:\foo\bar.txt" as filename
        value = value[1:-1]

        # if this is a filename and the starting characters look like
        # a UNC path, then just return the value without quotes. Using the
        # replace sequence below on a UNC path has the effect of turning
        # the leading double slash into a single slash and then
        # _fix_ie_filename() doesn't work correctly. See #458.
        if not is_filename or value[:2] != "\\\\":
            return value.replace("\\\\", "\\").replace('\\"', '"')
    return value


def dict_from_cookiejar(cj):
    """Returns a key/value dictionary from a CookieJar.

    :param cj: CookieJar object to extract cookies from.
    :rtype: dict
    """

    cookie_dict = {}

    for cookie in cj:
        cookie_dict[cookie.name] = cookie.value

    return cookie_dict


def add_dict_to_cookiejar(cj, cookie_dict):
    """Returns a CookieJar from a key/value dictionary.

    :param cj: CookieJar to insert cookies into.
    :param cookie_dict: Dict of key/values to insert into CookieJar.
    :rtype: CookieJar
    """

    return cookiejar_from_dict(cookie_dict, cj)


def get_encodings_from_content(content):
    """Returns encodings from given content string.

    :param content: bytestring to extract encodings from.
    """
    warnings.warn(
        (
            "In requests 3.0, get_encodings_from_content will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )

    charset_re = re.compile(r'<meta.*?charset=["\']*(.+?)["\'>]', flags=re.I)
    pragma_re = re.compile(r'<meta.*?content=["\']*;?charset=(.+?)["\'>]', flags=re.I)
    xml_re = re.compile(r'^<\?xml.*?encoding=["\']*(.+?)["\'>]')

    return (
        charset_re.findall(content)
        + pragma_re.findall(content)
        + xml_re.findall(content)
    )


def _parse_content_type_header(header):
    """Returns content type and parameters from given header

    :param header: string
    :return: tuple containing content type and dictionary of
        parameters
    """

    tokens = header.split(";")
    content_type, params = tokens[0].strip(), tokens[1:]
    params_dict = {}
    items_to_strip = "\"' "

    for param in params:
        param = param.strip()
        if param:
            key, value = param, True
            index_of_equals = param.find("=")
            if index_of_equals != -1:
                key = param[:index_of_equals].strip(items_to_strip)
                value = param[index_of_equals + 1 :].strip(items_to_strip)
            params_dict[key.lower()] = value
    return content_type, params_dict


def get_encoding_from_headers(headers):
    """Returns encodings from given HTTP Header Dict.

    :param headers: dictionary to extract encoding from.
    :rtype: str
    """

    content_type = headers.get("content-type")

    if not content_type:
        return None

    content_type, params = _parse_content_type_header(content_type)

    if "charset" in params:
        return params["charset"].strip("'\"")

    if "text" in content_type:
        return "ISO-8859-1"

    if "application/json" in content_type:
        # Assume UTF-8 based on RFC 4627: https://www.ietf.org/rfc/rfc4627.txt since the charset was unset
        return "utf-8"

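# Editor's illustrative doctest (not part of the requests source): a charset
# parameter wins, text/* falls back to ISO-8859-1, and JSON to UTF-8.
#
#     >>> get_encoding_from_headers({"content-type": "text/html; charset=UTF-8"})
#     'UTF-8'
#     >>> get_encoding_from_headers({"content-type": "text/plain"})
#     'ISO-8859-1'
#     >>> get_encoding_from_headers({"content-type": "application/json"})
#     'utf-8'
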

def stream_decode_response_unicode(iterator, r):
    """Stream decodes an iterator."""

    if r.encoding is None:
        yield from iterator
        return

    decoder = codecs.getincrementaldecoder(r.encoding)(errors="replace")
    for chunk in iterator:
        rv = decoder.decode(chunk)
        if rv:
            yield rv
    rv = decoder.decode(b"", final=True)
    if rv:
        yield rv


def iter_slices(string, slice_length):
    """Iterate over slices of a string."""
    pos = 0
    if slice_length is None or slice_length <= 0:
        slice_length = len(string)
    while pos < len(string):
        yield string[pos : pos + slice_length]
        pos += slice_length

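# Editor's illustrative doctest (not part of the requests source): a None or
# non-positive slice_length yields the whole string in one piece.
#
#     >>> list(iter_slices("abcdef", 4))
#     ['abcd', 'ef']
#     >>> list(iter_slices("abcdef", None))
#     ['abcdef']
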

def get_unicode_from_response(r):
    """Returns the requested content back in unicode.

    :param r: Response object to get unicode content from.

    Tried:

    1. charset from content-type
    2. fall back and replace all unicode characters

    :rtype: str
    """
    warnings.warn(
        (
            "In requests 3.0, get_unicode_from_response will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )

    tried_encodings = []

    # Try charset from content-type
    encoding = get_encoding_from_headers(r.headers)

    if encoding:
        try:
            return str(r.content, encoding)
        except UnicodeError:
            tried_encodings.append(encoding)

    # Fall back:
    try:
        return str(r.content, encoding, errors="replace")
    except TypeError:
        return r.content


# The unreserved URI characters (RFC 3986)
UNRESERVED_SET = frozenset(
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + "0123456789-._~"
)


def unquote_unreserved(uri):
    """Un-escape any percent-escape sequences in a URI that are unreserved
    characters. This leaves all reserved, illegal and non-ASCII bytes encoded.

    :rtype: str
    """
    parts = uri.split("%")
    for i in range(1, len(parts)):
        h = parts[i][0:2]
        if len(h) == 2 and h.isalnum():
            try:
                c = chr(int(h, 16))
            except ValueError:
                raise InvalidURL(f"Invalid percent-escape sequence: '{h}'")

            if c in UNRESERVED_SET:
                parts[i] = c + parts[i][2:]
            else:
                parts[i] = f"%{parts[i]}"
        else:
            parts[i] = f"%{parts[i]}"
    return "".join(parts)


def requote_uri(uri):
    """Re-quote the given URI.

    This function passes the given URI through an unquote/quote cycle to
    ensure that it is fully and consistently quoted.

    :rtype: str
    """
    safe_with_percent = "!#$%&'()*+,/:;=?@[]~"
    safe_without_percent = "!#$&'()*+,/:;=?@[]~"
    try:
        # Unquote only the unreserved characters
        # Then quote only illegal characters (do not quote reserved,
        # unreserved, or '%')
        return quote(unquote_unreserved(uri), safe=safe_with_percent)
    except InvalidURL:
        # We couldn't unquote the given URI, so let's try quoting it, but
        # there may be unquoted '%'s in the URI. We need to make sure they're
        # properly quoted so they do not cause issues elsewhere.
        return quote(uri, safe=safe_without_percent)

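# Editor's illustrative doctest (not part of the requests source): raw spaces
# get quoted while already-encoded sequences such as %20 are left intact.
#
#     >>> requote_uri("http://example.com/a path?q=a%20b")
#     'http://example.com/a%20path?q=a%20b'
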

def address_in_network(ip, net):
    """This function allows you to check if an IP belongs to a network subnet

    Example: returns True if ip = 192.168.1.1 and net = 192.168.1.0/24
             returns False if ip = 192.168.1.1 and net = 192.168.100.0/24

    :rtype: bool
    """
    ipaddr = struct.unpack("=L", socket.inet_aton(ip))[0]
    netaddr, bits = net.split("/")
    netmask = struct.unpack("=L", socket.inet_aton(dotted_netmask(int(bits))))[0]
    network = struct.unpack("=L", socket.inet_aton(netaddr))[0] & netmask
    return (ipaddr & netmask) == (network & netmask)


def dotted_netmask(mask):
    """Converts mask from /xx format to xxx.xxx.xxx.xxx

    Example: if mask is 24 function returns 255.255.255.0

    :rtype: str
    """
    bits = 0xFFFFFFFF ^ (1 << 32 - mask) - 1
    return socket.inet_ntoa(struct.pack(">I", bits))

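# Editor's illustrative doctest (not part of the requests source), mirroring
# the examples already given in the two docstrings above:
#
#     >>> dotted_netmask(24)
#     '255.255.255.0'
#     >>> address_in_network("192.168.1.1", "192.168.1.0/24")
#     True
#     >>> address_in_network("192.168.1.1", "192.168.100.0/24")
#     False
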

def is_ipv4_address(string_ip):
    """
    :rtype: bool
    """
    try:
        socket.inet_aton(string_ip)
    except OSError:
        return False
    return True


def is_valid_cidr(string_network):
    """
    Very simple check of the cidr format in no_proxy variable.

    :rtype: bool
    """
    if string_network.count("/") == 1:
        try:
            mask = int(string_network.split("/")[1])
        except ValueError:
            return False

        if mask < 1 or mask > 32:
            return False

        try:
            socket.inet_aton(string_network.split("/")[0])
        except OSError:
            return False
    else:
        return False
    return True


@contextlib.contextmanager
def set_environ(env_name, value):
    """Set the environment variable 'env_name' to 'value'

    Save previous value, yield, and then restore the previous value stored in
    the environment variable 'env_name'.

    If 'value' is None, do nothing"""
    value_changed = value is not None
    if value_changed:
        old_value = os.environ.get(env_name)
        os.environ[env_name] = value
    try:
        yield
    finally:
        if value_changed:
            if old_value is None:
                del os.environ[env_name]
            else:
                os.environ[env_name] = old_value

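# Editor's illustrative doctest (not part of the requests source; the variable
# name is hypothetical). The prior state (here: unset) is restored on exit.
#
#     >>> with set_environ("DEMO_PROXY_VAR", "1"):
#     ...     os.environ["DEMO_PROXY_VAR"]
#     '1'
#     >>> "DEMO_PROXY_VAR" in os.environ
#     False
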

def should_bypass_proxies(url, no_proxy):
    """
    Returns whether we should bypass proxies or not.

    :rtype: bool
    """
    # Prioritize lowercase environment variables over uppercase
    # to keep a consistent behaviour with other http projects (curl, wget).
    def get_proxy(key):
        return os.environ.get(key) or os.environ.get(key.upper())

    # First check whether no_proxy is defined. If it is, check that the URL
    # we're getting isn't in the no_proxy list.
    no_proxy_arg = no_proxy
    if no_proxy is None:
        no_proxy = get_proxy("no_proxy")
    parsed = urlparse(url)

    if parsed.hostname is None:
        # URLs don't always have hostnames, e.g. file:/// urls.
        return True

    if no_proxy:
        # We need to check whether we match here. We need to see if we match
        # the end of the hostname, both with and without the port.
        no_proxy = (host for host in no_proxy.replace(" ", "").split(",") if host)

        if is_ipv4_address(parsed.hostname):
            for proxy_ip in no_proxy:
                if is_valid_cidr(proxy_ip):
                    if address_in_network(parsed.hostname, proxy_ip):
                        return True
                elif parsed.hostname == proxy_ip:
                    # If no_proxy ip was defined in plain IP notation instead of cidr notation &
                    # matches the IP of the index
                    return True
        else:
            host_with_port = parsed.hostname
            if parsed.port:
                host_with_port += f":{parsed.port}"

            for host in no_proxy:
                if parsed.hostname.endswith(host) or host_with_port.endswith(host):
                    # The URL does match something in no_proxy, so we don't want
                    # to apply the proxies on this URL.
                    return True

    with set_environ("no_proxy", no_proxy_arg):
        # parsed.hostname can be `None` in cases such as a file URI.
        try:
            bypass = proxy_bypass(parsed.hostname)
        except (TypeError, socket.gaierror):
            bypass = False

    if bypass:
        return True

    return False


def get_environ_proxies(url, no_proxy=None):
    """
    Return a dict of environment proxies.

    :rtype: dict
    """
    if should_bypass_proxies(url, no_proxy=no_proxy):
        return {}
    else:
        return getproxies()

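# Editor's illustrative doctest (not part of the requests source): when the
# host matches the no_proxy list, environment proxies are dropped entirely.
#
#     >>> get_environ_proxies("http://example.com/", no_proxy="example.com")
#     {}
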

def select_proxy(url, proxies):
    """Select a proxy for the url, if applicable.

    :param url: The url of the request
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    """
    proxies = proxies or {}
    urlparts = urlparse(url)
    if urlparts.hostname is None:
        return proxies.get(urlparts.scheme, proxies.get("all"))

    proxy_keys = [
        urlparts.scheme + "://" + urlparts.hostname,
        urlparts.scheme,
        "all://" + urlparts.hostname,
        "all",
    ]
    proxy = None
    for proxy_key in proxy_keys:
        if proxy_key in proxies:
            proxy = proxies[proxy_key]
            break

    return proxy

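# Editor's illustrative doctest (not part of the requests source): a
# scheme://host key outranks a bare scheme key, which outranks "all".
#
#     >>> proxies = {"http": "http://proxy:3128",
#     ...            "http://example.com": "http://special:8080"}
#     >>> select_proxy("http://example.com/path", proxies)
#     'http://special:8080'
#     >>> select_proxy("http://other.org/", proxies)
#     'http://proxy:3128'
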

def resolve_proxies(request, proxies, trust_env=True):
    """This method takes proxy information from a request and configuration
    input to resolve a mapping of target proxies. This will consider settings
    such as NO_PROXY to strip proxy configurations.

    :param request: Request or PreparedRequest
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    :param trust_env: Boolean declaring whether to trust environment configs

    :rtype: dict
    """
    proxies = proxies if proxies is not None else {}
    url = request.url
    scheme = urlparse(url).scheme
    no_proxy = proxies.get("no_proxy")
    new_proxies = proxies.copy()

    if trust_env and not should_bypass_proxies(url, no_proxy=no_proxy):
        environ_proxies = get_environ_proxies(url, no_proxy=no_proxy)

        proxy = environ_proxies.get(scheme, environ_proxies.get("all"))

        if proxy:
            new_proxies.setdefault(scheme, proxy)
    return new_proxies


def default_user_agent(name="python-requests"):
    """
    Return a string representing the default user agent.

    :rtype: str
    """
    return f"{name}/{__version__}"


def default_headers():
    """
    :rtype: requests.structures.CaseInsensitiveDict
    """
    return CaseInsensitiveDict(
        {
            "User-Agent": default_user_agent(),
            "Accept-Encoding": DEFAULT_ACCEPT_ENCODING,
            "Accept": "*/*",
            "Connection": "keep-alive",
        }
    )


def parse_header_links(value):
    """Return a list of parsed link headers.

    i.e. Link: <http:/.../front.jpeg>; rel=front; type="image/jpeg",<http://.../back.jpeg>; rel=back;type="image/jpeg"

    :rtype: list
    """

    links = []

    replace_chars = " '\""

    value = value.strip(replace_chars)
    if not value:
        return links

    for val in re.split(", *<", value):
        try:
            url, params = val.split(";", 1)
        except ValueError:
            url, params = val, ""

        link = {"url": url.strip("<> '\"")}

        for param in params.split(";"):
            try:
                key, value = param.split("=")
            except ValueError:
                break

            link[key.strip(replace_chars)] = value.strip(replace_chars)

        links.append(link)

    return links

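# Editor's illustrative doctest (not part of the requests source):
#
#     >>> parse_header_links('<http://example.com/page2>; rel="next"')
#     [{'url': 'http://example.com/page2', 'rel': 'next'}]
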

# Null bytes; no need to recreate these on each call to guess_json_utf
_null = "\x00".encode("ascii")  # encoding to ASCII for Python 3
_null2 = _null * 2
_null3 = _null * 3


def guess_json_utf(data):
    """
    :rtype: str
    """
    # JSON always starts with two ASCII characters, so detection is as
    # easy as counting the nulls and from their location and count
    # determine the encoding. Also detect a BOM, if present.
    sample = data[:4]
    if sample in (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE):
        return "utf-32"  # BOM included
    if sample[:3] == codecs.BOM_UTF8:
        return "utf-8-sig"  # BOM included, MS style (discouraged)
    if sample[:2] in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE):
        return "utf-16"  # BOM included
    nullcount = sample.count(_null)
    if nullcount == 0:
        return "utf-8"
    if nullcount == 2:
        if sample[::2] == _null2:  # 1st and 3rd are null
            return "utf-16-be"
        if sample[1::2] == _null2:  # 2nd and 4th are null
            return "utf-16-le"
        # Did not detect 2 valid UTF-16 ascii-range characters
    if nullcount == 3:
        if sample[:3] == _null3:
            return "utf-32-be"
        if sample[1:] == _null3:
            return "utf-32-le"
        # Did not detect a valid UTF-32 ascii-range character
    return None

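# Editor's illustrative doctest (not part of the requests source): the
# encoding is inferred from null-byte patterns and BOMs in the first 4 bytes.
#
#     >>> guess_json_utf(b'{"k": 1}')
#     'utf-8'
#     >>> guess_json_utf('{"k": 1}'.encode("utf-16"))
#     'utf-16'
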

def prepend_scheme_if_needed(url, new_scheme):
    """Given a URL that may or may not have a scheme, prepend the given scheme.
    Does not replace a present scheme with the one provided as an argument.

    :rtype: str
    """
    parsed = parse_url(url)
    scheme, auth, host, port, path, query, fragment = parsed

    # A defect in urlparse determines that there isn't a netloc present in some
    # urls. We previously assumed parsing was overly cautious, and swapped the
    # netloc and path. Due to a lack of tests on the original defect, this is
    # maintained with parse_url for backwards compatibility.
    netloc = parsed.netloc
    if not netloc:
        netloc, path = path, netloc

    if auth:
        # parse_url doesn't provide the netloc with auth
        # so we'll add it ourselves.
        netloc = "@".join([auth, netloc])
    if scheme is None:
        scheme = new_scheme
    if path is None:
        path = ""

    return urlunparse((scheme, netloc, path, "", query, fragment))

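# Editor's illustrative doctest (not part of the requests source): an
# existing scheme is preserved; a missing one is filled in.
#
#     >>> prepend_scheme_if_needed("example.com/path", "https")
#     'https://example.com/path'
#     >>> prepend_scheme_if_needed("http://example.com/path", "https")
#     'http://example.com/path'
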

def get_auth_from_url(url):
    """Given a url with authentication components, extract them into a tuple of
    username,password.

    :rtype: (str,str)
    """
    parsed = urlparse(url)

    try:
        auth = (unquote(parsed.username), unquote(parsed.password))
    except (AttributeError, TypeError):
        auth = ("", "")

    return auth

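# Editor's illustrative doctest (not part of the requests source): credentials
# come back percent-decoded; URLs without credentials yield empty strings.
#
#     >>> get_auth_from_url("https://user:p%40ss@example.com/")
#     ('user', 'p@ss')
#     >>> get_auth_from_url("https://example.com/")
#     ('', '')
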

def check_header_validity(header):
    """Verifies that header parts don't contain leading whitespace,
    reserved characters, or return characters.

    :param header: tuple, in the format (name, value).
    """
    name, value = header
    _validate_header_part(header, name, 0)
    _validate_header_part(header, value, 1)


def _validate_header_part(header, header_part, header_validator_index):
    if isinstance(header_part, str):
        validator = _HEADER_VALIDATORS_STR[header_validator_index]
    elif isinstance(header_part, bytes):
        validator = _HEADER_VALIDATORS_BYTE[header_validator_index]
    else:
        raise InvalidHeader(
            f"Header part ({header_part!r}) from {header} "
            f"must be of type str or bytes, not {type(header_part)}"
        )

    if not validator.match(header_part):
        header_kind = "name" if header_validator_index == 0 else "value"
        raise InvalidHeader(
            f"Invalid leading whitespace, reserved character(s), or return "
            f"character(s) in header {header_kind}: {header_part!r}"
        )


def urldefragauth(url):
    """
    Given a url remove the fragment and the authentication part.

    :rtype: str
    """
    scheme, netloc, path, params, query, fragment = urlparse(url)

    # see func:`prepend_scheme_if_needed`
    if not netloc:
        netloc, path = path, netloc

    netloc = netloc.rsplit("@", 1)[-1]

    return urlunparse((scheme, netloc, path, params, query, ""))

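# Editor's illustrative doctest (not part of the requests source):
#
#     >>> urldefragauth("https://user:pass@example.com/path#section")
#     'https://example.com/path'
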

def rewind_body(prepared_request):
    """Move file pointer back to its recorded starting position
    so it can be read again on redirect.
    """
    body_seek = getattr(prepared_request.body, "seek", None)
    if body_seek is not None and isinstance(
        prepared_request._body_position, integer_types
    ):
        try:
            body_seek(prepared_request._body_position)
        except OSError:
            raise UnrewindableBodyError(
                "An error occurred when rewinding request body for redirect."
            )
    else:
        raise UnrewindableBodyError("Unable to rewind request body for redirect.")