Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_utils.py: 24%

232 statements (coverage.py v7.2.7, created at 2023-06-07 07:19 +0000)

import codecs
import email.message
import ipaddress
import mimetypes
import os
import re
import time
import typing
from pathlib import Path
from urllib.request import getproxies

import sniffio

from ._types import PrimitiveData

if typing.TYPE_CHECKING:  # pragma: no cover
    from ._urls import URL


_HTML5_FORM_ENCODING_REPLACEMENTS = {'"': "%22", "\\": "\\\\"}
_HTML5_FORM_ENCODING_REPLACEMENTS.update(
    {chr(c): "%{:02X}".format(c) for c in range(0x1F + 1) if c != 0x1B}
)
_HTML5_FORM_ENCODING_RE = re.compile(
    r"|".join([re.escape(c) for c in _HTML5_FORM_ENCODING_REPLACEMENTS.keys()])
)

def normalize_header_key(
    value: typing.Union[str, bytes],
    lower: bool,
    encoding: typing.Optional[str] = None,
) -> bytes:
    """
    Coerce str/bytes into a strictly byte-wise HTTP header key.
    """
    if isinstance(value, bytes):
        bytes_value = value
    else:
        bytes_value = value.encode(encoding or "ascii")

    return bytes_value.lower() if lower else bytes_value
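# Illustrative usage (editor's sketch, not part of the original source):
# >>> normalize_header_key("Content-Type", lower=True)
# b'content-type'
# >>> normalize_header_key(b"Accept", lower=False)
# b'Accept'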

def normalize_header_value(
    value: typing.Union[str, bytes], encoding: typing.Optional[str] = None
) -> bytes:
    """
    Coerce str/bytes into a strictly byte-wise HTTP header value.
    """
    if isinstance(value, bytes):
        return value
    return value.encode(encoding or "ascii")

def primitive_value_to_str(value: "PrimitiveData") -> str:
    """
    Coerce a primitive data type into a string value.

    Note that we prefer JSON-style 'true'/'false' for boolean values here.
    """
    if value is True:
        return "true"
    elif value is False:
        return "false"
    elif value is None:
        return ""
    return str(value)
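# Illustrative behaviour (editor's sketch): booleans and None get JSON-style
# treatment; everything else falls through to str().
# >>> primitive_value_to_str(True), primitive_value_to_str(None), primitive_value_to_str(3.14)
# ('true', '', '3.14')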

def is_known_encoding(encoding: str) -> bool:
    """
    Return `True` if `encoding` is a known codec.
    """
    try:
        codecs.lookup(encoding)
    except LookupError:
        return False
    return True

def format_form_param(name: str, value: str) -> bytes:
    """
    Encode a name/value pair within a multipart form.
    """

    def replacer(match: typing.Match[str]) -> str:
        return _HTML5_FORM_ENCODING_REPLACEMENTS[match.group(0)]

    value = _HTML5_FORM_ENCODING_RE.sub(replacer, value)
    return f'{name}="{value}"'.encode()
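# Illustrative usage (editor's sketch): double quotes in the value are
# percent-encoded per the HTML5 replacement table above.
# >>> format_form_param("name", 'say "hi"')
# b'name="say %22hi%22"'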

# Null bytes; no need to recreate these on each call to guess_json_utf
_null = b"\x00"
_null2 = _null * 2
_null3 = _null * 3


def guess_json_utf(data: bytes) -> typing.Optional[str]:
    # JSON always starts with two ASCII characters, so detection is as easy
    # as counting the null bytes in the first four bytes and determining the
    # encoding from their count and location. Also detect a BOM, if present.
    sample = data[:4]
    if sample in (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE):
        return "utf-32"  # BOM included
    if sample[:3] == codecs.BOM_UTF8:
        return "utf-8-sig"  # BOM included, MS style (discouraged)
    if sample[:2] in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE):
        return "utf-16"  # BOM included
    nullcount = sample.count(_null)
    if nullcount == 0:
        return "utf-8"
    if nullcount == 2:
        if sample[::2] == _null2:  # 1st and 3rd are null
            return "utf-16-be"
        if sample[1::2] == _null2:  # 2nd and 4th are null
            return "utf-16-le"
        # Did not detect 2 valid UTF-16 ascii-range characters
    if nullcount == 3:
        if sample[:3] == _null3:
            return "utf-32-be"
        if sample[1:] == _null3:
            return "utf-32-le"
        # Did not detect a valid UTF-32 ascii-range character
    return None
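# Illustrative checks (editor's sketch): BOM-less UTF-16 is recognised purely
# from the null-byte layout of the first four bytes.
# >>> guess_json_utf(b'{"a": 1}')
# 'utf-8'
# >>> guess_json_utf('{"a": 1}'.encode("utf-16-le"))
# 'utf-16-le'
# >>> guess_json_utf('{"a": 1}'.encode("utf-16"))  # codec emits a BOM
# 'utf-16'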

def get_ca_bundle_from_env() -> typing.Optional[str]:
    if "SSL_CERT_FILE" in os.environ:
        ssl_file = Path(os.environ["SSL_CERT_FILE"])
        if ssl_file.is_file():
            return str(ssl_file)
    if "SSL_CERT_DIR" in os.environ:
        ssl_path = Path(os.environ["SSL_CERT_DIR"])
        if ssl_path.is_dir():
            return str(ssl_path)
    return None

def parse_header_links(value: str) -> typing.List[typing.Dict[str, str]]:
    """
    Returns a list of parsed link headers; for more info see:
    https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Link
    The generic syntax of those is:
    Link: < uri-reference >; param1=value1; param2="value2"
    So for instance:
    Link: '<http://.../front.jpeg>; type="image/jpeg",<http://.../back.jpeg>;'
    would return
        [
            {"url": "http://.../front.jpeg", "type": "image/jpeg"},
            {"url": "http://.../back.jpeg"},
        ]
    :param value: HTTP Link entity-header field
    :return: list of parsed link headers
    """
    links: typing.List[typing.Dict[str, str]] = []
    replace_chars = " '\""
    value = value.strip(replace_chars)
    if not value:
        return links
    for val in re.split(", *<", value):
        try:
            url, params = val.split(";", 1)
        except ValueError:
            url, params = val, ""
        link = {"url": url.strip("<> '\"")}
        for param in params.split(";"):
            try:
                key, value = param.split("=")
            except ValueError:
                break
            link[key.strip(replace_chars)] = value.strip(replace_chars)
        links.append(link)
    return links
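# Illustrative usage (editor's sketch), e.g. for a pagination Link header:
# >>> parse_header_links('<https://api.example.com/items?page=2>; rel="next"')
# [{'url': 'https://api.example.com/items?page=2', 'rel': 'next'}]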

def parse_content_type_charset(content_type: str) -> typing.Optional[str]:
    # We used to use `cgi.parse_header()` here, but `cgi` became a dead battery.
    # See: https://peps.python.org/pep-0594/#cgi
    msg = email.message.Message()
    msg["content-type"] = content_type
    return msg.get_content_charset(failobj=None)
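# Illustrative usage (editor's sketch): the charset comes back lower-cased,
# or None when no charset parameter is present.
# >>> parse_content_type_charset("text/html; charset=UTF-8")
# 'utf-8'
# >>> parse_content_type_charset("application/json") is None
# True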

SENSITIVE_HEADERS = {"authorization", "proxy-authorization"}

def obfuscate_sensitive_headers(
    items: typing.Iterable[typing.Tuple[typing.AnyStr, typing.AnyStr]]
) -> typing.Iterator[typing.Tuple[typing.AnyStr, typing.AnyStr]]:
    for k, v in items:
        if to_str(k.lower()) in SENSITIVE_HEADERS:
            v = to_bytes_or_str("[secure]", match_type_of=v)
        yield k, v
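# Illustrative usage (editor's sketch): credential-bearing values are masked,
# with the replacement matching the str/bytes type of the original value.
# >>> list(obfuscate_sensitive_headers([("Authorization", "Bearer abc"), ("Accept", "*/*")]))
# [('Authorization', '[secure]'), ('Accept', '*/*')]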

def port_or_default(url: "URL") -> typing.Optional[int]:
    if url.port is not None:
        return url.port
    return {"http": 80, "https": 443}.get(url.scheme)


def same_origin(url: "URL", other: "URL") -> bool:
    """
    Return 'True' if the given URLs share the same origin.
    """
    return (
        url.scheme == other.scheme
        and url.host == other.host
        and port_or_default(url) == port_or_default(other)
    )

def is_https_redirect(url: "URL", location: "URL") -> bool:
    """
    Return 'True' if 'location' is an HTTPS upgrade of 'url'.
    """
    if url.host != location.host:
        return False

    return (
        url.scheme == "http"
        and port_or_default(url) == 80
        and location.scheme == "https"
        and port_or_default(location) == 443
    )
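# Illustrative behaviour (editor's sketch, mirroring the doctest style used in
# URLPattern below; assumes httpx is importable):
# >>> same_origin(httpx.URL("http://example.com"), httpx.URL("http://example.com:80/a"))
# True
# >>> is_https_redirect(httpx.URL("http://example.com"), httpx.URL("https://example.com/login"))
# True
# >>> is_https_redirect(httpx.URL("http://example.com"), httpx.URL("https://other.com/login"))
# False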

def get_environment_proxies() -> typing.Dict[str, typing.Optional[str]]:
    """Gets proxy information from the environment"""

    # urllib.request.getproxies() falls back on System
    # Registry and Config for proxies on Windows and macOS.
    # We don't want to propagate non-HTTP proxies into
    # our configuration such as 'TRAVIS_APT_PROXY'.
    proxy_info = getproxies()
    mounts: typing.Dict[str, typing.Optional[str]] = {}

    for scheme in ("http", "https", "all"):
        if proxy_info.get(scheme):
            hostname = proxy_info[scheme]
            mounts[f"{scheme}://"] = (
                hostname if "://" in hostname else f"http://{hostname}"
            )

    no_proxy_hosts = [host.strip() for host in proxy_info.get("no", "").split(",")]
    for hostname in no_proxy_hosts:
        # See https://curl.haxx.se/libcurl/c/CURLOPT_NOPROXY.html for details
        # on how names in `NO_PROXY` are handled.
        if hostname == "*":
            # If NO_PROXY=* is used or if "*" occurs as any one of the comma
            # separated hostnames, then we should just bypass any information
            # from HTTP_PROXY, HTTPS_PROXY, ALL_PROXY, and always ignore
            # proxies.
            return {}
        elif hostname:
            # NO_PROXY=.google.com is marked as "all://*.google.com",
            #   which disables "www.google.com" but not "google.com".
            # NO_PROXY=google.com is marked as "all://*google.com",
            #   which disables "www.google.com" and "google.com".
            #   (But not "wwwgoogle.com")
            # NO_PROXY can include domains, IPv6, IPv4 addresses and "localhost"
            # NO_PROXY=example.com,::1,localhost,192.168.0.0/16
            if is_ipv4_hostname(hostname):
                mounts[f"all://{hostname}"] = None
            elif is_ipv6_hostname(hostname):
                mounts[f"all://[{hostname}]"] = None
            elif hostname.lower() == "localhost":
                mounts[f"all://{hostname}"] = None
            else:
                mounts[f"all://*{hostname}"] = None

    return mounts
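# Illustrative result (editor's sketch; the actual mapping depends on the
# environment at call time). With HTTP_PROXY=http://127.0.0.1:3128 and
# NO_PROXY=localhost,::1 set, this would return:
#     {"http://": "http://127.0.0.1:3128",
#      "all://localhost": None,
#      "all://[::1]": None}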

def to_bytes(value: typing.Union[str, bytes], encoding: str = "utf-8") -> bytes:
    return value.encode(encoding) if isinstance(value, str) else value


def to_str(value: typing.Union[str, bytes], encoding: str = "utf-8") -> str:
    return value if isinstance(value, str) else value.decode(encoding)


def to_bytes_or_str(value: str, match_type_of: typing.AnyStr) -> typing.AnyStr:
    return value if isinstance(match_type_of, str) else value.encode()


def unquote(value: str) -> str:
    return value[1:-1] if value[0] == value[-1] == '"' else value
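# Illustrative behaviour (editor's sketch) of the small coercion helpers:
# >>> to_bytes("abc"), to_str(b"abc")
# (b'abc', 'abc')
# >>> to_bytes_or_str("[secure]", match_type_of=b"value")
# b'[secure]'
# >>> unquote('"hello"')
# 'hello'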

def guess_content_type(filename: typing.Optional[str]) -> typing.Optional[str]:
    if filename:
        return mimetypes.guess_type(filename)[0] or "application/octet-stream"
    return None

def peek_filelike_length(stream: typing.Any) -> typing.Optional[int]:
    """
    Given a file-like stream object, return its length in number of bytes
    without reading it into memory.
    """
    try:
        # Is it an actual file?
        fd = stream.fileno()
        # Yup, seems to be an actual file.
        length = os.fstat(fd).st_size
    except (AttributeError, OSError):
        # No... Maybe it's something that supports random access, like `io.BytesIO`?
        try:
            # Assuming so, go to end of stream to figure out its length,
            # then put it back in place.
            offset = stream.tell()
            length = stream.seek(0, os.SEEK_END)
            stream.seek(offset)
        except (AttributeError, OSError):
            # Not even that? Sorry, we're doomed...
            return None

    return length
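# Illustrative usage (editor's sketch): io.BytesIO has no usable fileno(),
# but supports tell()/seek(), so its length is measured without reading it.
# >>> import io
# >>> peek_filelike_length(io.BytesIO(b"abc"))
# 3
# >>> peek_filelike_length(iter([b"chunk"])) is None  # neither fileno nor tell
# True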

class Timer:
    async def _get_time(self) -> float:
        library = sniffio.current_async_library()
        if library == "trio":
            import trio

            return trio.current_time()
        elif library == "curio":  # pragma: no cover
            import curio

            return typing.cast(float, await curio.clock())

        import asyncio

        return asyncio.get_event_loop().time()

    def sync_start(self) -> None:
        self.started = time.perf_counter()

    async def async_start(self) -> None:
        self.started = await self._get_time()

    def sync_elapsed(self) -> float:
        now = time.perf_counter()
        return now - self.started

    async def async_elapsed(self) -> float:
        now = await self._get_time()
        return now - self.started
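# Illustrative synchronous usage (editor's sketch); the async variants work
# the same way but take their clock from the running event loop:
# >>> timer = Timer()
# >>> timer.sync_start()
# >>> timer.sync_elapsed() >= 0.0
# True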

class URLPattern:
    """
    A utility class currently used for making lookups against proxy keys...

    # Wildcard matching...
    >>> pattern = URLPattern("all://")
    >>> pattern.matches(httpx.URL("http://example.com"))
    True

    # With scheme matching...
    >>> pattern = URLPattern("https://")
    >>> pattern.matches(httpx.URL("https://example.com"))
    True
    >>> pattern.matches(httpx.URL("http://example.com"))
    False

    # With domain matching...
    >>> pattern = URLPattern("https://example.com")
    >>> pattern.matches(httpx.URL("https://example.com"))
    True
    >>> pattern.matches(httpx.URL("http://example.com"))
    False
    >>> pattern.matches(httpx.URL("https://other.com"))
    False

    # Wildcard scheme, with domain matching...
    >>> pattern = URLPattern("all://example.com")
    >>> pattern.matches(httpx.URL("https://example.com"))
    True
    >>> pattern.matches(httpx.URL("http://example.com"))
    True
    >>> pattern.matches(httpx.URL("https://other.com"))
    False

    # With port matching...
    >>> pattern = URLPattern("https://example.com:1234")
    >>> pattern.matches(httpx.URL("https://example.com:1234"))
    True
    >>> pattern.matches(httpx.URL("https://example.com"))
    False
    """

396 

397 def __init__(self, pattern: str) -> None: 

398 from ._urls import URL 

399 

400 if pattern and ":" not in pattern: 

401 raise ValueError( 

402 f"Proxy keys should use proper URL forms rather " 

403 f"than plain scheme strings. " 

404 f'Instead of "{pattern}", use "{pattern}://"' 

405 ) 

406 

407 url = URL(pattern) 

408 self.pattern = pattern 

409 self.scheme = "" if url.scheme == "all" else url.scheme 

410 self.host = "" if url.host == "*" else url.host 

411 self.port = url.port 

412 if not url.host or url.host == "*": 

413 self.host_regex: typing.Optional[typing.Pattern[str]] = None 

414 elif url.host.startswith("*."): 

415 # *.example.com should match "www.example.com", but not "example.com" 

416 domain = re.escape(url.host[2:]) 

417 self.host_regex = re.compile(f"^.+\\.{domain}$") 

418 elif url.host.startswith("*"): 

419 # *example.com should match "www.example.com" and "example.com" 

420 domain = re.escape(url.host[1:]) 

421 self.host_regex = re.compile(f"^(.+\\.)?{domain}$") 

422 else: 

423 # example.com should match "example.com" but not "www.example.com" 

424 domain = re.escape(url.host) 

425 self.host_regex = re.compile(f"^{domain}$") 

426 

    def matches(self, other: "URL") -> bool:
        if self.scheme and self.scheme != other.scheme:
            return False
        if (
            self.host
            and self.host_regex is not None
            and not self.host_regex.match(other.host)
        ):
            return False
        if self.port is not None and self.port != other.port:
            return False
        return True

    @property
    def priority(self) -> typing.Tuple[int, int, int]:
        """
        The priority allows URLPattern instances to be sortable, so that
        we can match from most specific to least specific.
        """
        # URLs with a port should take priority over URLs without a port.
        port_priority = 0 if self.port is not None else 1
        # Longer hostnames should match first.
        host_priority = -len(self.host)
        # Longer schemes should match first.
        scheme_priority = -len(self.scheme)
        return (port_priority, host_priority, scheme_priority)

    def __hash__(self) -> int:
        return hash(self.pattern)

    def __lt__(self, other: "URLPattern") -> bool:
        return self.priority < other.priority

    def __eq__(self, other: typing.Any) -> bool:
        return isinstance(other, URLPattern) and self.pattern == other.pattern
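# Illustrative ordering (editor's sketch): sorting by `priority` puts the most
# specific pattern first, so proxy lookups can stop at the first match.
# >>> patterns = [
# ...     URLPattern("all://"),
# ...     URLPattern("https://example.com:1234"),
# ...     URLPattern("all://*example.com"),
# ... ]
# >>> [p.pattern for p in sorted(patterns)]
# ['https://example.com:1234', 'all://*example.com', 'all://']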

def is_ipv4_hostname(hostname: str) -> bool:
    try:
        ipaddress.IPv4Address(hostname.split("/")[0])
    except Exception:
        return False
    return True


def is_ipv6_hostname(hostname: str) -> bool:
    try:
        ipaddress.IPv6Address(hostname.split("/")[0])
    except Exception:
        return False
    return True
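# Illustrative behaviour (editor's sketch): the split on "/" means CIDR
# blocks from NO_PROXY, e.g. "192.168.0.0/16", are also recognised.
# >>> is_ipv4_hostname("192.168.0.0/16")
# True
# >>> is_ipv6_hostname("::1")
# True
# >>> is_ipv4_hostname("example.com")
# False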