Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pip/_vendor/requests/utils.py: 16%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

481 statements  

1""" 

2requests.utils 

3~~~~~~~~~~~~~~ 

4 

5This module provides utility functions that are used within Requests 

6that are also useful for external consumption. 

7""" 

8 

9import codecs 

10import contextlib 

11import io 

12import os 

13import re 

14import socket 

15import struct 

16import sys 

17import tempfile 

18import warnings 

19import zipfile 

20from collections import OrderedDict 

21 

22from pip._vendor.urllib3.util import make_headers, parse_url 

23 

24from . import certs 

25from .__version__ import __version__ 

26 

27# to_native_string is unused here, but imported here for backwards compatibility 

28from ._internal_utils import ( # noqa: F401 

29 _HEADER_VALIDATORS_BYTE, 

30 _HEADER_VALIDATORS_STR, 

31 HEADER_VALIDATORS, 

32 to_native_string, 

33) 

34from .compat import ( 

35 Mapping, 

36 basestring, 

37 bytes, 

38 getproxies, 

39 getproxies_environment, 

40 integer_types, 

41 is_urllib3_1, 

42) 

43from .compat import parse_http_list as _parse_list_header 

44from .compat import ( 

45 proxy_bypass, 

46 proxy_bypass_environment, 

47 quote, 

48 str, 

49 unquote, 

50 urlparse, 

51 urlunparse, 

52) 

53from .cookies import cookiejar_from_dict 

54from .exceptions import ( 

55 FileModeWarning, 

56 InvalidHeader, 

57 InvalidURL, 

58 UnrewindableBodyError, 

59) 

60from .structures import CaseInsensitiveDict 

61 

# Filenames probed (in order) when looking for per-user netrc credentials.
NETRC_FILES = (".netrc", "_netrc")

# Default CA bundle path, resolved once at import time via certs.where().
DEFAULT_CA_BUNDLE_PATH = certs.where()

# Default ports keyed by URL scheme.
DEFAULT_PORTS = {"http": 80, "https": 443}

# Ensure that ', ' is used to preserve previous delimiter behavior.
DEFAULT_ACCEPT_ENCODING = ", ".join(
    re.split(r",\s*", make_headers(accept_encoding=True)["accept-encoding"])
)

72 

73 

if sys.platform == "win32":
    # provide a proxy_bypass version on Windows without DNS lookups

    def proxy_bypass_registry(host):
        """Check the Windows registry's ProxyOverride setting for *host*.

        Returns True when the registry says the proxy should be bypassed
        for this host; False otherwise (including when winreg is
        unavailable or the registry cannot be read).
        """
        try:
            import winreg
        except ImportError:
            return False

        try:
            internetSettings = winreg.OpenKey(
                winreg.HKEY_CURRENT_USER,
                r"Software\Microsoft\Windows\CurrentVersion\Internet Settings",
            )
            # ProxyEnable could be REG_SZ or REG_DWORD, normalizing it
            proxyEnable = int(winreg.QueryValueEx(internetSettings, "ProxyEnable")[0])
            # ProxyOverride is almost always a string
            proxyOverride = winreg.QueryValueEx(internetSettings, "ProxyOverride")[0]
        except (OSError, ValueError):
            return False
        if not proxyEnable or not proxyOverride:
            return False

        # make a check value list from the registry entry: replace the
        # '<local>' string by the localhost entry and the corresponding
        # canonical entry.
        proxyOverride = proxyOverride.split(";")
        # filter out empty strings to avoid re.match return true in the following code.
        proxyOverride = filter(None, proxyOverride)
        # now check if we match one of the registry values.
        for test in proxyOverride:
            if test == "<local>":
                # '<local>' means "bypass for plain hostnames" (no dots).
                if "." not in host:
                    return True
            # Translate the registry glob pattern into a regex.
            test = test.replace(".", r"\.")  # mask dots
            test = test.replace("*", r".*")  # change glob sequence
            test = test.replace("?", r".")  # change glob char
            if re.match(test, host, re.I):
                return True
        return False

    def proxy_bypass(host):  # noqa
        """Return True, if the host should be bypassed.

        Checks proxy settings gathered from the environment, if specified,
        or the registry.
        """
        if getproxies_environment():
            return proxy_bypass_environment(host)
        else:
            return proxy_bypass_registry(host)

125 

126 

def dict_to_sequence(d):
    """Return *d* as an iterable of key/value pairs.

    Mappings are converted via ``items()``; anything else is assumed to
    already be a sequence of pairs and is returned unchanged.
    """
    return d.items() if hasattr(d, "items") else d

