Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_utils.py: 29%

259 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:12 +0000

1import codecs 

2import email.message 

3import logging 

4import mimetypes 

5import netrc 

6import os 

7import re 

8import sys 

9import time 

10import typing 

11from pathlib import Path 

12from urllib.request import getproxies 

13 

14import sniffio 

15 

16from ._types import PrimitiveData 

17 

18if typing.TYPE_CHECKING: # pragma: no cover 

19 from ._urls import URL 

20 

21 

# HTML5 multipart form escaping table: double-quote and backslash, plus every
# C0 control character except ESC (0x1B), get a replacement sequence.
_HTML5_FORM_ENCODING_REPLACEMENTS = {'"': "%22", "\\": "\\\\"}
_HTML5_FORM_ENCODING_REPLACEMENTS.update(
    {chr(c): "%{:02X}".format(c) for c in range(0x1F + 1) if c != 0x1B}
)
# Single alternation regex matching any character that needs replacing above.
_HTML5_FORM_ENCODING_RE = re.compile(
    r"|".join([re.escape(c) for c in _HTML5_FORM_ENCODING_REPLACEMENTS.keys()])
)

29 

30 

def normalize_header_key(
    value: typing.Union[str, bytes],
    lower: bool,
    encoding: typing.Optional[str] = None,
) -> bytes:
    """
    Coerce str/bytes into a strictly byte-wise HTTP header key.
    """
    # Text values are encoded with the given codec, defaulting to ASCII.
    if isinstance(value, str):
        raw = value.encode(encoding or "ascii")
    else:
        raw = value

    if lower:
        return raw.lower()
    return raw

45 

46 

def normalize_header_value(
    value: typing.Union[str, bytes], encoding: typing.Optional[str] = None
) -> bytes:
    """
    Coerce str/bytes into a strictly byte-wise HTTP header value.
    """
    # Bytes pass through untouched; text is encoded (ASCII by default).
    return value if isinstance(value, bytes) else value.encode(encoding or "ascii")

56 

57 

def primitive_value_to_str(value: "PrimitiveData") -> str:
    """
    Coerce a primitive data type into a string value.

    Note that we prefer JSON-style 'true'/'false' for boolean values here.
    """
    # Identity checks (not ==) so that 1/0 are not treated as True/False.
    for special, text in ((True, "true"), (False, "false"), (None, "")):
        if value is special:
            return text
    return str(value)

71 

72 

def is_known_encoding(encoding: str) -> bool:
    """
    Return `True` if `encoding` is a known codec.
    """
    # `codecs.lookup` raises LookupError for unknown codec names.
    try:
        codecs.lookup(encoding)
        return True
    except LookupError:
        return False

82 

83 

def format_form_param(name: str, value: str) -> bytes:
    """
    Encode a name/value pair within a multipart form.
    """
    # Substitute each character requiring HTML5 form escaping.
    escaped = _HTML5_FORM_ENCODING_RE.sub(
        lambda match: _HTML5_FORM_ENCODING_REPLACEMENTS[match.group(0)], value
    )
    return f'{name}="{escaped}"'.encode()

94 

95 

# Null bytes; no need to recreate these on each call to guess_json_utf.
# Used to locate the zero padding that distinguishes UTF-16/UTF-32 variants.
_null = b"\x00"
_null2 = _null * 2
_null3 = _null * 3

100 

101 

def guess_json_utf(data: bytes) -> typing.Optional[str]:
    """
    Infer the UTF encoding of a JSON document from its first four bytes.

    JSON always starts with two ASCII characters, so the position and count
    of null bytes (plus any BOM) identify the encoding. Returns None when
    no valid UTF variant can be determined.
    """
    sample = data[:4]

    # BOM-based detection first.
    if sample in (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE):
        return "utf-32"  # BOM included
    if sample.startswith(codecs.BOM_UTF8):
        return "utf-8-sig"  # BOM included, MS style (discouraged)
    if sample[:2] in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE):
        return "utf-16"  # BOM included

    # No BOM: classify by the number and position of null bytes.
    nullcount = sample.count(_null)
    if nullcount == 0:
        return "utf-8"
    if nullcount == 2:
        # 1st and 3rd bytes null -> big-endian; 2nd and 4th -> little-endian.
        if sample[::2] == _null2:
            return "utf-16-be"
        if sample[1::2] == _null2:
            return "utf-16-le"
        # Did not detect 2 valid UTF-16 ascii-range characters
    if nullcount == 3:
        if sample[:3] == _null3:
            return "utf-32-be"
        if sample[1:] == _null3:
            return "utf-32-le"
        # Did not detect a valid UTF-32 ascii-range character
    return None

