Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/requests/utils.py: 15%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

477 statements  

1""" 

2requests.utils 

3~~~~~~~~~~~~~~ 

4 

5This module provides utility functions that are used within Requests 

6that are also useful for external consumption. 

7""" 

8 

9import codecs 

10import contextlib 

11import io 

12import os 

13import re 

14import socket 

15import struct 

16import sys 

17import tempfile 

18import warnings 

19import zipfile 

20from collections import OrderedDict 

21 

22from urllib3.util import make_headers, parse_url 

23 

24from . import certs 

25from .__version__ import __version__ 

26 

27# to_native_string is unused here, but imported here for backwards compatibility 

28from ._internal_utils import ( # noqa: F401 

29 _HEADER_VALIDATORS_BYTE, 

30 _HEADER_VALIDATORS_STR, 

31 HEADER_VALIDATORS, 

32 to_native_string, 

33) 

34from .compat import ( 

35 Mapping, 

36 basestring, 

37 bytes, 

38 getproxies, 

39 getproxies_environment, 

40 integer_types, 

41 is_urllib3_1, 

42 proxy_bypass, 

43 proxy_bypass_environment, 

44 quote, 

45 str, 

46 unquote, 

47 urlparse, 

48 urlunparse, 

49) 

50from .compat import parse_http_list as _parse_list_header 

51from .cookies import cookiejar_from_dict 

52from .exceptions import ( 

53 FileModeWarning, 

54 InvalidHeader, 

55 InvalidURL, 

56 UnrewindableBodyError, 

57) 

58from .structures import CaseInsensitiveDict 

59 

# Candidate netrc file names; "_netrc" is the historical Windows spelling.
NETRC_FILES = (".netrc", "_netrc")

# Certificate is extracted by certifi when needed.
DEFAULT_CA_BUNDLE_PATH = certs.where()

# Well-known default port for each URL scheme.
DEFAULT_PORTS = {"http": 80, "https": 443}

# Ensure that ', ' is used to preserve previous delimiter behavior.
DEFAULT_ACCEPT_ENCODING = ", ".join(
    re.split(r",\s*", make_headers(accept_encoding=True)["accept-encoding"])
)

71 

72 

if sys.platform == "win32":
    # provide a proxy_bypass version on Windows without DNS lookups

    def proxy_bypass_registry(host):
        """Return True if *host* matches a ProxyOverride entry in the
        Windows registry (HKCU Internet Settings); False otherwise or when
        the registry cannot be read.
        """
        try:
            import winreg
        except ImportError:
            return False

        try:
            internetSettings = winreg.OpenKey(
                winreg.HKEY_CURRENT_USER,
                r"Software\Microsoft\Windows\CurrentVersion\Internet Settings",
            )
            # ProxyEnable could be REG_SZ or REG_DWORD, normalizing it
            proxyEnable = int(winreg.QueryValueEx(internetSettings, "ProxyEnable")[0])
            # ProxyOverride is almost always a string
            proxyOverride = winreg.QueryValueEx(internetSettings, "ProxyOverride")[0]
        except (OSError, ValueError):
            return False
        if not proxyEnable or not proxyOverride:
            return False

        # make a check value list from the registry entry: replace the
        # '<local>' string by the localhost entry and the corresponding
        # canonical entry.
        proxyOverride = proxyOverride.split(";")
        # filter out empty strings to avoid re.match return true in the following code.
        proxyOverride = filter(None, proxyOverride)
        # now check if we match one of the registry values.
        for test in proxyOverride:
            if test == "<local>":
                # '<local>' means any host without a dot in its name.
                if "." not in host:
                    return True
            # Translate the glob-style override entry into a regex.
            test = test.replace(".", r"\.")  # mask dots
            test = test.replace("*", r".*")  # change glob sequence
            test = test.replace("?", r".")  # change glob char
            if re.match(test, host, re.I):
                return True
        return False

    def proxy_bypass(host):  # noqa
        """Return True, if the host should be bypassed.

        Checks proxy settings gathered from the environment, if specified,
        or the registry.
        """
        if getproxies_environment():
            return proxy_bypass_environment(host)
        else:
            return proxy_bypass_registry(host)

124 

125 