134 

135 

def super_len(o):
    """Best-effort count of the bytes remaining to be read from *o*.

    Tries ``len()``, then a ``len`` attribute, then ``fileno()``/``fstat``,
    and finally seek-to-end, subtracting the current ``tell()`` position so
    partially-consumed objects report only what is left. Returns 0 when no
    length can be determined.
    """
    total_length = None
    current_position = 0

    if not is_urllib3_1 and isinstance(o, str):
        # urllib3 2.x+ treats all strings as utf-8 instead
        # of latin-1 (iso-8859-1) like http.client.
        o = o.encode("utf-8")

    if hasattr(o, "__len__"):
        total_length = len(o)

    elif hasattr(o, "len"):
        total_length = o.len

    elif hasattr(o, "fileno"):
        try:
            fileno = o.fileno()
        except (io.UnsupportedOperation, AttributeError):
            # AttributeError is a surprising exception, seeing as how we've just checked
            # that `hasattr(o, 'fileno')`. It happens for objects obtained via
            # `Tarfile.extractfile()`, per issue 5229.
            pass
        else:
            total_length = os.fstat(fileno).st_size

            # Having used fstat to determine the file length, we need to
            # confirm that this file was opened up in binary mode.
            if "b" not in o.mode:
                warnings.warn(
                    (
                        "Requests has determined the content-length for this "
                        "request using the binary size of the file: however, the "
                        "file has been opened in text mode (i.e. without the 'b' "
                        "flag in the mode). This may lead to an incorrect "
                        "content-length. In Requests 3.0, support will be removed "
                        "for files in text mode."
                    ),
                    FileModeWarning,
                )

    if hasattr(o, "tell"):
        try:
            current_position = o.tell()
        except OSError:
            # This can happen in some weird situations, such as when the file
            # is actually a special file descriptor like stdin. In this
            # instance, we don't know what the length is, so set it to zero and
            # let requests chunk it instead.
            if total_length is not None:
                current_position = total_length
        else:
            if hasattr(o, "seek") and total_length is None:
                # StringIO and BytesIO have seek but no usable fileno
                try:
                    # seek to end of file
                    o.seek(0, 2)
                    total_length = o.tell()

                    # seek back to current position to support
                    # partially read file-like objects
                    o.seek(current_position or 0)
                except OSError:
                    total_length = 0

    if total_length is None:
        total_length = 0

    return max(0, total_length - current_position)

205 

206 

def get_netrc_auth(url, raise_errors=False):
    """Returns the Requests tuple auth for a given url from netrc.

    :param url: URL whose hostname is looked up in the netrc file.
    :param raise_errors: when True, re-raise netrc parse errors and
        permission problems instead of silently skipping netrc auth.
    :rtype: tuple(login, password) or None
    """

    # The NETRC environment variable, when set, overrides the default
    # per-user lookup locations.
    netrc_file = os.environ.get("NETRC")
    if netrc_file is not None:
        netrc_locations = (netrc_file,)
    else:
        netrc_locations = (f"~/{f}" for f in NETRC_FILES)

    try:
        from netrc import NetrcParseError, netrc

        netrc_path = None

        # Use the first netrc file that actually exists.
        for f in netrc_locations:
            loc = os.path.expanduser(f)
            if os.path.exists(loc):
                netrc_path = loc
                break

        # Abort early if there isn't one.
        if netrc_path is None:
            return

        ri = urlparse(url)
        host = ri.hostname

        try:
            _netrc = netrc(netrc_path).authenticators(host)
            if _netrc:
                # Return with login / password
                login_i = 0 if _netrc[0] else 1
                return (_netrc[login_i], _netrc[2])
        except (NetrcParseError, OSError):
            # If there was a parsing error or a permissions issue reading the file,
            # we'll just skip netrc auth unless explicitly asked to raise errors.
            if raise_errors:
                raise

    # App Engine hackiness.
    except (ImportError, AttributeError):
        pass

249 

250 

def guess_filename(obj):
    """Tries to guess the filename of the given object."""
    name = getattr(obj, "name", None)
    if not name or not isinstance(name, basestring):
        return None
    # Skip pseudo-names such as "<stdin>" used by special file objects.
    if name[0] == "<" or name[-1] == ">":
        return None
    return os.path.basename(name)

256 

257 