129 

130 

class NetRCInfo:
    """
    Lazily load authentication credentials from netrc files.
    """

    def __init__(self, files: typing.Optional[typing.List[str]] = None) -> None:
        # Default search order: $NETRC, then the conventional home locations.
        if files is None:
            files = [os.getenv("NETRC", ""), "~/.netrc", "~/_netrc"]
        self.netrc_files = files

    @property
    def netrc_info(self) -> typing.Optional[netrc.netrc]:
        # Parse at most once; the result (possibly None) is cached on self.
        if not hasattr(self, "_netrc_info"):
            self._netrc_info = None
            for candidate in self.netrc_files:
                path = Path(candidate).expanduser()
                try:
                    if path.is_file():
                        self._netrc_info = netrc.netrc(str(path))
                        break
                except (netrc.NetrcParseError, IOError):  # pragma: no cover
                    # Issue while reading the netrc file, ignore...
                    pass
        return self._netrc_info

    def get_credentials(self, host: str) -> typing.Optional[typing.Tuple[str, str]]:
        """Return a (username, password) pair for `host`, or None."""
        info = self.netrc_info
        if info is None:
            return None

        auth = info.authenticators(host)
        # authenticators() may return None, or a triple with a None password.
        if auth is None or auth[2] is None:
            return None
        return (auth[0], auth[2])

160 

161 

def get_ca_bundle_from_env() -> typing.Optional[str]:
    """
    Return the CA bundle location configured via the environment, if any.

    SSL_CERT_FILE (an existing file) takes precedence over SSL_CERT_DIR
    (an existing directory); otherwise returns None.
    """
    cert_file = os.environ.get("SSL_CERT_FILE")
    if cert_file is not None:
        candidate = Path(cert_file)
        if candidate.is_file():
            return str(candidate)

    cert_dir = os.environ.get("SSL_CERT_DIR")
    if cert_dir is not None:
        candidate = Path(cert_dir)
        if candidate.is_dir():
            return str(candidate)

    return None

172 

173 

def parse_header_links(value: str) -> typing.List[typing.Dict[str, str]]:
    """
    Returns a list of parsed link headers, for more info see:
    https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Link
    The generic syntax of those is:
    Link: < uri-reference >; param1=value1; param2="value2"
    So for instance:
    Link; '<http:/.../front.jpeg>; type="image/jpeg",<http://.../back.jpeg>;'
    would return
    [
        {"url": "http:/.../front.jpeg", "type": "image/jpeg"},
        {"url": "http://.../back.jpeg"},
    ]
    :param value: HTTP Link entity-header field
    :return: list of parsed link headers
    """
    links: typing.List[typing.Dict[str, str]] = []
    strip_chars = " '\""

    remainder = value.strip(strip_chars)
    if not remainder:
        return links

    # Each comma-separated entry starts with a "<uri>" reference.
    for segment in re.split(", *<", remainder):
        # Split the URI part from its trailing ";"-separated parameters.
        url, _, params = segment.partition(";")
        link = {"url": url.strip("<> '\"")}
        for param in params.split(";"):
            try:
                key, val = param.split("=")
            except ValueError:
                # Parameter without a (single) "=": stop parsing this entry.
                break
            link[key.strip(strip_chars)] = val.strip(strip_chars)
        links.append(link)

    return links

209 

210 

def parse_content_type_charset(content_type: str) -> typing.Optional[str]:
    """
    Extract the charset parameter from a Content-Type header, if present.
    """
    # We used to use `cgi.parse_header()` here, but `cgi` became a dead battery.
    # See: https://peps.python.org/pep-0594/#cgi
    message = email.message.Message()
    message["content-type"] = content_type
    charset = message.get_content_charset(failobj=None)
    return charset