def dict_to_sequence(d):
    """Return *d* as an iterable of key/value pairs.

    Mapping-like objects (anything exposing ``items``) are converted to
    their items view; any other object is passed through unchanged.
    """
    items_getter = getattr(d, "items", None)
    if items_getter is not None:
        return items_getter()
    return d

133 

134 

def super_len(o):
    """Best-effort remaining byte length of *o* for the Content-Length header.

    Handles strings/bytes, sized containers, objects exposing ``len``,
    real file objects (via ``fstat``), and seekable file-like objects.
    Returns 0 when no length can be determined.
    """
    total_length = None
    current_position = 0

    if not is_urllib3_1 and isinstance(o, str):
        # urllib3 2.x+ treats all strings as utf-8 instead
        # of latin-1 (iso-8859-1) like http.client.
        o = o.encode("utf-8")

    if hasattr(o, "__len__"):
        total_length = len(o)

    elif hasattr(o, "len"):
        total_length = o.len

    elif hasattr(o, "fileno"):
        try:
            fileno = o.fileno()
        except (io.UnsupportedOperation, AttributeError):
            # AttributeError is a surprising exception, seeing as how we've just checked
            # that `hasattr(o, 'fileno')`. It happens for objects obtained via
            # `Tarfile.extractfile()`, per issue 5229.
            pass
        else:
            total_length = os.fstat(fileno).st_size

            # Having used fstat to determine the file length, we need to
            # confirm that this file was opened up in binary mode.
            if "b" not in o.mode:
                warnings.warn(
                    (
                        "Requests has determined the content-length for this "
                        "request using the binary size of the file: however, the "
                        "file has been opened in text mode (i.e. without the 'b' "
                        "flag in the mode). This may lead to an incorrect "
                        "content-length. In Requests 3.0, support will be removed "
                        "for files in text mode."
                    ),
                    FileModeWarning,
                )

    if hasattr(o, "tell"):
        try:
            current_position = o.tell()
        except OSError:
            # This can happen in some weird situations, such as when the file
            # is actually a special file descriptor like stdin. In this
            # instance, we don't know what the length is, so set it to zero and
            # let requests chunk it instead.
            if total_length is not None:
                current_position = total_length
        else:
            if hasattr(o, "seek") and total_length is None:
                # StringIO and BytesIO have seek but no usable fileno
                try:
                    # seek to end of file
                    o.seek(0, 2)
                    total_length = o.tell()

                    # seek back to current position to support
                    # partially read file-like objects
                    o.seek(current_position or 0)
                except OSError:
                    total_length = 0

    if total_length is None:
        total_length = 0

    # Never report a negative length (position past the recorded end).
    return max(0, total_length - current_position)

204 

205 

def get_netrc_auth(url, raise_errors=False):
    """Returns the Requests tuple auth for a given url from netrc.

    :param url: URL whose hostname is looked up in the netrc file.
    :param raise_errors: when True, re-raise netrc parse/permission errors
        instead of silently skipping netrc auth.
    :rtype: tuple of (login, password), or None when nothing applies.
    """

    # An explicit NETRC environment variable overrides the default locations.
    netrc_file = os.environ.get("NETRC")
    if netrc_file is not None:
        netrc_locations = (netrc_file,)
    else:
        netrc_locations = (f"~/{f}" for f in NETRC_FILES)

    try:
        from netrc import NetrcParseError, netrc

        netrc_path = None

        for f in netrc_locations:
            loc = os.path.expanduser(f)
            if os.path.exists(loc):
                netrc_path = loc
                break

        # Abort early if there isn't one.
        if netrc_path is None:
            return

        ri = urlparse(url)
        host = ri.hostname

        try:
            _netrc = netrc(netrc_path).authenticators(host)
            if _netrc and any(_netrc):
                # Return with login / password
                login_i = 0 if _netrc[0] else 1
                return (_netrc[login_i], _netrc[2])
        except (NetrcParseError, OSError):
            # If there was a parsing error or a permissions issue reading the file,
            # we'll just skip netrc auth unless explicitly asked to raise errors.
            if raise_errors:
                raise

    # App Engine hackiness.
    except (ImportError, AttributeError):
        pass

248 

249 