def extract_zipped_paths(path):
    """Replace nonexistent paths that look like they refer to a member of a zip
    archive with the location of an extracted copy of the target, or else
    just return the provided path unchanged.

    :param path: filesystem path, possibly pointing inside a zip archive.
    :rtype: str
    """
    if os.path.exists(path):
        # this is already a valid path, no need to do anything further
        return path

    # find the first valid part of the provided path and treat that as a zip archive
    # assume the rest of the path is the name of a member in the archive
    archive, member = os.path.split(path)
    while archive and not os.path.exists(archive):
        archive, prefix = os.path.split(archive)
        if not prefix:
            # If we don't check for an empty prefix after the split (in other words, archive remains unchanged after the split),
            # we _can_ end up in an infinite loop on a rare corner case affecting a small number of users
            break
        member = "/".join([prefix, member])

    if not zipfile.is_zipfile(archive):
        return path

    zip_file = zipfile.ZipFile(archive)
    if member not in zip_file.namelist():
        return path

    # we have a valid zip archive and a valid member of that archive
    tmp = tempfile.gettempdir()
    extracted_path = os.path.join(tmp, member.split("/")[-1])
    if not os.path.exists(extracted_path):
        # use read + write to avoid the creating nested folders, we only want the file, avoids mkdir racing condition
        with atomic_open(extracted_path) as file_handler:
            file_handler.write(zip_file.read(member))
    return extracted_path

293 

294 

@contextlib.contextmanager
def atomic_open(filename):
    """Write a file to the disk in an atomic fashion"""
    # Stage the data in a sibling temp file so the final rename stays on
    # the same filesystem (os.replace is atomic only in that case).
    descriptor, staging_path = tempfile.mkstemp(dir=os.path.dirname(filename))
    try:
        handler = os.fdopen(descriptor, "wb")
        with handler:
            yield handler
        os.replace(staging_path, filename)
    except BaseException:
        # On any failure, drop the partial temp file and re-raise.
        os.remove(staging_path)
        raise

306 

307 

def from_key_val_list(value):
    """Take an object and test to see if it can be represented as a
    dictionary. Unless it can not be represented as such, return an
    OrderedDict, e.g.,

    ::

        >>> from_key_val_list([('key', 'val')])
        OrderedDict([('key', 'val')])
        >>> from_key_val_list('string')
        Traceback (most recent call last):
        ...
        ValueError: cannot encode objects that are not 2-tuples
        >>> from_key_val_list({'key': 'val'})
        OrderedDict([('key', 'val')])

    :rtype: OrderedDict
    """
    if value is None:
        return None

    # Scalars cannot be interpreted as key/value pairs.
    scalar_types = (str, bytes, bool, int)
    if isinstance(value, scalar_types):
        raise ValueError("cannot encode objects that are not 2-tuples")

    return OrderedDict(value)

333 

334 

def to_key_val_list(value):
    """Take an object and test to see if it can be represented as a
    dictionary. If it can be, return a list of tuples, e.g.,

    ::

        >>> to_key_val_list([('key', 'val')])
        [('key', 'val')]
        >>> to_key_val_list({'key': 'val'})
        [('key', 'val')]
        >>> to_key_val_list('string')
        Traceback (most recent call last):
        ...
        ValueError: cannot encode objects that are not 2-tuples

    :rtype: list
    """
    if value is None:
        return None

    # Scalars cannot be interpreted as key/value pairs.
    if isinstance(value, (str, bytes, bool, int)):
        raise ValueError("cannot encode objects that are not 2-tuples")

    if isinstance(value, Mapping):
        return list(value.items())
    return list(value)

362 

363 

# From mitsuhiko/werkzeug (used with permission).
def parse_list_header(value):
    """Parse lists as described by RFC 2068 Section 2.

    In particular, parse comma-separated lists where the elements of
    the list may include quoted-strings. A quoted-string could
    contain a comma. A non-quoted string could have quotes in the
    middle. Quotes are removed automatically after parsing.

    It basically works like :func:`parse_set_header` just that items
    may appear multiple times and case sensitivity is preserved.

    The return value is a standard :class:`list`:

    >>> parse_list_header('token, "quoted value"')
    ['token', 'quoted value']

    To create a header from the :class:`list` again, use the
    :func:`dump_header` function.

    :param value: a string with a list header.
    :return: :class:`list`
    :rtype: list
    """
    # Strip surrounding quotes and unescape quoted-string elements.
    return [
        unquote_header_value(item[1:-1]) if item[:1] == item[-1:] == '"' else item
        for item in _parse_list_header(value)
    ]

