Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pip/_vendor/requests/utils.py: 16%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

477 statements  

1""" 

2requests.utils 

3~~~~~~~~~~~~~~ 

4 

5This module provides utility functions that are used within Requests 

6that are also useful for external consumption. 

7""" 

8 

9import codecs 

10import contextlib 

11import io 

12import os 

13import re 

14import socket 

15import struct 

16import sys 

17import tempfile 

18import warnings 

19import zipfile 

20from collections import OrderedDict 

21 

22from pip._vendor.urllib3.util import make_headers, parse_url 

23 

24from . import certs 

25from .__version__ import __version__ 

26 

27# to_native_string is unused here, but imported here for backwards compatibility 

28from ._internal_utils import ( # noqa: F401 

29 _HEADER_VALIDATORS_BYTE, 

30 _HEADER_VALIDATORS_STR, 

31 HEADER_VALIDATORS, 

32 to_native_string, 

33) 

34from .compat import ( 

35 Mapping, 

36 basestring, 

37 bytes, 

38 getproxies, 

39 getproxies_environment, 

40 integer_types, 

41 is_urllib3_1, 

42 proxy_bypass, 

43 proxy_bypass_environment, 

44 quote, 

45 str, 

46 unquote, 

47 urlparse, 

48 urlunparse, 

49) 

50from .compat import parse_http_list as _parse_list_header 

51from .cookies import cookiejar_from_dict 

52from .exceptions import ( 

53 FileModeWarning, 

54 InvalidHeader, 

55 InvalidURL, 

56 UnrewindableBodyError, 

57) 

58from .structures import CaseInsensitiveDict 

59 

# Filenames probed (in order) in the user's home directory by get_netrc_auth().
NETRC_FILES = (".netrc", "_netrc")

# Certificate is extracted by certifi when needed.
DEFAULT_CA_BUNDLE_PATH = certs.where()

# Default port for each supported scheme, used when a URL omits the port.
DEFAULT_PORTS = {"http": 80, "https": 443}

# Ensure that ', ' is used to preserve previous delimiter behavior.
DEFAULT_ACCEPT_ENCODING = ", ".join(
    re.split(r",\s*", make_headers(accept_encoding=True)["accept-encoding"])
)

71 

72 

if sys.platform == "win32":
    # provide a proxy_bypass version on Windows without DNS lookups

    def proxy_bypass_registry(host):
        """Check the Windows registry ProxyOverride list for *host*.

        Returns True when the registry says requests to *host* should skip
        the proxy; False on any registry error or when nothing matches.
        """
        try:
            import winreg
        except ImportError:
            return False

        try:
            internetSettings = winreg.OpenKey(
                winreg.HKEY_CURRENT_USER,
                r"Software\Microsoft\Windows\CurrentVersion\Internet Settings",
            )
            # ProxyEnable could be REG_SZ or REG_DWORD, normalizing it
            proxyEnable = int(winreg.QueryValueEx(internetSettings, "ProxyEnable")[0])
            # ProxyOverride is almost always a string
            proxyOverride = winreg.QueryValueEx(internetSettings, "ProxyOverride")[0]
        except (OSError, ValueError):
            return False
        if not proxyEnable or not proxyOverride:
            return False

        # make a check value list from the registry entry: replace the
        # '<local>' string by the localhost entry and the corresponding
        # canonical entry.
        proxyOverride = proxyOverride.split(";")
        # filter out empty strings to avoid re.match return true in the following code.
        proxyOverride = filter(None, proxyOverride)
        # now check if we match one of the registry values.
        for test in proxyOverride:
            if test == "<local>":
                if "." not in host:
                    return True
            # Translate the glob-style override entry into a regex.
            test = test.replace(".", r"\.")  # mask dots
            test = test.replace("*", r".*")  # change glob sequence
            test = test.replace("?", r".")  # change glob char
            if re.match(test, host, re.I):
                return True
        return False

    def proxy_bypass(host):  # noqa
        """Return True, if the host should be bypassed.

        Checks proxy settings gathered from the environment, if specified,
        or the registry.
        """
        if getproxies_environment():
            return proxy_bypass_environment(host)
        else:
            return proxy_bypass_registry(host)

124 

125 

def dict_to_sequence(d):
    """Return *d* as an iterable of key/value pairs when it is mapping-like.

    Objects without an ``items`` method are passed through untouched.
    """
    return d.items() if hasattr(d, "items") else d