def guess_filename(obj):
    """Try to guess the filename of *obj* from its ``name`` attribute.

    Returns the basename, or None when there is no usable string name or
    the name is a pseudo-file marker such as ``<stdin>``.
    """
    name = getattr(obj, "name", None)
    if not name or not isinstance(name, basestring):
        return None
    if name.startswith("<") or name.endswith(">"):
        # Angle-bracketed names are not real filesystem paths.
        return None
    return os.path.basename(name)

255 

256 

def extract_zipped_paths(path):
    """Replace nonexistent paths that look like they refer to a member of a zip
    archive with the location of an extracted copy of the target, or else
    just return the provided path unchanged.

    :param path: filesystem path, possibly pointing inside a zip archive.
    :rtype: str
    """
    if os.path.exists(path):
        # this is already a valid path, no need to do anything further
        return path

    # find the first valid part of the provided path and treat that as a zip
    # archive; assume the rest of the path is the name of a member in the archive
    archive, member = os.path.split(path)
    while archive and not os.path.exists(archive):
        archive, prefix = os.path.split(archive)
        if not prefix:
            # If we don't check for an empty prefix after the split (in other
            # words, archive remains unchanged after the split), we _can_ end
            # up in an infinite loop on a rare corner case.
            break
        member = "/".join([prefix, member])

    if not zipfile.is_zipfile(archive):
        return path

    # Fix: close the archive handle deterministically instead of leaking it.
    with zipfile.ZipFile(archive) as zip_file:
        if member not in zip_file.namelist():
            return path

        # we have a valid zip archive and a valid member of that archive
        suffix = os.path.splitext(member.split("/")[-1])[-1]
        fd, extracted_path = tempfile.mkstemp(suffix=suffix)
        try:
            os.write(fd, zip_file.read(member))
        finally:
            os.close(fd)

    return extracted_path

293 

294 

@contextlib.contextmanager
def atomic_open(filename):
    """Write a file to disk atomically.

    Yields a binary handle to a temporary file created in the target
    directory. On clean exit the temp file is renamed over *filename*;
    on any failure the temp file is removed and the exception re-raised.
    """
    fd, staging_path = tempfile.mkstemp(dir=os.path.dirname(filename))
    try:
        with os.fdopen(fd, "wb") as staging_file:
            yield staging_file
        # Atomic on POSIX; replaces an existing destination.
        os.replace(staging_path, filename)
    except BaseException:
        os.remove(staging_path)
        raise

306 

307 

def from_key_val_list(value):
    """Build an OrderedDict from an object of key/value pairs.

    Returns None for None input. Raises ``ValueError`` for scalar inputs
    (str, bytes, bool, int) that cannot represent 2-tuples.

    :rtype: OrderedDict
    """
    if value is None:
        return None

    # Scalars cannot be interpreted as key/value pairs.
    if isinstance(value, (str, bytes, bool, int)):
        raise ValueError("cannot encode objects that are not 2-tuples")

    return OrderedDict(value)

333 

334 

def to_key_val_list(value):
    """Convert an object of key/value pairs into a list of tuples.

    Returns None for None input. Raises ``ValueError`` for scalar inputs
    (str, bytes, bool, int) that cannot represent 2-tuples.

    :rtype: list
    """
    if value is None:
        return None

    if isinstance(value, (str, bytes, bool, int)):
        raise ValueError("cannot encode objects that are not 2-tuples")

    pairs = value.items() if isinstance(value, Mapping) else value
    return list(pairs)

362 

363 

364# From mitsuhiko/werkzeug (used with permission). 

def parse_list_header(value):
    """Parse lists as described by RFC 2068 Section 2.

    In particular, parse comma-separated lists where the elements of
    the list may include quoted-strings. A quoted-string could
    contain a comma. A non-quoted string could have quotes in the
    middle. Quotes are removed automatically after parsing.

    >>> parse_list_header('token, "quoted value"')
    ['token', 'quoted value']

    :param value: a string with a list header.
    :return: :class:`list`
    :rtype: list
    """
    return [
        unquote_header_value(item[1:-1]) if item[:1] == item[-1:] == '"' else item
        for item in _parse_list_header(value)
    ]

394 

395 

396# From mitsuhiko/werkzeug (used with permission). 