217 

218 

# Lowercase header names whose values must be redacted when logged.
SENSITIVE_HEADERS = {"authorization", "proxy-authorization"}

220 

221 

def obfuscate_sensitive_headers(
    items: typing.Iterable[typing.Tuple[typing.AnyStr, typing.AnyStr]]
) -> typing.Iterator[typing.Tuple[typing.AnyStr, typing.AnyStr]]:
    """
    Yield (key, value) pairs with credential-bearing values replaced.

    Keys are matched case-insensitively against SENSITIVE_HEADERS; the
    "[secure]" placeholder is coerced to match the original value's type.
    """
    for key, value in items:
        if to_str(key.lower()) in SENSITIVE_HEADERS:
            yield key, to_bytes_or_str("[secure]", match_type_of=value)
        else:
            yield key, value

229 

230 

# Guards the one-time logging setup performed by `get_logger`.
_LOGGER_INITIALIZED = False
# Custom log level below DEBUG (10), used for fine-grained trace output.
TRACE_LOG_LEVEL = 5

233 

234 

class Logger(logging.Logger):
    # Stub for type checkers. The real `trace` method is attached
    # dynamically to logger instances inside `get_logger()`.
    def trace(self, message: str, *args: typing.Any, **kwargs: typing.Any) -> None:
        ...  # pragma: no cover

239 

240 

def get_logger(name: str) -> Logger:
    """
    Get a `logging.Logger` instance, and optionally
    set up debug logging based on the HTTPX_LOG_LEVEL environment variable.
    """
    global _LOGGER_INITIALIZED

    # One-time process-wide setup, triggered by the first call.
    if not _LOGGER_INITIALIZED:
        _LOGGER_INITIALIZED = True
        logging.addLevelName(TRACE_LOG_LEVEL, "TRACE")

        log_level = os.environ.get("HTTPX_LOG_LEVEL", "").upper()
        if log_level in ("DEBUG", "TRACE"):
            base_logger = logging.getLogger("httpx")
            if log_level == "DEBUG":
                base_logger.setLevel(logging.DEBUG)
            else:
                base_logger.setLevel(TRACE_LOG_LEVEL)
            handler = logging.StreamHandler(sys.stderr)
            formatter = logging.Formatter(
                fmt="%(levelname)s [%(asctime)s] %(name)s - %(message)s",
                datefmt="%Y-%m-%d %H:%M:%S",
            )
            handler.setFormatter(formatter)
            base_logger.addHandler(handler)

    logger = logging.getLogger(name)

    # Attach a `trace()` convenience method that logs at TRACE_LOG_LEVEL.
    def trace(message: str, *args: typing.Any, **kwargs: typing.Any) -> None:
        logger.log(TRACE_LOG_LEVEL, message, *args, **kwargs)

    logger.trace = trace  # type: ignore

    return typing.cast(Logger, logger)

273 

274 

def port_or_default(url: "URL") -> typing.Optional[int]:
    """Return the URL's explicit port, or the scheme's default (80/443)."""
    if url.port is None:
        return {"http": 80, "https": 443}.get(url.scheme)
    return url.port

279 

280 

def same_origin(url: "URL", other: "URL") -> bool:
    """
    Return 'True' if the given URLs share the same origin.
    """
    # An origin is the (scheme, host, effective port) triple.
    if url.scheme != other.scheme:
        return False
    if url.host != other.host:
        return False
    return port_or_default(url) == port_or_default(other)

290 

291 

def is_https_redirect(url: "URL", location: "URL") -> bool:
    """
    Return 'True' if 'location' is a HTTPS upgrade of 'url'
    """
    # Only same-host redirects from default-port http to default-port https
    # count as an upgrade.
    if url.host != location.host:
        return False

    from_default_http = url.scheme == "http" and port_or_default(url) == 80
    to_default_https = location.scheme == "https" and port_or_default(location) == 443
    return from_default_http and to_default_https

305 

306 