133 

134 

def super_len(o):
    """Best-effort count of the bytes remaining to be read from *o*.

    Used to compute Content-Length.  Handles strings (encoded to UTF-8 for
    urllib3 2.x), anything with ``__len__`` or a ``len`` attribute, real
    files (via ``fstat``) and seekable file-like objects, subtracting the
    current read position when it can be determined.  Returns 0 when
    nothing can be inferred, which lets requests fall back to chunking.
    """
    total_length = None
    current_position = 0

    if not is_urllib3_1 and isinstance(o, str):
        # urllib3 2.x+ treats all strings as utf-8 instead
        # of latin-1 (iso-8859-1) like http.client.
        o = o.encode("utf-8")

    if hasattr(o, "__len__"):
        total_length = len(o)

    elif hasattr(o, "len"):
        # Some file-like wrappers expose a plain ``len`` attribute.
        total_length = o.len

    elif hasattr(o, "fileno"):
        try:
            fileno = o.fileno()
        except (io.UnsupportedOperation, AttributeError):
            # AttributeError is a surprising exception, seeing as how we've just checked
            # that `hasattr(o, 'fileno')`. It happens for objects obtained via
            # `Tarfile.extractfile()`, per issue 5229.
            pass
        else:
            total_length = os.fstat(fileno).st_size

            # Having used fstat to determine the file length, we need to
            # confirm that this file was opened up in binary mode.
            if "b" not in o.mode:
                warnings.warn(
                    (
                        "Requests has determined the content-length for this "
                        "request using the binary size of the file: however, the "
                        "file has been opened in text mode (i.e. without the 'b' "
                        "flag in the mode). This may lead to an incorrect "
                        "content-length. In Requests 3.0, support will be removed "
                        "for files in text mode."
                    ),
                    FileModeWarning,
                )

    if hasattr(o, "tell"):
        try:
            current_position = o.tell()
        except OSError:
            # This can happen in some weird situations, such as when the file
            # is actually a special file descriptor like stdin. In this
            # instance, we don't know what the length is, so set it to zero and
            # let requests chunk it instead.
            if total_length is not None:
                current_position = total_length
        else:
            if hasattr(o, "seek") and total_length is None:
                # StringIO and BytesIO have seek but no usable fileno
                try:
                    # seek to end of file
                    o.seek(0, 2)
                    total_length = o.tell()

                    # seek back to current position to support
                    # partially read file-like objects
                    o.seek(current_position or 0)
                except OSError:
                    total_length = 0

    if total_length is None:
        total_length = 0

    # Never report a negative length (the position can exceed the length,
    # e.g. after the file shrank or the pointer was seeked past the end).
    return max(0, total_length - current_position)

204 

205 

def get_netrc_auth(url, raise_errors=False):
    """Look up ``(login, password)`` for *url*'s host in the user's netrc.

    The file named by the ``NETRC`` environment variable wins; otherwise
    ``~/.netrc`` and ``~/_netrc`` are probed in order.  Returns ``None``
    when no netrc file or no usable entry is found.

    :param raise_errors: re-raise parse/permission errors instead of
        silently skipping netrc authentication.
    """
    netrc_file = os.environ.get("NETRC")
    if netrc_file is not None:
        candidates = (netrc_file,)
    else:
        candidates = (f"~/{name}" for name in NETRC_FILES)

    try:
        from netrc import NetrcParseError, netrc

        netrc_path = None
        for candidate in candidates:
            expanded = os.path.expanduser(candidate)
            if os.path.exists(expanded):
                netrc_path = expanded
                break

        # Abort early if there isn't one.
        if netrc_path is None:
            return

        host = urlparse(url).hostname

        try:
            entry = netrc(netrc_path).authenticators(host)
            if entry and any(entry):
                # Prefer the login field; fall back to the account field.
                login_index = 0 if entry[0] else 1
                return (entry[login_index], entry[2])
        except (NetrcParseError, OSError):
            # Parsing error or permission trouble reading the file: skip
            # netrc auth unless the caller explicitly asked for errors.
            if raise_errors:
                raise

    # App Engine hackiness.
    except (ImportError, AttributeError):
        pass

248 

249 