def parse_dict_header(value):
    """Parse lists of key, value pairs as described by RFC 2068 Section 2 and
    convert them into a python dict. Keys that appear without a value map
    to ``None``.

    >>> parse_dict_header('key_without_value')
    {'key_without_value': None}

    :param value: a string with a dict header.
    :return: :class:`dict`
    :rtype: dict
    """
    result = {}
    for item in _parse_list_header(value):
        if "=" not in item:
            result[item] = None
            continue
        name, raw = item.split("=", 1)
        if raw[:1] == raw[-1:] == '"':
            raw = unquote_header_value(raw[1:-1])
        result[name] = raw
    return result

429 

430 

431# From mitsuhiko/werkzeug (used with permission). 

def unquote_header_value(value, is_filename=False):
    r"""Unquotes a header value. (Reversal of :func:`quote_header_value`).
    This does not use the real unquoting but what browsers are actually
    using for quoting.

    :param value: the header value to unquote.
    :param is_filename: when True, UNC paths keep their leading ``\\``.
    :rtype: str
    """
    if not value or not (value[0] == value[-1] == '"'):
        # Not a quoted-string; pass through untouched.
        return value

    inner = value[1:-1]

    # A UNC filename (\\server\share) must keep its leading double
    # backslash: the replace below would collapse it. See #458.
    if is_filename and inner[:2] == "\\\\":
        return inner

    return inner.replace("\\\\", "\\").replace('\\"', '"')

455 

456 

def dict_from_cookiejar(cj):
    """Returns a key/value dictionary from a CookieJar.

    :param cj: CookieJar object to extract cookies from.
    :rtype: dict
    """
    return {c.name: c.value for c in cj}

466 

467 

def add_dict_to_cookiejar(cj, cookie_dict):
    """Returns a CookieJar from a key/value dictionary.

    Thin wrapper around :func:`requests.cookies.cookiejar_from_dict`.

    :param cj: CookieJar to insert cookies into.
    :param cookie_dict: Dict of key/values to insert into CookieJar.
    :rtype: CookieJar
    """

    return cookiejar_from_dict(cookie_dict, cj)

477 

478 