def get_environment_proxies() -> typing.Dict[str, typing.Optional[str]]:
    """Gets proxy information from the environment"""

    # urllib.request.getproxies() falls back on System
    # Registry and Config for proxies on Windows and macOS.
    # We don't want to propagate non-HTTP proxies into
    # our configuration such as 'TRAVIS_APT_PROXY'.
    proxy_info = getproxies()
    mounts: typing.Dict[str, typing.Optional[str]] = {}

    for scheme in ("http", "https", "all"):
        proxy_url = proxy_info.get(scheme)
        if not proxy_url:
            continue
        # Bare hostnames are treated as plain HTTP proxies.
        if "://" not in proxy_url:
            proxy_url = f"http://{proxy_url}"
        mounts[f"{scheme}://"] = proxy_url

    for entry in proxy_info.get("no", "").split(","):
        # See https://curl.haxx.se/libcurl/c/CURLOPT_NOPROXY.html for details
        # on how names in `NO_PROXY` are handled.
        hostname = entry.strip()
        if hostname == "*":
            # If NO_PROXY=* is used or if "*" occurs as any one of the comma
            # separated hostnames, then we should just bypass any information
            # from HTTP_PROXY, HTTPS_PROXY, ALL_PROXY, and always ignore
            # proxies.
            return {}
        if hostname:
            # NO_PROXY=.google.com is marked as "all://*.google.com,
            # which disables "www.google.com" but not "google.com"
            # NO_PROXY=google.com is marked as "all://*google.com,
            # which disables "www.google.com" and "google.com".
            # (But not "wwwgoogle.com")
            mounts[f"all://*{hostname}"] = None

    return mounts

343 

344 

def to_bytes(value: typing.Union[str, bytes], encoding: str = "utf-8") -> bytes:
    """Coerce str to bytes using `encoding`; bytes pass through unchanged."""
    if isinstance(value, str):
        return value.encode(encoding)
    return value

347 

348 

def to_str(value: typing.Union[str, bytes], encoding: str = "utf-8") -> str:
    """Coerce bytes to str using `encoding`; str passes through unchanged."""
    if isinstance(value, bytes):
        return value.decode(encoding)
    return value

351 

352 

def to_bytes_or_str(value: str, match_type_of: typing.AnyStr) -> typing.AnyStr:
    """Coerce `value` to the same str/bytes type as `match_type_of`."""
    if isinstance(match_type_of, str):
        return value
    return value.encode()

355 

356 

def unquote(value: str) -> str:
    """
    Strip one pair of surrounding double-quotes from `value`, if present.

    Unquoted input is returned unchanged.
    """
    # Guard against empty strings: the previous implementation indexed
    # `value[0]` unconditionally and raised IndexError on "".
    if value and value[0] == value[-1] == '"':
        return value[1:-1]
    return value

359 

360 

def guess_content_type(filename: typing.Optional[str]) -> typing.Optional[str]:
    """
    Guess a MIME type from a filename extension.

    Unknown extensions map to "application/octet-stream"; a missing or
    empty filename yields None.
    """
    if not filename:
        return None
    guessed, _ = mimetypes.guess_type(filename)
    return guessed or "application/octet-stream"

365 

366 

def peek_filelike_length(stream: typing.Any) -> typing.Optional[int]:
    """
    Given a file-like stream object, return its length in number of bytes
    without reading it into memory.
    """
    try:
        # A real file: ask the OS for its size via the file descriptor.
        length = os.fstat(stream.fileno()).st_size
    except (AttributeError, OSError):
        # Not an actual file. Maybe it supports random access,
        # like `io.BytesIO`?
        try:
            # Seek to the end to learn the length, then restore the position.
            current = stream.tell()
            length = stream.seek(0, os.SEEK_END)
            stream.seek(current)
        except (AttributeError, OSError):
            # No way to determine the length without reading the stream.
            return None

    return length

390 

391 

class Timer:
    """
    Measure elapsed time, with both sync and async variants.

    The sync variant uses `time.perf_counter()`; the async variant reads
    the clock of whichever async library is currently running.
    """

    async def _get_time(self) -> float:
        # Pick the clock matching the running async library.
        library = sniffio.current_async_library()
        if library == "trio":
            import trio

            return trio.current_time()
        elif library == "curio":  # pragma: no cover
            import curio

            return typing.cast(float, await curio.clock())

        import asyncio

        return asyncio.get_event_loop().time()

    def sync_start(self) -> None:
        self.started = time.perf_counter()

    async def async_start(self) -> None:
        self.started = await self._get_time()

    def sync_elapsed(self) -> float:
        return time.perf_counter() - self.started

    async def async_elapsed(self) -> float:
        return await self._get_time() - self.started