394 

395 

# From mitsuhiko/werkzeug (used with permission).
def parse_dict_header(value):
    """Parse lists of key, value pairs as described by RFC 2068 Section 2 and
    convert them into a python dict:

    >>> d = parse_dict_header('foo="is a fish", bar="as well"')
    >>> type(d) is dict
    True
    >>> sorted(d.items())
    [('bar', 'as well'), ('foo', 'is a fish')]

    If there is no value for a key it will be `None`:

    >>> parse_dict_header('key_without_value')
    {'key_without_value': None}

    To create a header from the :class:`dict` again, use the
    :func:`dump_header` function.

    :param value: a string with a dict header.
    :return: :class:`dict`
    :rtype: dict
    """
    pairs = {}
    for item in _parse_list_header(value):
        # Items without '=' map to None.
        if "=" not in item:
            pairs[item] = None
            continue
        name, _, raw = item.partition("=")
        # Strip surrounding quotes and unescape quoted-string values.
        if raw[:1] == raw[-1:] == '"':
            raw = unquote_header_value(raw[1:-1])
        pairs[name] = raw
    return pairs

429 

430 

# From mitsuhiko/werkzeug (used with permission).
def unquote_header_value(value, is_filename=False):
    r"""Unquotes a header value. (Reversal of :func:`quote_header_value`).
    This does not use the real unquoting but what browsers are actually
    using for quoting.

    :param value: the header value to unquote.
    :rtype: str
    """
    # Unquoted values pass through unchanged.
    if not value or not (value[0] == value[-1] == '"'):
        return value

    # this is not the real unquoting, but fixing this so that the
    # RFC is met will result in bugs with internet explorer and
    # probably some other browsers as well. IE for example is
    # uploading files with "C:\foo\bar.txt" as filename
    inner = value[1:-1]

    # if this is a filename and the starting characters look like
    # a UNC path, then just return the value without quotes. Using the
    # replace sequence below on a UNC path has the effect of turning
    # the leading double slash into a single slash and then
    # _fix_ie_filename() doesn't work correctly. See #458.
    if is_filename and inner[:2] == "\\\\":
        return inner
    return inner.replace("\\\\", "\\").replace('\\"', '"')

455 

456 

def dict_from_cookiejar(cj):
    """Returns a key/value dictionary from a CookieJar.

    :param cj: CookieJar object to extract cookies from.
    :rtype: dict
    """
    return {morsel.name: morsel.value for morsel in cj}

466 

467 

def add_dict_to_cookiejar(cj, cookie_dict):
    """Returns a CookieJar from a key/value dictionary.

    :param cj: CookieJar to insert cookies into.
    :param cookie_dict: Dict of key/values to insert into CookieJar.
    :rtype: CookieJar
    """
    # Thin wrapper: cookies.cookiejar_from_dict does the actual merge.
    return cookiejar_from_dict(cookie_dict, cj)

477 

478 