def guess_filename(obj):
    """Best-effort filename for *obj*: its ``name`` attribute, provided it
    is a real string and not a pseudo-name such as ``"<stdin>"``."""
    name = getattr(obj, "name", None)
    if not (name and isinstance(name, basestring)):
        return None
    if name.startswith("<") or name.endswith(">"):
        return None
    return os.path.basename(name)

255 

256 

def extract_zipped_paths(path):
    """Replace nonexistent paths that look like they refer to a member of a zip
    archive with the location of an extracted copy of the target, or else
    just return the provided path unchanged.

    :param path: filesystem path, possibly pointing *into* an archive
        (e.g. ``/tmp/bundle.zip/inner/file.txt``).
    :rtype: str
    """
    if os.path.exists(path):
        # this is already a valid path, no need to do anything further
        return path

    # find the first valid part of the provided path and treat that as a zip archive
    # assume the rest of the path is the name of a member in the archive
    archive, member = os.path.split(path)
    while archive and not os.path.exists(archive):
        archive, prefix = os.path.split(archive)
        if not prefix:
            # If we don't check for an empty prefix after the split (in other words, archive remains unchanged after the split),
            # we _can_ end up in an infinite loop on a rare corner case affecting a small number of users
            break
        member = "/".join([prefix, member])

    if not zipfile.is_zipfile(archive):
        return path

    # Fix: close the archive deterministically instead of leaking the
    # open file handle (the original never called zip_file.close()).
    with zipfile.ZipFile(archive) as zip_file:
        if member not in zip_file.namelist():
            return path

        # we have a valid zip archive and a valid member of that archive;
        # extract it to a uniquely named temp file keeping the extension
        suffix = os.path.splitext(member.split("/")[-1])[-1]
        fd, extracted_path = tempfile.mkstemp(suffix=suffix)
        try:
            os.write(fd, zip_file.read(member))
        finally:
            os.close(fd)

    return extracted_path

293 

294 

@contextlib.contextmanager
def atomic_open(filename):
    """Write *filename* atomically.

    Data is written to a temporary file in the same directory and renamed
    over the target on success; on any failure the temporary file is
    removed and the target is left untouched.
    """
    tmp_fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(filename))
    try:
        with os.fdopen(tmp_fd, "wb") as handle:
            yield handle
        os.replace(tmp_path, filename)
    except BaseException:
        os.remove(tmp_path)
        raise

306 

307 

def from_key_val_list(value):
    """Coerce *value* into an :class:`OrderedDict` when possible.

    ::

        >>> from_key_val_list([('key', 'val')])
        OrderedDict([('key', 'val')])
        >>> from_key_val_list('string')
        Traceback (most recent call last):
        ...
        ValueError: cannot encode objects that are not 2-tuples
        >>> from_key_val_list({'key': 'val'})
        OrderedDict([('key', 'val')])

    :rtype: OrderedDict
    """
    if value is None:
        return None
    # Scalars cannot be interpreted as key/value pairs.
    if isinstance(value, (str, bytes, bool, int)):
        raise ValueError("cannot encode objects that are not 2-tuples")
    return OrderedDict(value)

333 

334 

def to_key_val_list(value):
    """Coerce *value* into a list of ``(key, value)`` tuples.

    ::

        >>> to_key_val_list([('key', 'val')])
        [('key', 'val')]
        >>> to_key_val_list({'key': 'val'})
        [('key', 'val')]
        >>> to_key_val_list('string')
        Traceback (most recent call last):
        ...
        ValueError: cannot encode objects that are not 2-tuples

    :rtype: list
    """
    if value is None:
        return None
    # Scalars cannot be interpreted as key/value pairs.
    if isinstance(value, (str, bytes, bool, int)):
        raise ValueError("cannot encode objects that are not 2-tuples")
    pairs = value.items() if isinstance(value, Mapping) else value
    return list(pairs)

362 

363 

364# From mitsuhiko/werkzeug (used with permission). 

def parse_list_header(value):
    """Parse a comma-separated list header as described by RFC 2068 Section 2.

    Elements may be quoted-strings (which may themselves contain commas);
    surrounding quotes are removed after parsing.  Items may appear more
    than once and case is preserved.

    >>> parse_list_header('token, "quoted value"')
    ['token', 'quoted value']

    To create a header from the :class:`list` again, use the
    :func:`dump_header` function.

    :param value: a string with a list header.
    :rtype: list
    """
    items = []
    for raw in _parse_list_header(value):
        if raw[:1] == raw[-1:] == '"':
            raw = unquote_header_value(raw[1:-1])
        items.append(raw)
    return items