421 

422 

class URLPattern:
    """
    A utility class currently used for making lookups against proxy keys...

    # Wildcard matching...
    >>> pattern = URLPattern("all")
    >>> pattern.matches(httpx.URL("http://example.com"))
    True

    # With scheme matching...
    >>> pattern = URLPattern("https")
    >>> pattern.matches(httpx.URL("https://example.com"))
    True
    >>> pattern.matches(httpx.URL("http://example.com"))
    False

    # With domain matching...
    >>> pattern = URLPattern("https://example.com")
    >>> pattern.matches(httpx.URL("https://example.com"))
    True
    >>> pattern.matches(httpx.URL("http://example.com"))
    False
    >>> pattern.matches(httpx.URL("https://other.com"))
    False

    # Wildcard scheme, with domain matching...
    >>> pattern = URLPattern("all://example.com")
    >>> pattern.matches(httpx.URL("https://example.com"))
    True
    >>> pattern.matches(httpx.URL("http://example.com"))
    True
    >>> pattern.matches(httpx.URL("https://other.com"))
    False

    # With port matching...
    >>> pattern = URLPattern("https://example.com:1234")
    >>> pattern.matches(httpx.URL("https://example.com:1234"))
    True
    >>> pattern.matches(httpx.URL("https://example.com"))
    False
    """

    def __init__(self, pattern: str) -> None:
        from ._urls import URL

        if pattern and ":" not in pattern:
            raise ValueError(
                f"Proxy keys should use proper URL forms rather "
                f"than plain scheme strings. "
                f'Instead of "{pattern}", use "{pattern}://"'
            )

        url = URL(pattern)
        self.pattern = pattern
        # "all" scheme and "*" host are wildcards; stored as "" so that
        # `matches()` treats them as matching anything.
        self.scheme = "" if url.scheme == "all" else url.scheme
        self.host = "" if url.host == "*" else url.host
        self.port = url.port
        if not url.host or url.host == "*":
            self.host_regex: typing.Optional[typing.Pattern[str]] = None
        elif url.host.startswith("*."):
            # *.example.com should match "www.example.com", but not "example.com"
            domain = re.escape(url.host[2:])
            self.host_regex = re.compile(f"^.+\\.{domain}$")
        elif url.host.startswith("*"):
            # *example.com should match "www.example.com" and "example.com"
            domain = re.escape(url.host[1:])
            self.host_regex = re.compile(f"^(.+\\.)?{domain}$")
        else:
            # example.com should match "example.com" but not "www.example.com"
            domain = re.escape(url.host)
            self.host_regex = re.compile(f"^{domain}$")

    def matches(self, other: "URL") -> bool:
        # True if `other` is covered by this pattern. Wildcard components
        # (empty scheme/host, unset port) match anything.
        if self.scheme and self.scheme != other.scheme:
            return False
        if (
            self.host
            and self.host_regex is not None
            and not self.host_regex.match(other.host)
        ):
            return False
        if self.port is not None and self.port != other.port:
            return False
        return True

    @property
    def priority(self) -> typing.Tuple[int, int, int]:
        """
        The priority allows URLPattern instances to be sortable, so that
        we can match from most specific to least specific.
        """
        # URLs with a port should take priority over URLs without a port.
        port_priority = 0 if self.port is not None else 1
        # Longer hostnames should match first.
        host_priority = -len(self.host)
        # Longer schemes should match first.
        scheme_priority = -len(self.scheme)
        return (port_priority, host_priority, scheme_priority)

    def __hash__(self) -> int:
        return hash(self.pattern)

    def __lt__(self, other: "URLPattern") -> bool:
        return self.priority < other.priority

    def __eq__(self, other: typing.Any) -> bool:
        return isinstance(other, URLPattern) and self.pattern == other.pattern