def get_encodings_from_content(content):
    """Returns encodings from given content string.

    :param content: bytestring to extract encodings from.
    """
    warnings.warn(
        (
            "In requests 3.0, get_encodings_from_content will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )

    # Probe, in order: <meta charset=...>, the http-equiv pragma form,
    # and an XML declaration's encoding attribute.
    patterns = (
        re.compile(r'<meta.*?charset=["\']*(.+?)["\'>]', flags=re.I),
        re.compile(r'<meta.*?content=["\']*;?charset=(.+?)["\'>]', flags=re.I),
        re.compile(r'^<\?xml.*?encoding=["\']*(.+?)["\'>]'),
    )
    found = []
    for pattern in patterns:
        found.extend(pattern.findall(content))
    return found

502 

503 

504def _parse_content_type_header(header): 

505 """Returns content type and parameters from given header 

506 

507 :param header: string 

508 :return: tuple containing content type and dictionary of 

509 parameters 

510 """ 

511 

512 tokens = header.split(";") 

513 content_type, params = tokens[0].strip(), tokens[1:] 

514 params_dict = {} 

515 items_to_strip = "\"' " 

516 

517 for param in params: 

518 param = param.strip() 

519 if param: 

520 key, value = param, True 

521 index_of_equals = param.find("=") 

522 if index_of_equals != -1: 

523 key = param[:index_of_equals].strip(items_to_strip) 

524 value = param[index_of_equals + 1 :].strip(items_to_strip) 

525 params_dict[key.lower()] = value 

526 return content_type, params_dict 

527 

528 

def get_encoding_from_headers(headers):
    """Returns encodings from given HTTP Header Dict.

    :param headers: dictionary to extract encoding from.
    :rtype: str
    """
    raw_content_type = headers.get("content-type")
    if not raw_content_type:
        return None

    content_type, params = _parse_content_type_header(raw_content_type)

    # An explicit charset parameter always wins.
    if "charset" in params:
        return params["charset"].strip("'\"")
    if "text" in content_type:
        # Historical HTTP default for text/* without a charset.
        return "ISO-8859-1"
    if "application/json" in content_type:
        # Assume UTF-8 based on RFC 4627: https://www.ietf.org/rfc/rfc4627.txt since the charset was unset
        return "utf-8"
    return None

552 

553 

def stream_decode_response_unicode(iterator, r):
    """Stream decodes an iterator."""
    encoding = r.encoding
    if encoding is None:
        # No declared encoding: pass the chunks through untouched.
        yield from iterator
        return

    decoder = codecs.getincrementaldecoder(encoding)(errors="replace")
    for piece in iterator:
        decoded = decoder.decode(piece)
        if decoded:
            yield decoded
    # Flush whatever the incremental decoder is still buffering.
    tail = decoder.decode(b"", final=True)
    if tail:
        yield tail

569 

570 

def iter_slices(string, slice_length):
    """Iterate over slices of a string."""
    if not string:
        return
    if slice_length is None or slice_length <= 0:
        # Missing or non-positive length yields one slice with everything.
        slice_length = len(string)
    for start in range(0, len(string), slice_length):
        yield string[start : start + slice_length]

579 

580 

def get_unicode_from_response(r):
    """Returns the requested content back in unicode.

    :param r: Response object to get unicode content from.

    Tried:

    1. charset from content-type
    2. fall back and replace all unicode characters

    :rtype: str
    """
    warnings.warn(
        (
            "In requests 3.0, get_unicode_from_response will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )

    # Try charset from content-type
    charset = get_encoding_from_headers(r.headers)

    if charset:
        try:
            return str(r.content, charset)
        except UnicodeError:
            # The declared charset failed; fall through to "replace" mode.
            pass

    try:
        return str(r.content, charset, errors="replace")
    except TypeError:
        # charset is None (no usable encoding): hand back raw bytes.
        return r.content

618 

619 

# The unreserved URI characters (RFC 3986)
UNRESERVED_SET = frozenset(
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + "0123456789-._~"
)


def unquote_unreserved(uri):
    """Un-escape any percent-escape sequences in a URI that are unreserved
    characters. This leaves all reserved, illegal and non-ASCII bytes encoded.

    :rtype: str
    """
    pieces = uri.split("%")
    # pieces[0] has no escape preceding it; every later piece begins with
    # the two hex digits of a percent-escape when it is well formed.
    for idx in range(1, len(pieces)):
        hex_digits = pieces[idx][0:2]
        if len(hex_digits) != 2 or not hex_digits.isalnum():
            # Not a complete escape: keep the literal '%'.
            pieces[idx] = f"%{pieces[idx]}"
            continue
        try:
            decoded = chr(int(hex_digits, 16))
        except ValueError:
            raise InvalidURL(f"Invalid percent-escape sequence: '{hex_digits}'")
        if decoded in UNRESERVED_SET:
            pieces[idx] = decoded + pieces[idx][2:]
        else:
            pieces[idx] = f"%{pieces[idx]}"
    return "".join(pieces)

648 

649 

def requote_uri(uri):
    """Re-quote the given URI.

    This function passes the given URI through an unquote/quote cycle to
    ensure that it is fully and consistently quoted.

    :rtype: str
    """
    safe_chars = "!#$&'()*+,/:;=?@[]~"
    try:
        # Unquote only the unreserved characters first.
        unquoted = unquote_unreserved(uri)
    except InvalidURL:
        # Some escape in the URI is malformed, so no '%' can be trusted:
        # quote the input as-is, escaping bare '%' signs too.
        return quote(uri, safe=safe_chars)
    # Every remaining escape is valid, so '%' may stay literal; quote only
    # illegal characters (not reserved, unreserved, or '%').
    return quote(unquoted, safe=safe_chars + "%")

670 

671 

def address_in_network(ip, net):
    """This function allows you to check if an IP belongs to a network subnet

    Example: returns True if ip = 192.168.1.1 and net = 192.168.1.0/24
             returns False if ip = 192.168.1.1 and net = 192.168.100.0/24

    :rtype: bool
    """
    netaddr, bits = net.split("/")
    # Pack all three dotted quads into native-order unsigned 32-bit ints.
    ip_int = struct.unpack("=L", socket.inet_aton(ip))[0]
    mask_int = struct.unpack("=L", socket.inet_aton(dotted_netmask(int(bits))))[0]
    net_int = struct.unpack("=L", socket.inet_aton(netaddr))[0] & mask_int
    return (ip_int & mask_int) == (net_int & mask_int)

685 

686 

def dotted_netmask(mask):
    """Converts mask from /xx format to xxx.xxx.xxx.xxx

    Example: if mask is 24 function returns 255.255.255.0

    :rtype: str
    """
    # Keep only the top `mask` bits of a 32-bit word, then render it
    # in dotted-quad notation.
    host_bits = 32 - mask
    netmask_value = (0xFFFFFFFF >> host_bits) << host_bits
    return socket.inet_ntoa(struct.pack(">I", netmask_value))

696 

697 

def is_ipv4_address(string_ip):
    """
    :rtype: bool
    """
    # inet_aton raises OSError for anything it cannot parse as IPv4.
    try:
        socket.inet_aton(string_ip)
        return True
    except OSError:
        return False

707 

708 

def is_valid_cidr(string_network):
    """
    Very simple check of the cidr format in no_proxy variable.

    :rtype: bool
    """
    # Exactly one '/' must separate the address from the prefix length.
    if string_network.count("/") != 1:
        return False

    address, _, prefix = string_network.partition("/")
    try:
        mask = int(prefix)
    except ValueError:
        return False
    if not 1 <= mask <= 32:
        return False
    try:
        socket.inet_aton(address)
    except OSError:
        return False
    return True

731 

732 

@contextlib.contextmanager
def set_environ(env_name, value):
    """Set the environment variable 'env_name' to 'value'

    Save previous value, yield, and then restore the previous value stored in
    the environment variable 'env_name'.

    If 'value' is None, do nothing"""
    should_set = value is not None
    previous = None
    if should_set:
        previous = os.environ.get(env_name)
        os.environ[env_name] = value
    try:
        yield
    finally:
        if should_set:
            if previous is None:
                # The variable did not exist before: remove it again.
                del os.environ[env_name]
            else:
                os.environ[env_name] = previous

753 

754 

def should_bypass_proxies(url, no_proxy):
    """
    Returns whether we should bypass proxies or not.

    :param url: URL being requested.
    :param no_proxy: comma-separated host list that overrides the NO_PROXY
        environment variable (may be None).
    :rtype: bool
    """

    # Prioritize lowercase environment variables over uppercase
    # to keep a consistent behaviour with other http projects (curl, wget).
    def get_proxy(key):
        return os.environ.get(key) or os.environ.get(key.upper())

    # First check whether no_proxy is defined. If it is, check that the URL
    # we're getting isn't in the no_proxy list.
    no_proxy_arg = no_proxy
    if no_proxy is None:
        no_proxy = get_proxy("no_proxy")
    parsed = urlparse(url)

    if parsed.hostname is None:
        # URLs don't always have hostnames, e.g. file:/// urls.
        return True

    if no_proxy:
        # We need to check whether we match here. We need to see if we match
        # the end of the hostname, both with and without the port.
        no_proxy = (host for host in no_proxy.replace(" ", "").split(",") if host)

        if is_ipv4_address(parsed.hostname):
            # Literal IPv4 hosts match CIDR blocks or exact IP entries.
            for proxy_ip in no_proxy:
                if is_valid_cidr(proxy_ip):
                    if address_in_network(parsed.hostname, proxy_ip):
                        return True
                elif parsed.hostname == proxy_ip:
                    # If no_proxy ip was defined in plain IP notation instead of cidr notation &
                    # matches the IP of the index
                    return True
        else:
            # Non-IP hosts match by hostname suffix, with or without port.
            host_with_port = parsed.hostname
            if parsed.port:
                host_with_port += f":{parsed.port}"

            for host in no_proxy:
                if parsed.hostname.endswith(host) or host_with_port.endswith(host):
                    # The URL does match something in no_proxy, so we don't want
                    # to apply the proxies on this URL.
                    return True

    with set_environ("no_proxy", no_proxy_arg):
        # parsed.hostname can be `None` in cases such as a file URI.
        try:
            bypass = proxy_bypass(parsed.hostname)
        except (TypeError, socket.gaierror):
            bypass = False

    if bypass:
        return True

    return False

814 

815 

def get_environ_proxies(url, no_proxy=None):
    """
    Return a dict of environment proxies.

    :rtype: dict
    """
    # No proxies at all when the URL is exempted.
    if should_bypass_proxies(url, no_proxy=no_proxy):
        return {}
    return getproxies()

826 

827 

def select_proxy(url, proxies):
    """Select a proxy for the url, if applicable.

    :param url: The url being for the request
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    """
    proxies = proxies or {}
    parts = urlparse(url)
    if parts.hostname is None:
        return proxies.get(parts.scheme, proxies.get("all"))

    # Most specific first: scheme+host, scheme, any-scheme+host, catch-all.
    candidates = (
        parts.scheme + "://" + parts.hostname,
        parts.scheme,
        "all://" + parts.hostname,
        "all",
    )
    for candidate in candidates:
        if candidate in proxies:
            return proxies[candidate]
    return None

852 

853 

def resolve_proxies(request, proxies, trust_env=True):
    """This method takes proxy information from a request and configuration
    input to resolve a mapping of target proxies. This will consider settings
    such as NO_PROXY to strip proxy configurations.

    :param request: Request or PreparedRequest
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    :param trust_env: Boolean declaring whether to trust environment configs

    :rtype: dict
    """
    proxies = proxies if proxies is not None else {}
    target_url = request.url
    scheme = urlparse(target_url).scheme
    no_proxy = proxies.get("no_proxy")
    resolved = proxies.copy()

    if trust_env and not should_bypass_proxies(target_url, no_proxy=no_proxy):
        environ_proxies = get_environ_proxies(target_url, no_proxy=no_proxy)
        fallback = environ_proxies.get(scheme, environ_proxies.get("all"))
        if fallback:
            # Explicit per-request proxies always win over the environment.
            resolved.setdefault(scheme, fallback)
    return resolved

879 

880 

def default_user_agent(name="python-requests"):
    """
    Return a string representing the default user agent.

    :rtype: str
    """
    return "{}/{}".format(name, __version__)

888 

889 

def default_headers():
    """Build the default headers attached to every outgoing request.

    :rtype: requests.structures.CaseInsensitiveDict
    """
    return CaseInsensitiveDict(
        {
            "User-Agent": default_user_agent(),
            "Accept-Encoding": DEFAULT_ACCEPT_ENCODING,
            "Accept": "*/*",
            "Connection": "keep-alive",
        }
    )

902 

903 

def parse_header_links(value):
    """Return a list of parsed link headers proxies.

    i.e. Link: <http:/.../front.jpeg>; rel=front; type="image/jpeg",<http://.../back.jpeg>; rel=back;type="image/jpeg"

    :rtype: list
    """
    strip_chars = " '\""
    header = value.strip(strip_chars)
    if not header:
        return []

    parsed = []
    # Entries are separated by a comma followed by the next '<url>'.
    for entry in re.split(", *<", header):
        if ";" in entry:
            url, params = entry.split(";", 1)
        else:
            url, params = entry, ""

        link = {"url": url.strip("<> '\"")}

        for param in params.split(";"):
            pieces = param.split("=")
            if len(pieces) != 2:
                # No '=' (or more than one) ends this entry's param list.
                break
            key, val = pieces
            link[key.strip(strip_chars)] = val.strip(strip_chars)

        parsed.append(link)
    return parsed

939 

940 

# Null bytes; no need to recreate these on each call to guess_json_utf
_null = "\x00".encode("ascii")  # encoding to ASCII for Python 3
_null2 = _null * 2
_null3 = _null * 3


def guess_json_utf(data):
    """
    :rtype: str
    """
    # JSON always starts with two ASCII characters, so detection is as
    # easy as counting the nulls and from their location and count
    # determine the encoding. Also detect a BOM, if present.
    head = data[:4]
    if head in (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE):
        return "utf-32"  # BOM included
    if head[:3] == codecs.BOM_UTF8:
        return "utf-8-sig"  # BOM included, MS style (discouraged)
    if head[:2] in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE):
        return "utf-16"  # BOM included

    nulls = head.count(_null)
    if nulls == 0:
        return "utf-8"
    if nulls == 2:
        if head[::2] == _null2:  # 1st and 3rd are null
            return "utf-16-be"
        if head[1::2] == _null2:  # 2nd and 4th are null
            return "utf-16-le"
        # Did not detect 2 valid UTF-16 ascii-range characters
    if nulls == 3:
        if head[:3] == _null3:
            return "utf-32-be"
        if head[1:] == _null3:
            return "utf-32-le"
        # Did not detect a valid UTF-32 ascii-range character
    return None

977 

978 

def prepend_scheme_if_needed(url, new_scheme):
    """Given a URL that may or may not have a scheme, prepend the given scheme.
    Does not replace a present scheme with the one provided as an argument.

    :rtype: str
    """
    # NOTE: parse_url here is urllib3's parser, not urllib.parse.
    parsed = parse_url(url)
    scheme, auth, host, port, path, query, fragment = parsed

    # A defect in urlparse determines that there isn't a netloc present in some
    # urls. We previously assumed parsing was overly cautious, and swapped the
    # netloc and path. Due to a lack of tests on the original defect, this is
    # maintained with parse_url for backwards compatibility.
    netloc = parsed.netloc
    if not netloc:
        netloc, path = path, netloc

    if auth:
        # parse_url doesn't provide the netloc with auth
        # so we'll add it ourselves.
        netloc = "@".join([auth, netloc])
    if scheme is None:
        scheme = new_scheme
    if path is None:
        path = ""

    return urlunparse((scheme, netloc, path, "", query, fragment))

1006 

1007 

def get_auth_from_url(url):
    """Given a url with authentication components, extract them into a tuple of
    username,password.

    :rtype: (str,str)
    """
    parts = urlparse(url)
    try:
        return (unquote(parts.username), unquote(parts.password))
    except (AttributeError, TypeError):
        # Missing credentials (None) make unquote blow up: report empties.
        return ("", "")

1022 

1023 

def check_header_validity(header):
    """Verifies that header parts don't contain leading whitespace
    reserved characters, or return characters.

    :param header: tuple, in the format (name, value).
    :raises InvalidHeader: (via _validate_header_part) when either part
        has a bad type or fails validation.
    """
    name, value = header
    # Index 0 selects the name validator, index 1 the value validator.
    _validate_header_part(header, name, 0)
    _validate_header_part(header, value, 1)

1033 

1034 

def _validate_header_part(header, header_part, header_validator_index):
    """Validate one half of a header tuple against the matching regex.

    :param header: the full (name, value) tuple, used in error messages.
    :param header_part: the name or value being validated (str or bytes).
    :param header_validator_index: 0 for the name, 1 for the value.
    :raises InvalidHeader: on a bad type or a failed validation match.
    """
    # str and bytes need separate validator patterns of the same type.
    if isinstance(header_part, str):
        validator = _HEADER_VALIDATORS_STR[header_validator_index]
    elif isinstance(header_part, bytes):
        validator = _HEADER_VALIDATORS_BYTE[header_validator_index]
    else:
        raise InvalidHeader(
            f"Header part ({header_part!r}) from {header} "
            f"must be of type str or bytes, not {type(header_part)}"
        )

    if not validator.match(header_part):
        header_kind = "name" if header_validator_index == 0 else "value"
        raise InvalidHeader(
            f"Invalid leading whitespace, reserved character(s), or return "
            f"character(s) in header {header_kind}: {header_part!r}"
        )

1052 

1053 

def urldefragauth(url):
    """
    Given a url remove the fragment and the authentication part.

    :rtype: str
    """
    parts = urlparse(url)
    netloc, path = parts.netloc, parts.path

    # see func:`prepend_scheme_if_needed` — scheme-less URLs can parse with
    # the host landing in `path`; swap so it ends up in `netloc`.
    if not netloc:
        netloc, path = path, netloc

    # Strip any `user:password@` prefix and drop the fragment entirely.
    bare_netloc = netloc.rsplit("@", 1)[-1]

    return urlunparse((parts.scheme, bare_netloc, path, parts.params, parts.query, ""))

1069 

1070 

def rewind_body(prepared_request):
    """Move file pointer back to its recorded starting position
    so it can be read again on redirect.
    """
    seek = getattr(prepared_request.body, "seek", None)
    # Without a seek method and a recorded integer position, there is
    # nothing we can rewind.
    if seek is None or not isinstance(
        prepared_request._body_position, integer_types
    ):
        raise UnrewindableBodyError("Unable to rewind request body for redirect.")
    try:
        seek(prepared_request._body_position)
    except OSError:
        raise UnrewindableBodyError(
            "An error occurred when rewinding request body for redirect."
        )