394 

395 

396# From mitsuhiko/werkzeug (used with permission). 

def parse_dict_header(value):
    """Parse a ``key=value`` list header (RFC 2068 Section 2) into a dict.

    Keys that appear without a value map to ``None``; quoted values are
    unquoted.

    >>> parse_dict_header('key_without_value')
    {'key_without_value': None}

    To create a header from the :class:`dict` again, use the
    :func:`dump_header` function.

    :param value: a string with a dict header.
    :rtype: dict
    """
    result = {}
    for item in _parse_list_header(value):
        if "=" not in item:
            result[item] = None
            continue
        name, raw = item.split("=", 1)
        if raw[:1] == raw[-1:] == '"':
            raw = unquote_header_value(raw[1:-1])
        result[name] = raw
    return result

429 

430 

431# From mitsuhiko/werkzeug (used with permission). 

def unquote_header_value(value, is_filename=False):
    r"""Undo browser-style quoting of a header value (reversal of
    :func:`quote_header_value`).

    This deliberately mirrors what browsers actually do rather than the
    RFC, so quirky values (e.g. IE's ``"C:\foo\bar.txt"`` filenames)
    survive the round trip.

    :param value: the header value to unquote.
    :rtype: str
    """
    if not (value and value[0] == value[-1] == '"'):
        return value
    # Strip the surrounding quotes first.
    value = value[1:-1]
    # Leave UNC filename paths (\\server\share) untouched: collapsing the
    # leading double backslash would corrupt them.  See #458.
    if is_filename and value[:2] == "\\\\":
        return value
    return value.replace("\\\\", "\\").replace('\\"', '"')

455 

456 

def dict_from_cookiejar(cj):
    """Flatten a CookieJar into a plain ``{name: value}`` dict.

    :param cj: CookieJar object to extract cookies from.
    :rtype: dict
    """
    return {cookie.name: cookie.value for cookie in cj}

466 

467 

def add_dict_to_cookiejar(cj, cookie_dict):
    """Insert the key/value pairs from *cookie_dict* into the jar *cj*.

    :param cj: CookieJar to insert cookies into.
    :param cookie_dict: Dict of key/values to insert into CookieJar.
    :rtype: CookieJar
    """
    # Delegates entirely to cookies.cookiejar_from_dict, which merges into
    # an existing jar when one is supplied.
    return cookiejar_from_dict(cookie_dict, cj)

477 

478 