def get_encodings_from_content(content):
    """Returns encodings from given content string.

    Deprecated: scheduled for removal in requests 3.0 (see issue #2266).

    :param content: bytestring to extract encodings from.
    """
    warnings.warn(
        (
            "In requests 3.0, get_encodings_from_content will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )

    # Look for <meta charset=...>, <meta content="...charset=...">, and the
    # XML declaration's encoding attribute, in that order.
    patterns = (
        re.compile(r'<meta.*?charset=["\']*(.+?)["\'>]', flags=re.I),
        re.compile(r'<meta.*?content=["\']*;?charset=(.+?)["\'>]', flags=re.I),
        re.compile(r'^<\?xml.*?encoding=["\']*(.+?)["\'>]'),
    )
    found = []
    for pattern in patterns:
        found.extend(pattern.findall(content))
    return found

502 

503 

504def _parse_content_type_header(header): 

505 """Returns content type and parameters from given header. 

506 

507 :param header: string 

508 :return: tuple containing content type and dictionary of 

509 parameters. 

510 """ 

511 

512 tokens = header.split(";") 

513 content_type, params = tokens[0].strip(), tokens[1:] 

514 params_dict = {} 

515 strip_chars = "\"' " 

516 

517 for param in params: 

518 param = param.strip() 

519 if param and (idx := param.find("=")) != -1: 

520 key = param[:idx].strip(strip_chars) 

521 value = param[idx + 1 :].strip(strip_chars) 

522 params_dict[key.lower()] = value 

523 return content_type, params_dict 

524 

525 

def get_encoding_from_headers(headers):
    """Returns encodings from given HTTP Header Dict.

    :param headers: dictionary to extract encoding from.
    :rtype: str
    """
    content_type = headers.get("content-type")
    if not content_type:
        return None

    content_type, params = _parse_content_type_header(content_type)

    if "charset" in params:
        return params["charset"].strip("'\"")

    if "text" in content_type:
        # Historical HTTP default for text/* without a declared charset.
        return "ISO-8859-1"

    if "application/json" in content_type:
        # Assume UTF-8 based on RFC 4627: https://www.ietf.org/rfc/rfc4627.txt since the charset was unset
        return "utf-8"

    return None

549 

550 

def stream_decode_response_unicode(iterator, r):
    """Incrementally decode an iterator of byte chunks using ``r.encoding``.

    When the response declares no encoding, chunks are passed through
    unchanged.
    """
    encoding = r.encoding
    if encoding is None:
        yield from iterator
        return

    decoder = codecs.getincrementaldecoder(encoding)(errors="replace")
    for chunk in iterator:
        decoded = decoder.decode(chunk)
        if decoded:
            yield decoded
    # Flush any bytes still buffered from a partial multi-byte sequence.
    tail = decoder.decode(b"", final=True)
    if tail:
        yield tail

566 

567 

def iter_slices(string, slice_length):
    """Iterate over fixed-size slices of a string.

    A ``None`` or non-positive *slice_length* yields the whole string as a
    single slice; an empty string yields nothing.
    """
    if not string:
        return
    if slice_length is None or slice_length <= 0:
        slice_length = len(string)
    for start in range(0, len(string), slice_length):
        yield string[start : start + slice_length]

576 

577 

def get_unicode_from_response(r):
    """Returns the requested content back in unicode.

    Deprecated: scheduled for removal in requests 3.0 (see issue #2266).

    :param r: Response object to get unicode content from.

    Tried:

    1. charset from content-type
    2. fall back and replace all unicode characters

    :rtype: str
    """
    warnings.warn(
        (
            "In requests 3.0, get_unicode_from_response will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )

    encoding = get_encoding_from_headers(r.headers)

    if encoding:
        # First attempt: strict decode with the declared charset.
        try:
            return str(r.content, encoding)
        except UnicodeError:
            pass

    # Fallback: decode again replacing bad bytes; if encoding was None,
    # str() raises TypeError and the raw bytes are returned unchanged.
    try:
        return str(r.content, encoding, errors="replace")
    except TypeError:
        return r.content

615 

616 

# The unreserved URI characters (RFC 3986)
UNRESERVED_SET = frozenset(
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + "0123456789-._~"
)


def unquote_unreserved(uri):
    """Un-escape any percent-escape sequences in a URI that are unreserved
    characters. This leaves all reserved, illegal and non-ASCII bytes encoded.

    :raises InvalidURL: on an alphanumeric escape that is not valid hex.
    :rtype: str
    """
    pieces = uri.split("%")
    out = [pieces[0]]
    for piece in pieces[1:]:
        hex_digits = piece[:2]
        if len(hex_digits) == 2 and hex_digits.isalnum():
            try:
                char = chr(int(hex_digits, 16))
            except ValueError:
                raise InvalidURL(f"Invalid percent-escape sequence: '{hex_digits}'")
            if char in UNRESERVED_SET:
                # Safe to decode: the character never changes URI meaning.
                out.append(char + piece[2:])
                continue
        # Reserved / non-ASCII / truncated escape: keep it encoded.
        out.append(f"%{piece}")
    return "".join(out)

645 

646 

def requote_uri(uri):
    """Re-quote the given URI.

    This function passes the given URI through an unquote/quote cycle to
    ensure that it is fully and consistently quoted.

    :rtype: str
    """
    try:
        unquoted = unquote_unreserved(uri)
    except InvalidURL:
        # We couldn't unquote the given URI, so there may be bare '%'s in
        # it; quote those too so they don't cause issues elsewhere.
        return quote(uri, safe="!#$&'()*+,/:;=?@[]~")
    # Quote only illegal characters (do not quote reserved, unreserved,
    # or '%').
    return quote(unquoted, safe="!#$%&'()*+,/:;=?@[]~")

667 

668 

def address_in_network(ip, net):
    """This function allows you to check if an IP belongs to a network subnet

    Example: returns True if ip = 192.168.1.1 and net = 192.168.1.0/24
             returns False if ip = 192.168.1.1 and net = 192.168.100.0/24

    :rtype: bool
    """

    def _to_long(dotted):
        # Native-endian 32-bit integer form of a dotted-quad address.
        return struct.unpack("=L", socket.inet_aton(dotted))[0]

    netaddr, bits = net.split("/")
    mask = _to_long(dotted_netmask(int(bits)))
    return (_to_long(ip) & mask) == (_to_long(netaddr) & mask)

682 

683 

def dotted_netmask(mask):
    """Converts mask from /xx format to xxx.xxx.xxx.xxx

    Example: if mask is 24 function returns 255.255.255.0

    :rtype: str
    """
    # Keep the top `mask` bits set, clear the low host bits.
    host_bits = 32 - mask
    bits = (0xFFFFFFFF >> host_bits) << host_bits
    return socket.inet_ntoa(struct.pack(">I", bits))

693 

694 

def is_ipv4_address(string_ip):
    """Return True when *string_ip* parses as an IPv4 address.

    :rtype: bool
    """
    try:
        socket.inet_aton(string_ip)
        return True
    except OSError:
        return False

704 

705 

def is_valid_cidr(string_network):
    """
    Very simple check of the cidr format in no_proxy variable.

    :rtype: bool
    """
    # Exactly one '/' separates the address from the prefix length.
    if string_network.count("/") != 1:
        return False

    address, _, mask_str = string_network.partition("/")

    try:
        mask = int(mask_str)
    except ValueError:
        return False
    if not 1 <= mask <= 32:
        return False

    try:
        socket.inet_aton(address)
    except OSError:
        return False
    return True

728 

729 

@contextlib.contextmanager
def set_environ(env_name, value):
    """Temporarily set the environment variable *env_name* to *value*.

    The previous value (or absence) is restored on exit. A *value* of None
    means "do nothing": the environment is left untouched.
    """
    if value is None:
        # Nothing to change; still behave as a context manager.
        yield
        return

    previous = os.environ.get(env_name)
    os.environ[env_name] = value
    try:
        yield
    finally:
        if previous is None:
            del os.environ[env_name]
        else:
            os.environ[env_name] = previous

750 

751 

def should_bypass_proxies(url, no_proxy):
    """
    Returns whether we should bypass proxies or not.

    :param url: the URL being requested.
    :param no_proxy: explicit NO_PROXY override (comma-separated hosts and
        CIDR blocks), or None to read it from the environment.
    :rtype: bool
    """

    # Prioritize lowercase environment variables over uppercase
    # to keep a consistent behaviour with other http projects (curl, wget).
    def get_proxy(key):
        return os.environ.get(key) or os.environ.get(key.upper())

    # First check whether no_proxy is defined. If it is, check that the URL
    # we're getting isn't in the no_proxy list.
    no_proxy_arg = no_proxy
    if no_proxy is None:
        no_proxy = get_proxy("no_proxy")
    parsed = urlparse(url)

    if parsed.hostname is None:
        # URLs don't always have hostnames, e.g. file:/// urls.
        return True

    if no_proxy:
        # We need to check whether we match here. We need to see if we match
        # the end of the hostname, both with and without the port.
        no_proxy = (host for host in no_proxy.replace(" ", "").split(",") if host)

        if is_ipv4_address(parsed.hostname):
            for proxy_ip in no_proxy:
                if is_valid_cidr(proxy_ip):
                    if address_in_network(parsed.hostname, proxy_ip):
                        return True
                elif parsed.hostname == proxy_ip:
                    # If no_proxy ip was defined in plain IP notation instead of cidr notation &
                    # matches the IP of the index
                    return True
        else:
            host_with_port = parsed.hostname
            if parsed.port:
                host_with_port += f":{parsed.port}"

            for host in no_proxy:
                if parsed.hostname.endswith(host) or host_with_port.endswith(host):
                    # The URL does match something in no_proxy, so we don't want
                    # to apply the proxies on this URL.
                    return True

    # Defer to the platform's own bypass logic (e.g. the Windows registry),
    # propagating the caller's explicit no_proxy via the environment.
    with set_environ("no_proxy", no_proxy_arg):
        # parsed.hostname can be `None` in cases such as a file URI.
        try:
            bypass = proxy_bypass(parsed.hostname)
        except (TypeError, socket.gaierror):
            bypass = False

    if bypass:
        return True

    return False

811 

812 

def get_environ_proxies(url, no_proxy=None):
    """
    Return a dict of environment proxies.

    Empty when the URL should bypass proxies entirely.

    :rtype: dict
    """
    if should_bypass_proxies(url, no_proxy=no_proxy):
        return {}
    return getproxies()

823 

824 

def select_proxy(url, proxies):
    """Select a proxy for the url, if applicable.

    :param url: The url being for the request
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    """
    proxies = proxies or {}
    parts = urlparse(url)
    if parts.hostname is None:
        return proxies.get(parts.scheme, proxies.get("all"))

    # Most specific key wins: scheme+host, scheme, any-scheme+host, any.
    candidates = (
        f"{parts.scheme}://{parts.hostname}",
        parts.scheme,
        f"all://{parts.hostname}",
        "all",
    )
    for key in candidates:
        if key in proxies:
            return proxies[key]
    return None

849 

850 

def resolve_proxies(request, proxies, trust_env=True):
    """This method takes proxy information from a request and configuration
    input to resolve a mapping of target proxies. This will consider settings
    such as NO_PROXY to strip proxy configurations.

    :param request: Request or PreparedRequest
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    :param trust_env: Boolean declaring whether to trust environment configs

    :rtype: dict
    """
    configured = proxies if proxies is not None else {}
    resolved = configured.copy()

    target_url = request.url
    scheme = urlparse(target_url).scheme
    no_proxy = configured.get("no_proxy")

    if trust_env and not should_bypass_proxies(target_url, no_proxy=no_proxy):
        env_proxies = get_environ_proxies(target_url, no_proxy=no_proxy)
        env_proxy = env_proxies.get(scheme, env_proxies.get("all"))
        if env_proxy:
            # Explicitly configured proxies always win over environment ones.
            resolved.setdefault(scheme, env_proxy)

    return resolved

876 

877 

def default_user_agent(name="python-requests"):
    """
    Return a string representing the default user agent.

    :param name: product token to prefix the package version with.
    :rtype: str
    """
    return f"{name}/{__version__}"

885 

886 

def default_headers():
    """
    Build the case-insensitive set of headers sent by default with
    every request.

    :rtype: requests.structures.CaseInsensitiveDict
    """
    return CaseInsensitiveDict(
        {
            "User-Agent": default_user_agent(),
            "Accept-Encoding": DEFAULT_ACCEPT_ENCODING,
            "Accept": "*/*",
            "Connection": "keep-alive",
        }
    )

899 

900 

def parse_header_links(value):
    """Return a list of parsed link headers proxies.

    i.e. Link: <http:/.../front.jpeg>; rel=front; type="image/jpeg",<http://.../back.jpeg>; rel=back;type="image/jpeg"

    :rtype: list
    """
    links = []
    strip_set = " '\""

    value = value.strip(strip_set)
    if not value:
        return links

    for segment in re.split(", *<", value):
        # Split the target URL from its parameters, if any.
        url, _, params = segment.partition(";")
        link = {"url": url.strip("<> '\"")}

        for param in params.split(";"):
            pieces = param.split("=")
            if len(pieces) != 2:
                # Malformed parameter: stop parsing this link's params
                # (mirrors the historical unpacking behavior).
                break
            key, val = pieces
            link[key.strip(strip_set)] = val.strip(strip_set)

        links.append(link)

    return links

936 

937 

# Null bytes; no need to recreate these on each call to guess_json_utf
_null = "\x00".encode("ascii")  # encoding to ASCII for Python 3
_null2 = _null * 2
_null3 = _null * 3


def guess_json_utf(data):
    """Guess the UTF flavour of a JSON byte string from its first 4 bytes.

    :rtype: str
    """
    # JSON always starts with two ASCII characters, so detection is as
    # easy as counting the nulls and from their location and count
    # determine the encoding. Also detect a BOM, if present.
    sample = data[:4]

    # Explicit byte-order marks take precedence over null-counting.
    if sample in (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE):
        return "utf-32"  # BOM included
    if sample[:3] == codecs.BOM_UTF8:
        return "utf-8-sig"  # BOM included, MS style (discouraged)
    if sample[:2] in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE):
        return "utf-16"  # BOM included

    nullcount = sample.count(_null)
    if nullcount == 0:
        return "utf-8"
    if nullcount == 2:
        if sample[::2] == _null2:  # 1st and 3rd are null
            return "utf-16-be"
        if sample[1::2] == _null2:  # 2nd and 4th are null
            return "utf-16-le"
        # Did not detect 2 valid UTF-16 ascii-range characters
    elif nullcount == 3:
        if sample[:3] == _null3:
            return "utf-32-be"
        if sample[1:] == _null3:
            return "utf-32-le"
        # Did not detect a valid UTF-32 ascii-range character
    return None

974 

975 

def prepend_scheme_if_needed(url, new_scheme):
    """Given a URL that may or may not have a scheme, prepend the given scheme.
    Does not replace a present scheme with the one provided as an argument.

    :param url: URL string, possibly scheme-less.
    :param new_scheme: scheme to prepend when none is present.
    :rtype: str
    """
    parsed = parse_url(url)
    # host and port are unpacked but unused; kept for readability of the tuple.
    scheme, auth, host, port, path, query, fragment = parsed

    # A defect in urlparse determines that there isn't a netloc present in some
    # urls. We previously assumed parsing was overly cautious, and swapped the
    # netloc and path. Due to a lack of tests on the original defect, this is
    # maintained with parse_url for backwards compatibility.
    netloc = parsed.netloc
    if not netloc:
        netloc, path = path, netloc

    if auth:
        # parse_url doesn't provide the netloc with auth
        # so we'll add it ourselves.
        netloc = "@".join([auth, netloc])
    if scheme is None:
        scheme = new_scheme
    if path is None:
        path = ""

    return urlunparse((scheme, netloc, path, "", query, fragment))

1003 

1004 

def get_auth_from_url(url):
    """Given a url with authentication components, extract them into a tuple of
    username,password.

    :rtype: (str,str)
    """
    parsed = urlparse(url)
    try:
        return (unquote(parsed.username), unquote(parsed.password))
    except (AttributeError, TypeError):
        # Missing credentials leave username/password as None, which
        # unquote rejects; fall back to empty strings.
        return ("", "")

1019 

1020 

def check_header_validity(header):
    """Verifies that header parts don't contain leading whitespace
    reserved characters, or return characters.

    :param header: tuple, in the format (name, value).
    :raises InvalidHeader: if either part is not str/bytes or fails validation.
    """
    name, value = header
    # Index 0 selects the header-name validator, index 1 the value validator.
    _validate_header_part(header, name, 0)
    _validate_header_part(header, value, 1)

1030 

1031 

def _validate_header_part(header, header_part, header_validator_index):
    """Validate one half of a header tuple against the matching regex.

    :param header: the full (name, value) tuple, used only in error messages.
    :param header_part: the name or value being checked.
    :param header_validator_index: 0 for the name validator, 1 for the value.
    :raises InvalidHeader: for a non-string part or one containing
        illegal characters.
    """
    if isinstance(header_part, str):
        validators = _HEADER_VALIDATORS_STR
    elif isinstance(header_part, bytes):
        validators = _HEADER_VALIDATORS_BYTE
    else:
        raise InvalidHeader(
            f"Header part ({header_part!r}) from {header} "
            f"must be of type str or bytes, not {type(header_part)}"
        )

    if validators[header_validator_index].match(header_part):
        return

    header_kind = "name" if header_validator_index == 0 else "value"
    raise InvalidHeader(
        f"Invalid leading whitespace, reserved character(s), or return "
        f"character(s) in header {header_kind}: {header_part!r}"
    )

1050 

def urldefragauth(url):
    """
    Given a url remove the fragment and the authentication part.

    :rtype: str
    """
    scheme, netloc, path, params, query, fragment = urlparse(url)

    # see func:`prepend_scheme_if_needed`
    if not netloc:
        netloc, path = path, netloc

    # Drop everything up to and including the last '@' (the userinfo part).
    host = netloc.rsplit("@", 1)[-1]

    return urlunparse((scheme, host, path, params, query, ""))

1066 

1067 

def rewind_body(prepared_request):
    """Move file pointer back to its recorded starting position
    so it can be read again on redirect.

    :raises UnrewindableBodyError: when the body has no seek method, the
        recorded position is unknown, or seeking fails.
    """
    body_seek = getattr(prepared_request.body, "seek", None)
    if body_seek is None or not isinstance(
        prepared_request._body_position, integer_types
    ):
        raise UnrewindableBodyError("Unable to rewind request body for redirect.")

    try:
        body_seek(prepared_request._body_position)
    except OSError:
        raise UnrewindableBodyError(
            "An error occurred when rewinding request body for redirect."
        )