1""" 

2requests.utils 

3~~~~~~~~~~~~~~ 

4 

5This module provides utility functions that are used within Requests 

6that are also useful for external consumption. 

7""" 

8 

9import codecs 

10import contextlib 

11import io 

12import os 

13import re 

14import socket 

15import struct 

16import sys 

17import tempfile 

18import warnings 

19import zipfile 

20from collections import OrderedDict 

21 

22from pip._vendor.urllib3.util import make_headers, parse_url 

23 

24from . import certs 

25from .__version__ import __version__ 

26 

27# to_native_string is unused here, but imported here for backwards compatibility 

28from ._internal_utils import ( # noqa: F401 

29 _HEADER_VALIDATORS_BYTE, 

30 _HEADER_VALIDATORS_STR, 

31 HEADER_VALIDATORS, 

32 to_native_string, 

33) 

34from .compat import ( 

35 Mapping, 

36 basestring, 

37 bytes, 

38 getproxies, 

39 getproxies_environment, 

40 integer_types, 

41) 

42from .compat import parse_http_list as _parse_list_header 

43from .compat import ( 

44 proxy_bypass, 

45 proxy_bypass_environment, 

46 quote, 

47 str, 

48 unquote, 

49 urlparse, 

50 urlunparse, 

51) 

52from .cookies import cookiejar_from_dict 

53from .exceptions import ( 

54 FileModeWarning, 

55 InvalidHeader, 

56 InvalidURL, 

57 UnrewindableBodyError, 

58) 

59from .structures import CaseInsensitiveDict 

60 

61NETRC_FILES = (".netrc", "_netrc") 

62 

63DEFAULT_CA_BUNDLE_PATH = certs.where() 

64 

65DEFAULT_PORTS = {"http": 80, "https": 443} 

66 

67# Ensure that ', ' is used to preserve previous delimiter behavior. 

68DEFAULT_ACCEPT_ENCODING = ", ".join( 

69 re.split(r",\s*", make_headers(accept_encoding=True)["accept-encoding"]) 

70) 

71 

72 

73if sys.platform == "win32": 

74 # provide a proxy_bypass version on Windows without DNS lookups 

75 

76 def proxy_bypass_registry(host): 

77 try: 

78 import winreg 

79 except ImportError: 

80 return False 

81 

82 try: 

83 internetSettings = winreg.OpenKey( 

84 winreg.HKEY_CURRENT_USER, 

85 r"Software\Microsoft\Windows\CurrentVersion\Internet Settings", 

86 ) 

87 # ProxyEnable could be REG_SZ or REG_DWORD, normalizing it 

88 proxyEnable = int(winreg.QueryValueEx(internetSettings, "ProxyEnable")[0]) 

89 # ProxyOverride is almost always a string 

90 proxyOverride = winreg.QueryValueEx(internetSettings, "ProxyOverride")[0] 

91 except (OSError, ValueError): 

92 return False 

93 if not proxyEnable or not proxyOverride: 

94 return False 

95 

96 # make a check value list from the registry entry: replace the 

97 # '<local>' string by the localhost entry and the corresponding 

98 # canonical entry. 

99 proxyOverride = proxyOverride.split(";") 

100 # now check if we match one of the registry values. 

101 for test in proxyOverride: 

102 if test == "<local>": 

103 if "." not in host: 

104 return True 

105 test = test.replace(".", r"\.") # mask dots 

106 test = test.replace("*", r".*") # change glob sequence 

107 test = test.replace("?", r".") # change glob char 

108 if re.match(test, host, re.I): 

109 return True 

110 return False 

111 

112 def proxy_bypass(host): # noqa 

113 """Return True, if the host should be bypassed. 

114 

115 Checks proxy settings gathered from the environment, if specified, 

116 or the registry. 

117 """ 

118 if getproxies_environment(): 

119 return proxy_bypass_environment(host) 

120 else: 

121 return proxy_bypass_registry(host) 

122 

123 

124def dict_to_sequence(d): 

125 """Returns an internal sequence dictionary update.""" 

126 

127 if hasattr(d, "items"): 

128 d = d.items() 

129 

130 return d 

131 

132 

133def super_len(o): 

134 total_length = None 

135 current_position = 0 

136 

137 if hasattr(o, "__len__"): 

138 total_length = len(o) 

139 

140 elif hasattr(o, "len"): 

141 total_length = o.len 

142 

143 elif hasattr(o, "fileno"): 

144 try: 

145 fileno = o.fileno() 

146 except (io.UnsupportedOperation, AttributeError): 

147 # AttributeError is a surprising exception, seeing as how we've just checked 

148 # that `hasattr(o, 'fileno')`. It happens for objects obtained via 

149 # `Tarfile.extractfile()`, per issue 5229. 

150 pass 

151 else: 

152 total_length = os.fstat(fileno).st_size 

153 

154 # Having used fstat to determine the file length, we need to 

155 # confirm that this file was opened up in binary mode. 

156 if "b" not in o.mode: 

157 warnings.warn( 

158 ( 

159 "Requests has determined the content-length for this " 

160 "request using the binary size of the file: however, the " 

161 "file has been opened in text mode (i.e. without the 'b' " 

162 "flag in the mode). This may lead to an incorrect " 

163 "content-length. In Requests 3.0, support will be removed " 

164 "for files in text mode." 

165 ), 

166 FileModeWarning, 

167 ) 

168 

169 if hasattr(o, "tell"): 

170 try: 

171 current_position = o.tell() 

172 except OSError: 

173 # This can happen in some weird situations, such as when the file 

174 # is actually a special file descriptor like stdin. In this 

175 # instance, we don't know what the length is, so set it to zero and 

176 # let requests chunk it instead. 

177 if total_length is not None: 

178 current_position = total_length 

179 else: 

180 if hasattr(o, "seek") and total_length is None: 

181 # StringIO and BytesIO have seek but no usable fileno 

182 try: 

183 # seek to end of file 

184 o.seek(0, 2) 

185 total_length = o.tell() 

186 

187 # seek back to current position to support 

188 # partially read file-like objects 

189 o.seek(current_position or 0) 

190 except OSError: 

191 total_length = 0 

192 

193 if total_length is None: 

194 total_length = 0 

195 

196 return max(0, total_length - current_position) 
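# Example (illustrative, assuming a partially read BytesIO): super_len reports
# the bytes remaining past the current position, which is what a Content-Length
# computed mid-stream needs to be.
#
#     >>> import io
#     >>> buf = io.BytesIO(b"hello")
#     >>> _ = buf.read(2)
#     >>> super_len(buf)
#     3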

def get_netrc_auth(url, raise_errors=False):
    """Returns the Requests tuple auth for a given url from netrc."""

    netrc_file = os.environ.get("NETRC")
    if netrc_file is not None:
        netrc_locations = (netrc_file,)
    else:
        netrc_locations = (f"~/{f}" for f in NETRC_FILES)

    try:
        from netrc import NetrcParseError, netrc

        netrc_path = None

        for f in netrc_locations:
            try:
                loc = os.path.expanduser(f)
            except KeyError:
                # os.path.expanduser can fail when $HOME is undefined and
                # getpwuid fails. See https://bugs.python.org/issue20164 &
                # https://github.com/psf/requests/issues/1846
                return

            if os.path.exists(loc):
                netrc_path = loc
                break

        # Abort early if there isn't one.
        if netrc_path is None:
            return

        ri = urlparse(url)

        # Strip port numbers from netloc. This weird `if...encode` dance is
        # used for Python 3.2, which doesn't support unicode literals.
        splitstr = b":"
        if isinstance(url, str):
            splitstr = splitstr.decode("ascii")
        host = ri.netloc.split(splitstr)[0]

        try:
            _netrc = netrc(netrc_path).authenticators(host)
            if _netrc:
                # Return with login / password
                login_i = 0 if _netrc[0] else 1
                return (_netrc[login_i], _netrc[2])
        except (NetrcParseError, OSError):
            # If there was a parsing error or a permissions issue reading the file,
            # we'll just skip netrc auth unless explicitly asked to raise errors.
            if raise_errors:
                raise

    # App Engine hackiness.
    except (ImportError, AttributeError):
        pass


def guess_filename(obj):
    """Tries to guess the filename of the given object."""
    name = getattr(obj, "name", None)
    if name and isinstance(name, basestring) and name[0] != "<" and name[-1] != ">":
        return os.path.basename(name)


def extract_zipped_paths(path):
    """Replace nonexistent paths that look like they refer to a member of a zip
    archive with the location of an extracted copy of the target, or else
    just return the provided path unchanged.
    """
    if os.path.exists(path):
        # this is already a valid path, no need to do anything further
        return path

    # find the first valid part of the provided path and treat that as a zip archive
    # assume the rest of the path is the name of a member in the archive
    archive, member = os.path.split(path)
    while archive and not os.path.exists(archive):
        archive, prefix = os.path.split(archive)
        if not prefix:
            # If we don't check for an empty prefix after the split (i.e. archive
            # remains unchanged after the split), we can end up in an infinite
            # loop on a rare corner case affecting a small number of users.
            break
        member = "/".join([prefix, member])

    if not zipfile.is_zipfile(archive):
        return path

    zip_file = zipfile.ZipFile(archive)
    if member not in zip_file.namelist():
        return path

    # we have a valid zip archive and a valid member of that archive
    tmp = tempfile.gettempdir()
    extracted_path = os.path.join(tmp, member.split("/")[-1])
    if not os.path.exists(extracted_path):
        # use read + write to avoid creating nested folders; we only want the
        # file itself, and this sidesteps the mkdir race condition
        with atomic_open(extracted_path) as file_handler:
            file_handler.write(zip_file.read(member))
    return extracted_path
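# Example (illustrative, with a hypothetical bundle path): a CA-bundle path that
# points inside a zipped distribution is transparently extracted to the temp dir.
#
#     >>> extract_zipped_paths("/opt/app.zip/certs/cacert.pem")  # doctest: +SKIP
#     '/tmp/cacert.pem'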

@contextlib.contextmanager
def atomic_open(filename):
    """Write a file to the disk in an atomic fashion"""
    tmp_descriptor, tmp_name = tempfile.mkstemp(dir=os.path.dirname(filename))
    try:
        with os.fdopen(tmp_descriptor, "wb") as tmp_handler:
            yield tmp_handler
        os.replace(tmp_name, filename)
    except BaseException:
        os.remove(tmp_name)
        raise
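# Example (illustrative): the target file only appears, fully written, once the
# block exits cleanly; on an exception the temporary file is removed instead.
#
#     >>> with atomic_open("/tmp/example.bin") as f:  # doctest: +SKIP
#     ...     f.write(b"payload")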

def from_key_val_list(value):
    """Take an object and test to see if it can be represented as a
    dictionary. If it can be, return an OrderedDict, e.g.,

    ::

        >>> from_key_val_list([('key', 'val')])
        OrderedDict([('key', 'val')])
        >>> from_key_val_list('string')
        Traceback (most recent call last):
        ...
        ValueError: cannot encode objects that are not 2-tuples
        >>> from_key_val_list({'key': 'val'})
        OrderedDict([('key', 'val')])

    :rtype: OrderedDict
    """
    if value is None:
        return None

    if isinstance(value, (str, bytes, bool, int)):
        raise ValueError("cannot encode objects that are not 2-tuples")

    return OrderedDict(value)


def to_key_val_list(value):
    """Take an object and test to see if it can be represented as a
    dictionary. If it can be, return a list of tuples, e.g.,

    ::

        >>> to_key_val_list([('key', 'val')])
        [('key', 'val')]
        >>> to_key_val_list({'key': 'val'})
        [('key', 'val')]
        >>> to_key_val_list('string')
        Traceback (most recent call last):
        ...
        ValueError: cannot encode objects that are not 2-tuples

    :rtype: list
    """
    if value is None:
        return None

    if isinstance(value, (str, bytes, bool, int)):
        raise ValueError("cannot encode objects that are not 2-tuples")

    if isinstance(value, Mapping):
        value = value.items()

    return list(value)


# From mitsuhiko/werkzeug (used with permission).
def parse_list_header(value):
    """Parse lists as described by RFC 2068 Section 2.

    In particular, parse comma-separated lists where the elements of
    the list may include quoted-strings. A quoted-string could
    contain a comma. A non-quoted string could have quotes in the
    middle. Quotes are removed automatically after parsing.

    It basically works like :func:`parse_set_header` just that items
    may appear multiple times and case sensitivity is preserved.

    The return value is a standard :class:`list`:

    >>> parse_list_header('token, "quoted value"')
    ['token', 'quoted value']

    To create a header from the :class:`list` again, use the
    :func:`dump_header` function.

    :param value: a string with a list header.
    :return: :class:`list`
    :rtype: list
    """
    result = []
    for item in _parse_list_header(value):
        if item[:1] == item[-1:] == '"':
            item = unquote_header_value(item[1:-1])
        result.append(item)
    return result


# From mitsuhiko/werkzeug (used with permission).
def parse_dict_header(value):
    """Parse lists of key, value pairs as described by RFC 2068 Section 2 and
    convert them into a python dict:

    >>> d = parse_dict_header('foo="is a fish", bar="as well"')
    >>> type(d) is dict
    True
    >>> sorted(d.items())
    [('bar', 'as well'), ('foo', 'is a fish')]

    If there is no value for a key it will be `None`:

    >>> parse_dict_header('key_without_value')
    {'key_without_value': None}

    To create a header from the :class:`dict` again, use the
    :func:`dump_header` function.

    :param value: a string with a dict header.
    :return: :class:`dict`
    :rtype: dict
    """
    result = {}
    for item in _parse_list_header(value):
        if "=" not in item:
            result[item] = None
            continue
        name, value = item.split("=", 1)
        if value[:1] == value[-1:] == '"':
            value = unquote_header_value(value[1:-1])
        result[name] = value
    return result


# From mitsuhiko/werkzeug (used with permission).
def unquote_header_value(value, is_filename=False):
    r"""Unquotes a header value. (Reversal of :func:`quote_header_value`).
    This does not use the real unquoting but what browsers are actually
    using for quoting.

    :param value: the header value to unquote.
    :rtype: str
    """
    if value and value[0] == value[-1] == '"':
        # this is not the real unquoting, but fixing this so that the
        # RFC is met will result in bugs with internet explorer and
        # probably some other browsers as well. IE for example is
        # uploading files with "C:\foo\bar.txt" as filename
        value = value[1:-1]

        # if this is a filename and the starting characters look like
        # a UNC path, then just return the value without quotes. Using the
        # replace sequence below on a UNC path has the effect of turning
        # the leading double slash into a single slash and then
        # _fix_ie_filename() doesn't work correctly. See #458.
        if not is_filename or value[:2] != "\\\\":
            return value.replace("\\\\", "\\").replace('\\"', '"')
    return value


def dict_from_cookiejar(cj):
    """Returns a key/value dictionary from a CookieJar.

    :param cj: CookieJar object to extract cookies from.
    :rtype: dict
    """

    cookie_dict = {}

    for cookie in cj:
        cookie_dict[cookie.name] = cookie.value

    return cookie_dict


def add_dict_to_cookiejar(cj, cookie_dict):
    """Returns a CookieJar from a key/value dictionary.

    :param cj: CookieJar to insert cookies into.
    :param cookie_dict: Dict of key/values to insert into CookieJar.
    :rtype: CookieJar
    """

    return cookiejar_from_dict(cookie_dict, cj)


def get_encodings_from_content(content):
    """Returns encodings from given content string.

    :param content: bytestring to extract encodings from.
    """
    warnings.warn(
        (
            "In requests 3.0, get_encodings_from_content will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )

    charset_re = re.compile(r'<meta.*?charset=["\']*(.+?)["\'>]', flags=re.I)
    pragma_re = re.compile(r'<meta.*?content=["\']*;?charset=(.+?)["\'>]', flags=re.I)
    xml_re = re.compile(r'^<\?xml.*?encoding=["\']*(.+?)["\'>]')

    return (
        charset_re.findall(content)
        + pragma_re.findall(content)
        + xml_re.findall(content)
    )


def _parse_content_type_header(header):
    """Returns content type and parameters from given header

    :param header: string
    :return: tuple containing content type and dictionary of
        parameters
    """

    tokens = header.split(";")
    content_type, params = tokens[0].strip(), tokens[1:]
    params_dict = {}
    items_to_strip = "\"' "

    for param in params:
        param = param.strip()
        if param:
            key, value = param, True
            index_of_equals = param.find("=")
            if index_of_equals != -1:
                key = param[:index_of_equals].strip(items_to_strip)
                value = param[index_of_equals + 1 :].strip(items_to_strip)
            params_dict[key.lower()] = value
    return content_type, params_dict


def get_encoding_from_headers(headers):
    """Returns encodings from given HTTP Header Dict.

    :param headers: dictionary to extract encoding from.
    :rtype: str
    """

    content_type = headers.get("content-type")

    if not content_type:
        return None

    content_type, params = _parse_content_type_header(content_type)

    if "charset" in params:
        return params["charset"].strip("'\"")

    if "text" in content_type:
        return "ISO-8859-1"

    if "application/json" in content_type:
        # Assume UTF-8 based on RFC 4627: https://www.ietf.org/rfc/rfc4627.txt
        # since the charset was unset
        return "utf-8"

def stream_decode_response_unicode(iterator, r):
    """Stream decodes an iterator."""

    if r.encoding is None:
        yield from iterator
        return

    decoder = codecs.getincrementaldecoder(r.encoding)(errors="replace")
    for chunk in iterator:
        rv = decoder.decode(chunk)
        if rv:
            yield rv
    rv = decoder.decode(b"", final=True)
    if rv:
        yield rv


def iter_slices(string, slice_length):
    """Iterate over slices of a string."""
    pos = 0
    if slice_length is None or slice_length <= 0:
        slice_length = len(string)
    while pos < len(string):
        yield string[pos : pos + slice_length]
        pos += slice_length
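# Example (illustrative): a None or non-positive slice_length yields the whole
# string in one chunk; otherwise fixed-size slices are produced.
#
#     >>> list(iter_slices("abcdef", 4))
#     ['abcd', 'ef']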

def get_unicode_from_response(r):
    """Returns the requested content back in unicode.

    :param r: Response object to get unicode content from.

    Tried:

    1. charset from content-type
    2. fall back and replace all unicode characters

    :rtype: str
    """
    warnings.warn(
        (
            "In requests 3.0, get_unicode_from_response will be removed. For "
            "more information, please see the discussion on issue #2266. (This"
            " warning should only appear once.)"
        ),
        DeprecationWarning,
    )

    tried_encodings = []

    # Try charset from content-type
    encoding = get_encoding_from_headers(r.headers)

    if encoding:
        try:
            return str(r.content, encoding)
        except UnicodeError:
            tried_encodings.append(encoding)

    # Fall back:
    try:
        return str(r.content, encoding, errors="replace")
    except TypeError:
        return r.content


# The unreserved URI characters (RFC 3986)
UNRESERVED_SET = frozenset(
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + "0123456789-._~"
)


def unquote_unreserved(uri):
    """Un-escape any percent-escape sequences in a URI that are unreserved
    characters. This leaves all reserved, illegal and non-ASCII bytes encoded.

    :rtype: str
    """
    parts = uri.split("%")
    for i in range(1, len(parts)):
        h = parts[i][0:2]
        if len(h) == 2 and h.isalnum():
            try:
                c = chr(int(h, 16))
            except ValueError:
                raise InvalidURL(f"Invalid percent-escape sequence: '{h}'")

            if c in UNRESERVED_SET:
                parts[i] = c + parts[i][2:]
            else:
                parts[i] = f"%{parts[i]}"
        else:
            parts[i] = f"%{parts[i]}"
    return "".join(parts)


def requote_uri(uri):
    """Re-quote the given URI.

    This function passes the given URI through an unquote/quote cycle to
    ensure that it is fully and consistently quoted.

    :rtype: str
    """
    safe_with_percent = "!#$%&'()*+,/:;=?@[]~"
    safe_without_percent = "!#$&'()*+,/:;=?@[]~"
    try:
        # Unquote only the unreserved characters
        # Then quote only illegal characters (do not quote reserved,
        # unreserved, or '%')
        return quote(unquote_unreserved(uri), safe=safe_with_percent)
    except InvalidURL:
        # We couldn't unquote the given URI, so let's try quoting it, but
        # there may be unquoted '%'s in the URI. We need to make sure they're
        # properly quoted so they do not cause issues elsewhere.
        return quote(uri, safe=safe_without_percent)
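# Example (illustrative): needlessly escaped unreserved characters are unescaped,
# while genuinely illegal characters such as spaces get (re-)quoted.
#
#     >>> requote_uri("http://example.com/%7Euser/a b")
#     'http://example.com/~user/a%20b'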

def address_in_network(ip, net):
    """This function allows you to check if an IP belongs to a network subnet

    Example: returns True if ip = 192.168.1.1 and net = 192.168.1.0/24
             returns False if ip = 192.168.1.1 and net = 192.168.100.0/24

    :rtype: bool
    """
    ipaddr = struct.unpack("=L", socket.inet_aton(ip))[0]
    netaddr, bits = net.split("/")
    netmask = struct.unpack("=L", socket.inet_aton(dotted_netmask(int(bits))))[0]
    network = struct.unpack("=L", socket.inet_aton(netaddr))[0] & netmask
    return (ipaddr & netmask) == (network & netmask)


def dotted_netmask(mask):
    """Converts mask from /xx format to xxx.xxx.xxx.xxx

    Example: if mask is 24 function returns 255.255.255.0

    :rtype: str
    """
    bits = 0xFFFFFFFF ^ (1 << 32 - mask) - 1
    return socket.inet_ntoa(struct.pack(">I", bits))


def is_ipv4_address(string_ip):
    """
    :rtype: bool
    """
    try:
        socket.inet_aton(string_ip)
    except OSError:
        return False
    return True


def is_valid_cidr(string_network):
    """
    Very simple check of the cidr format in no_proxy variable.

    :rtype: bool
    """
    if string_network.count("/") == 1:
        try:
            mask = int(string_network.split("/")[1])
        except ValueError:
            return False

        if mask < 1 or mask > 32:
            return False

        try:
            socket.inet_aton(string_network.split("/")[0])
        except OSError:
            return False
    else:
        return False
    return True


@contextlib.contextmanager
def set_environ(env_name, value):
    """Set the environment variable 'env_name' to 'value'

    Save previous value, yield, and then restore the previous value stored in
    the environment variable 'env_name'.

    If 'value' is None, do nothing"""
    value_changed = value is not None
    if value_changed:
        old_value = os.environ.get(env_name)
        os.environ[env_name] = value
    try:
        yield
    finally:
        if value_changed:
            if old_value is None:
                del os.environ[env_name]
            else:
                os.environ[env_name] = old_value
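# Example (illustrative, with a hypothetical variable name): the variable is set
# only inside the with-block, and the prior state, including "unset", is
# restored afterwards.
#
#     >>> with set_environ("NO_PROXY_DEMO", "localhost"):
#     ...     assert os.environ["NO_PROXY_DEMO"] == "localhost"
#     >>> "NO_PROXY_DEMO" in os.environ
#     False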

def should_bypass_proxies(url, no_proxy):
    """
    Returns whether we should bypass proxies or not.

    :rtype: bool
    """
    # Prioritize lowercase environment variables over uppercase
    # to keep a consistent behaviour with other http projects (curl, wget).
    def get_proxy(key):
        return os.environ.get(key) or os.environ.get(key.upper())

    # First check whether no_proxy is defined. If it is, check that the URL
    # we're getting isn't in the no_proxy list.
    no_proxy_arg = no_proxy
    if no_proxy is None:
        no_proxy = get_proxy("no_proxy")
    parsed = urlparse(url)

    if parsed.hostname is None:
        # URLs don't always have hostnames, e.g. file:/// urls.
        return True

    if no_proxy:
        # We need to check whether we match here. We need to see if we match
        # the end of the hostname, both with and without the port.
        no_proxy = (host for host in no_proxy.replace(" ", "").split(",") if host)

        if is_ipv4_address(parsed.hostname):
            for proxy_ip in no_proxy:
                if is_valid_cidr(proxy_ip):
                    if address_in_network(parsed.hostname, proxy_ip):
                        return True
                elif parsed.hostname == proxy_ip:
                    # The no_proxy entry was defined in plain IP notation
                    # instead of cidr notation and matches the URL's host IP
                    return True
        else:
            host_with_port = parsed.hostname
            if parsed.port:
                host_with_port += f":{parsed.port}"

            for host in no_proxy:
                if parsed.hostname.endswith(host) or host_with_port.endswith(host):
                    # The URL does match something in no_proxy, so we don't want
                    # to apply the proxies on this URL.
                    return True

    with set_environ("no_proxy", no_proxy_arg):
        # parsed.hostname can be `None` in cases such as a file URI.
        try:
            bypass = proxy_bypass(parsed.hostname)
        except (TypeError, socket.gaierror):
            bypass = False

    if bypass:
        return True

    return False
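# Example (illustrative): no_proxy entries match by hostname suffix (with or
# without port) for names, and by exact IP or CIDR block for addresses.
#
#     >>> should_bypass_proxies("http://localhost:8080/", no_proxy="localhost,.internal")
#     True
#     >>> should_bypass_proxies("http://10.0.0.5/", no_proxy="10.0.0.0/8")
#     True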

def get_environ_proxies(url, no_proxy=None):
    """
    Return a dict of environment proxies.

    :rtype: dict
    """
    if should_bypass_proxies(url, no_proxy=no_proxy):
        return {}
    else:
        return getproxies()


def select_proxy(url, proxies):
    """Select a proxy for the url, if applicable.

    :param url: The url of the request
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    """
    proxies = proxies or {}
    urlparts = urlparse(url)
    if urlparts.hostname is None:
        return proxies.get(urlparts.scheme, proxies.get("all"))

    proxy_keys = [
        urlparts.scheme + "://" + urlparts.hostname,
        urlparts.scheme,
        "all://" + urlparts.hostname,
        "all",
    ]
    proxy = None
    for proxy_key in proxy_keys:
        if proxy_key in proxies:
            proxy = proxies[proxy_key]
            break

    return proxy
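# Example (illustrative, with hypothetical proxy URLs): the most specific key,
# scheme://host, wins over a bare scheme key.
#
#     >>> proxies = {"http": "http://proxy:3128", "http://example.com": "http://special:3128"}
#     >>> select_proxy("http://example.com/path", proxies)
#     'http://special:3128'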

def resolve_proxies(request, proxies, trust_env=True):
    """This method takes proxy information from a request and configuration
    input to resolve a mapping of target proxies. This will consider settings
    such as NO_PROXY to strip proxy configurations.

    :param request: Request or PreparedRequest
    :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
    :param trust_env: Boolean declaring whether to trust environment configs

    :rtype: dict
    """
    proxies = proxies if proxies is not None else {}
    url = request.url
    scheme = urlparse(url).scheme
    no_proxy = proxies.get("no_proxy")
    new_proxies = proxies.copy()

    if trust_env and not should_bypass_proxies(url, no_proxy=no_proxy):
        environ_proxies = get_environ_proxies(url, no_proxy=no_proxy)

        proxy = environ_proxies.get(scheme, environ_proxies.get("all"))

        if proxy:
            new_proxies.setdefault(scheme, proxy)
    return new_proxies


def default_user_agent(name="python-requests"):
    """
    Return a string representing the default user agent.

    :rtype: str
    """
    return f"{name}/{__version__}"


def default_headers():
    """
    :rtype: requests.structures.CaseInsensitiveDict
    """
    return CaseInsensitiveDict(
        {
            "User-Agent": default_user_agent(),
            "Accept-Encoding": DEFAULT_ACCEPT_ENCODING,
            "Accept": "*/*",
            "Connection": "keep-alive",
        }
    )


def parse_header_links(value):
    """Return a list of parsed link headers, e.g.

    Link: <http:/.../front.jpeg>; rel=front; type="image/jpeg",<http://.../back.jpeg>; rel=back;type="image/jpeg"

    :rtype: list
    """

    links = []

    replace_chars = " '\""

    value = value.strip(replace_chars)
    if not value:
        return links

    for val in re.split(", *<", value):
        try:
            url, params = val.split(";", 1)
        except ValueError:
            url, params = val, ""

        link = {"url": url.strip("<> '\"")}

        for param in params.split(";"):
            try:
                key, value = param.split("=")
            except ValueError:
                break

            link[key.strip(replace_chars)] = value.strip(replace_chars)

        links.append(link)

    return links
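# Example (illustrative): each comma-separated link becomes a dict with its url
# and any parameters.
#
#     >>> parse_header_links('<http://example.com/page2>; rel="next"')
#     [{'url': 'http://example.com/page2', 'rel': 'next'}]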

# Null bytes; no need to recreate these on each call to guess_json_utf
_null = "\x00".encode("ascii")  # encoding to ASCII for Python 3
_null2 = _null * 2
_null3 = _null * 3


def guess_json_utf(data):
    """
    :rtype: str
    """
    # JSON always starts with two ASCII characters, so detection is as
    # easy as counting the nulls and from their location and count
    # determine the encoding. Also detect a BOM, if present.
    sample = data[:4]
    if sample in (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE):
        return "utf-32"  # BOM included
    if sample[:3] == codecs.BOM_UTF8:
        return "utf-8-sig"  # BOM included, MS style (discouraged)
    if sample[:2] in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE):
        return "utf-16"  # BOM included
    nullcount = sample.count(_null)
    if nullcount == 0:
        return "utf-8"
    if nullcount == 2:
        if sample[::2] == _null2:  # 1st and 3rd are null
            return "utf-16-be"
        if sample[1::2] == _null2:  # 2nd and 4th are null
            return "utf-16-le"
        # Did not detect 2 valid UTF-16 ascii-range characters
    if nullcount == 3:
        if sample[:3] == _null3:
            return "utf-32-be"
        if sample[1:] == _null3:
            return "utf-32-le"
        # Did not detect a valid UTF-32 ascii-range character
    return None
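# Example (illustrative): the null-byte pattern in the first four bytes reveals
# the UTF flavor of a BOM-less JSON document.
#
#     >>> guess_json_utf('{"a": 1}'.encode("utf-16-le"))
#     'utf-16-le'
#     >>> guess_json_utf(b'{"a": 1}')
#     'utf-8'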

def prepend_scheme_if_needed(url, new_scheme):
    """Given a URL that may or may not have a scheme, prepend the given scheme.
    Does not replace a present scheme with the one provided as an argument.

    :rtype: str
    """
    parsed = parse_url(url)
    scheme, auth, host, port, path, query, fragment = parsed

    # A defect in urlparse determines that there isn't a netloc present in some
    # urls. We previously assumed parsing was overly cautious, and swapped the
    # netloc and path. Due to a lack of tests on the original defect, this is
    # maintained with parse_url for backwards compatibility.
    netloc = parsed.netloc
    if not netloc:
        netloc, path = path, netloc

    if auth:
        # parse_url doesn't provide the netloc with auth
        # so we'll add it ourselves.
        netloc = "@".join([auth, netloc])
    if scheme is None:
        scheme = new_scheme
    if path is None:
        path = ""

    return urlunparse((scheme, netloc, path, "", query, fragment))
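# Example (illustrative): a scheme-less proxy URL gains the given scheme, while
# an explicit scheme is left alone.
#
#     >>> prepend_scheme_if_needed("example.com", "http")
#     'http://example.com'
#     >>> prepend_scheme_if_needed("https://example.com", "http")
#     'https://example.com'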

def get_auth_from_url(url):
    """Given a url with authentication components, extract them into a tuple of
    username,password.

    :rtype: (str,str)
    """
    parsed = urlparse(url)

    try:
        auth = (unquote(parsed.username), unquote(parsed.password))
    except (AttributeError, TypeError):
        auth = ("", "")

    return auth
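# Example (illustrative): percent-encoded credentials are decoded; a URL with no
# auth component yields a pair of empty strings.
#
#     >>> get_auth_from_url("http://user:p%40ss@example.com/")
#     ('user', 'p@ss')
#     >>> get_auth_from_url("http://example.com/")
#     ('', '')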

def check_header_validity(header):
    """Verifies that header parts don't contain leading whitespace,
    reserved characters, or return characters.

    :param header: tuple, in the format (name, value).
    """
    name, value = header
    _validate_header_part(header, name, 0)
    _validate_header_part(header, value, 1)


def _validate_header_part(header, header_part, header_validator_index):
    if isinstance(header_part, str):
        validator = _HEADER_VALIDATORS_STR[header_validator_index]
    elif isinstance(header_part, bytes):
        validator = _HEADER_VALIDATORS_BYTE[header_validator_index]
    else:
        raise InvalidHeader(
            f"Header part ({header_part!r}) from {header} "
            f"must be of type str or bytes, not {type(header_part)}"
        )

    if not validator.match(header_part):
        header_kind = "name" if header_validator_index == 0 else "value"
        raise InvalidHeader(
            f"Invalid leading whitespace, reserved character(s), or return "
            f"character(s) in header {header_kind}: {header_part!r}"
        )


def urldefragauth(url):
    """
    Given a url remove the fragment and the authentication part.

    :rtype: str
    """
    scheme, netloc, path, params, query, fragment = urlparse(url)

    # see func:`prepend_scheme_if_needed`
    if not netloc:
        netloc, path = path, netloc

    netloc = netloc.rsplit("@", 1)[-1]

    return urlunparse((scheme, netloc, path, params, query, ""))
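# Example (illustrative): both the userinfo and the fragment are stripped while
# the rest of the URL is preserved.
#
#     >>> urldefragauth("http://user:pass@example.com/path?q=1#frag")
#     'http://example.com/path?q=1'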

def rewind_body(prepared_request):
    """Move file pointer back to its recorded starting position
    so it can be read again on redirect.
    """
    body_seek = getattr(prepared_request.body, "seek", None)
    if body_seek is not None and isinstance(
        prepared_request._body_position, integer_types
    ):
        try:
            body_seek(prepared_request._body_position)
        except OSError:
            raise UnrewindableBodyError(
                "An error occurred when rewinding request body for redirect."
            )
    else:
        raise UnrewindableBodyError("Unable to rewind request body for redirect.")