def get_encodings_from_content(content):
    """Scan *content* for declared character encodings.

    Checks ``<meta charset>`` attributes, ``<meta http-equiv>`` pragmas and
    XML declarations, in that order.

    :param content: bytestring to extract encodings from.
    """
    warnings.warn(
        (
            "In requests 3.0, get_encodings_from_content will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )

    finders = (
        re.compile(r'<meta.*?charset=["\']*(.+?)["\'>]', flags=re.I),
        re.compile(r'<meta.*?content=["\']*;?charset=(.+?)["\'>]', flags=re.I),
        re.compile(r'^<\?xml.*?encoding=["\']*(.+?)["\'>]'),
    )
    found = []
    for finder in finders:
        found.extend(finder.findall(content))
    return found

502 

503 

504def _parse_content_type_header(header): 

505 """Returns content type and parameters from given header. 

506 

507 :param header: string 

508 :return: tuple containing content type and dictionary of 

509 parameters. 

510 """ 

511 

512 tokens = header.split(";") 

513 content_type, params = tokens[0].strip(), tokens[1:] 

514 params_dict = {} 

515 strip_chars = "\"' " 

516 

517 for param in params: 

518 param = param.strip() 

519 if param and (idx := param.find("=")) != -1: 

520 key = param[:idx].strip(strip_chars) 

521 value = param[idx + 1 :].strip(strip_chars) 

522 params_dict[key.lower()] = value 

523 return content_type, params_dict 

524 

525 

def get_encoding_from_headers(headers):
    """Pick a text encoding based on the Content-Type header.

    :param headers: dictionary to extract encoding from.
    :rtype: str
    """
    content_type = headers.get("content-type")
    if not content_type:
        return None

    content_type, params = _parse_content_type_header(content_type)

    if "charset" in params:
        return params["charset"].strip("'\"")

    if "text" in content_type:
        # HTTP's historical default for text/* when no charset is declared.
        return "ISO-8859-1"

    if "application/json" in content_type:
        # Assume UTF-8 based on RFC 4627: https://www.ietf.org/rfc/rfc4627.txt since the charset was unset
        return "utf-8"

    return None

549 

550 

def stream_decode_response_unicode(iterator, r):
    """Incrementally decode byte chunks from *iterator* using *r*'s encoding.

    When the response declares no encoding, the raw chunks are passed
    through unchanged.  Undecodable bytes are replaced.
    """
    if r.encoding is None:
        yield from iterator
        return

    decoder = codecs.getincrementaldecoder(r.encoding)(errors="replace")
    for chunk in iterator:
        text = decoder.decode(chunk)
        if text:
            yield text
    # Flush whatever the decoder is still buffering.
    tail = decoder.decode(b"", final=True)
    if tail:
        yield tail

566 

567 

def iter_slices(string, slice_length):
    """Yield successive chunks of *string*, each at most *slice_length* long.

    A ``None`` or non-positive length yields the whole string in one chunk.
    """
    total = len(string)
    if slice_length is None or slice_length <= 0:
        slice_length = total
    offset = 0
    while offset < total:
        yield string[offset : offset + slice_length]
        offset += slice_length

576 

577 

def get_unicode_from_response(r):
    """Return the response body decoded to unicode.

    Tries the charset from the content-type header first, then falls back
    to a lenient decode that replaces bad bytes.

    :param r: Response object to get unicode content from.
    :rtype: str
    """
    warnings.warn(
        (
            "In requests 3.0, get_unicode_from_response will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )

    tried_encodings = []

    # First attempt: the charset declared in the headers.
    encoding = get_encoding_from_headers(r.headers)
    if encoding:
        try:
            return str(r.content, encoding)
        except UnicodeError:
            tried_encodings.append(encoding)

    # Fallback: decode with replacement characters; a missing encoding
    # raises TypeError, in which case the raw content is returned.
    try:
        return str(r.content, encoding, errors="replace")
    except TypeError:
        return r.content

615 

616 

617# The unreserved URI characters (RFC 3986) 

# Characters that never need percent-escaping in a URI (RFC 3986 §2.3);
# consulted by unquote_unreserved() below.
UNRESERVED_SET = frozenset(
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + "0123456789-._~"
)

621 

622 

def unquote_unreserved(uri):
    """Un-escape any percent-escape sequences in a URI that are unreserved
    characters. This leaves all reserved, illegal and non-ASCII bytes encoded.

    :raises InvalidURL: on a malformed percent-escape sequence.
    :rtype: str
    """
    parts = uri.split("%")
    decoded = [parts[0]]
    for part in parts[1:]:
        hex_pair = part[0:2]
        if len(hex_pair) != 2 or not hex_pair.isalnum():
            # Too short or clearly not hex: keep the escape verbatim.
            decoded.append(f"%{part}")
            continue
        try:
            char = chr(int(hex_pair, 16))
        except ValueError:
            raise InvalidURL(f"Invalid percent-escape sequence: '{hex_pair}'")
        if char in UNRESERVED_SET:
            decoded.append(char + part[2:])
        else:
            decoded.append(f"%{part}")
    return "".join(decoded)

645 

646 

def requote_uri(uri):
    """Re-quote the given URI so it is fully and consistently percent-encoded.

    :rtype: str
    """
    safe_with_percent = "!#$%&'()*+,/:;=?@[]~"
    safe_without_percent = "!#$&'()*+,/:;=?@[]~"
    try:
        # Normalize by unquoting the unreserved characters, then quote only
        # the illegal ones (reserved, unreserved and '%' are left alone).
        return quote(unquote_unreserved(uri), safe=safe_with_percent)
    except InvalidURL:
        # The URI contains a malformed percent-escape; quote it as-is,
        # making sure any stray '%' characters get escaped themselves.
        return quote(uri, safe=safe_without_percent)

667 

668 

def address_in_network(ip, net):
    """Return True when IPv4 address *ip* falls inside CIDR network *net*.

    Example: returns True if ip = 192.168.1.1 and net = 192.168.1.0/24
             returns False if ip = 192.168.1.1 and net = 192.168.100.0/24

    :rtype: bool
    """
    ip_int = struct.unpack("=L", socket.inet_aton(ip))[0]
    net_addr, bits = net.split("/")
    mask_int = struct.unpack("=L", socket.inet_aton(dotted_netmask(int(bits))))[0]
    net_int = struct.unpack("=L", socket.inet_aton(net_addr))[0] & mask_int
    return (ip_int & mask_int) == (net_int & mask_int)

682 

683 

def dotted_netmask(mask):
    """Convert a prefix length (/xx) into dotted-quad netmask form.

    Example: if mask is 24 function returns 255.255.255.0

    :rtype: str
    """
    host_bits = 32 - mask
    value = 0xFFFFFFFF ^ ((1 << host_bits) - 1)
    return socket.inet_ntoa(struct.pack(">I", value))

693 

694 

def is_ipv4_address(string_ip):
    """Return True when *string_ip* parses as an IPv4 address.

    :rtype: bool
    """
    try:
        socket.inet_aton(string_ip)
        return True
    except OSError:
        return False

704 

705 

def is_valid_cidr(string_network):
    """
    Very simple check of the cidr format in no_proxy variable.

    Accepts only ``a.b.c.d/n`` with a parseable IPv4 address and a mask
    between 1 and 32 inclusive.

    :rtype: bool
    """
    if string_network.count("/") != 1:
        return False

    address, _, mask_text = string_network.partition("/")
    try:
        mask = int(mask_text)
    except ValueError:
        return False
    if not 1 <= mask <= 32:
        return False

    try:
        socket.inet_aton(address)
    except OSError:
        return False
    return True

728 

729 

@contextlib.contextmanager
def set_environ(env_name, value):
    """Temporarily set environment variable *env_name* to *value*.

    The previous value is restored (or the variable removed again) when
    the context exits.  A *value* of ``None`` makes this a no-op.
    """
    if value is None:
        yield
        return

    previous = os.environ.get(env_name)
    os.environ[env_name] = value
    try:
        yield
    finally:
        if previous is None:
            del os.environ[env_name]
        else:
            os.environ[env_name] = previous

750 

751 

def should_bypass_proxies(url, no_proxy):
    """
    Returns whether we should bypass proxies or not.

    :param url: the URL being requested.
    :param no_proxy: comma-separated host/CIDR list overriding the
        ``no_proxy`` environment variables, or ``None`` to consult them.
    :rtype: bool
    """

    # Prioritize lowercase environment variables over uppercase
    # to keep a consistent behaviour with other http projects (curl, wget).
    def get_proxy(key):
        return os.environ.get(key) or os.environ.get(key.upper())

    # First check whether no_proxy is defined. If it is, check that the URL
    # we're getting isn't in the no_proxy list.
    no_proxy_arg = no_proxy
    if no_proxy is None:
        no_proxy = get_proxy("no_proxy")
    parsed = urlparse(url)

    if parsed.hostname is None:
        # URLs don't always have hostnames, e.g. file:/// urls.
        return True

    if no_proxy:
        # We need to check whether we match here. We need to see if we match
        # the end of the hostname, both with and without the port.
        no_proxy = (host for host in no_proxy.replace(" ", "").split(",") if host)

        if is_ipv4_address(parsed.hostname):
            for proxy_ip in no_proxy:
                if is_valid_cidr(proxy_ip):
                    if address_in_network(parsed.hostname, proxy_ip):
                        return True
                elif parsed.hostname == proxy_ip:
                    # If no_proxy ip was defined in plain IP notation instead of cidr notation &
                    # matches the IP of the index
                    return True
        else:
            host_with_port = parsed.hostname
            if parsed.port:
                host_with_port += f":{parsed.port}"

            for host in no_proxy:
                if parsed.hostname.endswith(host) or host_with_port.endswith(host):
                    # The URL does match something in no_proxy, so we don't want
                    # to apply the proxies on this URL.
                    return True

    # Finally defer to the platform's proxy_bypass (urllib's environment
    # logic, or the Windows registry version defined above), with no_proxy
    # temporarily forced to the caller-supplied value.
    with set_environ("no_proxy", no_proxy_arg):
        # parsed.hostname can be `None` in cases such as a file URI.
        try:
            bypass = proxy_bypass(parsed.hostname)
        except (TypeError, socket.gaierror):
            bypass = False

    if bypass:
        return True

    return False

811 

812 

def get_environ_proxies(url, no_proxy=None):
    """
    Return a dict of environment proxies.

    An empty dict is returned when the URL should bypass proxies entirely.

    :rtype: dict
    """
    if should_bypass_proxies(url, no_proxy=no_proxy):
        return {}
    return getproxies()

823 

824 

def select_proxy(url, proxies):
    """Select a proxy for the url, if applicable.

    Lookup order: ``scheme://host``, ``scheme``, ``all://host``, ``all``.

    :param url: The url being for the request
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    """
    proxies = proxies or {}
    parts = urlparse(url)
    if parts.hostname is None:
        # Hostless URLs (e.g. file://) can only match by scheme.
        return proxies.get(parts.scheme, proxies.get("all"))

    candidates = (
        parts.scheme + "://" + parts.hostname,
        parts.scheme,
        "all://" + parts.hostname,
        "all",
    )
    for key in candidates:
        if key in proxies:
            return proxies[key]
    return None

849 

850 

def resolve_proxies(request, proxies, trust_env=True):
    """Merge explicit proxy configuration with environment proxies.

    Environment settings (including NO_PROXY) are consulted only when
    *trust_env* is true, and never override an explicitly configured
    scheme.

    :param request: Request or PreparedRequest
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    :param trust_env: Boolean declaring whether to trust environment configs

    :rtype: dict
    """
    proxies = {} if proxies is None else proxies
    resolved = proxies.copy()
    url = request.url
    scheme = urlparse(url).scheme
    no_proxy = proxies.get("no_proxy")

    if trust_env and not should_bypass_proxies(url, no_proxy=no_proxy):
        environ_proxies = get_environ_proxies(url, no_proxy=no_proxy)
        env_proxy = environ_proxies.get(scheme, environ_proxies.get("all"))
        if env_proxy:
            resolved.setdefault(scheme, env_proxy)

    return resolved

876 

877 

def default_user_agent(name="python-requests"):
    """
    Return a string representing the default user agent.

    :rtype: str
    """
    return "/".join((name, __version__))

885 

886 

def default_headers():
    """Build the default headers sent with every request.

    :rtype: requests.structures.CaseInsensitiveDict
    """
    headers = {
        "User-Agent": default_user_agent(),
        "Accept-Encoding": DEFAULT_ACCEPT_ENCODING,
        "Accept": "*/*",
        "Connection": "keep-alive",
    }
    return CaseInsensitiveDict(headers)

899 

900 

def parse_header_links(value):
    """Parse an RFC-style Link header into a list of dicts.

    i.e. Link: <http:/.../front.jpeg>; rel=front; type="image/jpeg",<http://.../back.jpeg>; rel=back;type="image/jpeg"

    :rtype: list
    """
    links = []
    strip_chars = " '\""

    value = value.strip(strip_chars)
    if not value:
        return links

    for chunk in re.split(", *<", value):
        if ";" in chunk:
            url, params = chunk.split(";", 1)
        else:
            url, params = chunk, ""

        link = {"url": url.strip("<> '\"")}

        for param in params.split(";"):
            try:
                key, attr_value = param.split("=")
            except ValueError:
                # Malformed parameter: stop processing this link's params.
                break
            link[key.strip(strip_chars)] = attr_value.strip(strip_chars)

        links.append(link)

    return links

936 

937 

938# Null bytes; no need to recreate these on each call to guess_json_utf 

# Null bytes; no need to recreate these on each call to guess_json_utf
# (used below to sniff UTF-16/UTF-32 byte patterns in JSON prefixes).
_null = "\x00".encode("ascii")  # encoding to ASCII for Python 3
_null2 = _null * 2
_null3 = _null * 3

942 

943 

def guess_json_utf(data):
    """Sniff the unicode encoding of a JSON byte string.

    JSON always starts with two ASCII characters, so the positions of null
    bytes within the first four bytes (plus any BOM) identify the
    encoding.  Returns ``None`` when no valid pattern is detected.

    :rtype: str
    """
    sample = data[:4]
    # Explicit byte-order marks win outright.
    if sample in (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE):
        return "utf-32"  # BOM included
    if sample[:3] == codecs.BOM_UTF8:
        return "utf-8-sig"  # BOM included, MS style (discouraged)
    if sample[:2] in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE):
        return "utf-16"  # BOM included
    # Otherwise infer the code-unit width from where the null bytes fall.
    nullcount = sample.count(_null)
    if nullcount == 0:
        return "utf-8"
    if nullcount == 2:
        if sample[::2] == _null2:  # 1st and 3rd are null
            return "utf-16-be"
        if sample[1::2] == _null2:  # 2nd and 4th are null
            return "utf-16-le"
        # Did not detect 2 valid UTF-16 ascii-range characters
    if nullcount == 3:
        if sample[:3] == _null3:
            return "utf-32-be"
        if sample[1:] == _null3:
            return "utf-32-le"
        # Did not detect a valid UTF-32 ascii-range character
    return None

974 

975 

def prepend_scheme_if_needed(url, new_scheme):
    """Given a URL that may or may not have a scheme, prepend the given scheme.
    Does not replace a present scheme with the one provided as an argument.

    :rtype: str
    """
    parsed = parse_url(url)
    scheme, auth, host, port, path, query, fragment = parsed

    # A defect in urlparse determines that there isn't a netloc present in some
    # urls. We previously assumed parsing was overly cautious, and swapped the
    # netloc and path. Due to a lack of tests on the original defect, this is
    # maintained with parse_url for backwards compatibility.
    netloc = parsed.netloc
    if not netloc:
        netloc, path = path, netloc

    if auth:
        # parse_url doesn't provide the netloc with auth
        # so we'll add it ourselves.
        netloc = f"{auth}@{netloc}"
    if scheme is None:
        scheme = new_scheme
    if path is None:
        path = ""

    return urlunparse((scheme, netloc, path, "", query, fragment))

1003 

1004 

def get_auth_from_url(url):
    """Given a url with authentication components, extract them into a tuple of
    username,password.

    :rtype: (str,str)
    """
    parsed = urlparse(url)
    try:
        return (unquote(parsed.username), unquote(parsed.password))
    except (AttributeError, TypeError):
        # Missing or non-string auth components fall back to empty strings.
        return ("", "")

1019 

1020 

def check_header_validity(header):
    """Verifies that header parts don't contain leading whitespace
    reserved characters, or return characters.

    :param header: tuple, in the format (name, value).
    :raises InvalidHeader: when either part is invalid.
    """
    name, value = header
    # Slot 0 validators apply to the name, slot 1 validators to the value.
    for index, part in enumerate((name, value)):
        _validate_header_part(header, part, index)

1030 

1031 

def _validate_header_part(header, header_part, header_validator_index):
    """Validate one half of a header tuple against the matching regex.

    :param header: the full (name, value) tuple, used for error messages.
    :param header_part: the name or value being checked.
    :param header_validator_index: 0 for the name, 1 for the value.
    :raises InvalidHeader: on a non str/bytes part or an invalid character.
    """
    if isinstance(header_part, str):
        validator = _HEADER_VALIDATORS_STR[header_validator_index]
    elif isinstance(header_part, bytes):
        validator = _HEADER_VALIDATORS_BYTE[header_validator_index]
    else:
        raise InvalidHeader(
            f"Header part ({header_part!r}) from {header} "
            f"must be of type str or bytes, not {type(header_part)}"
        )

    if validator.match(header_part):
        return

    header_kind = "name" if header_validator_index == 0 else "value"
    raise InvalidHeader(
        f"Invalid leading whitespace, reserved character(s), or return "
        f"character(s) in header {header_kind}: {header_part!r}"
    )

1049 

1050 

def urldefragauth(url):
    """
    Given a url remove the fragment and the authentication part.

    :rtype: str
    """
    parts = urlparse(url)
    netloc, path = parts.netloc, parts.path

    # see func:`prepend_scheme_if_needed`
    if not netloc:
        netloc, path = path, netloc

    # Drop everything up to and including the last '@' (user:pass).
    netloc = netloc.rsplit("@", 1)[-1]

    return urlunparse((parts.scheme, netloc, path, parts.params, parts.query, ""))

1066 

1067 

def rewind_body(prepared_request):
    """Move file pointer back to its recorded starting position
    so it can be read again on redirect.

    :raises UnrewindableBodyError: when the body has no ``seek`` method,
        no recorded start position, or seeking fails.
    """
    body_seek = getattr(prepared_request.body, "seek", None)
    has_position = isinstance(prepared_request._body_position, integer_types)
    if body_seek is None or not has_position:
        raise UnrewindableBodyError("Unable to rewind request body for redirect.")
    try:
        body_seek(prepared_request._body_position)
    except OSError:
        raise UnrewindableBodyError(
            "An error occurred when rewinding request body for redirect."
        )