Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_utils.py: 25%


from __future__ import annotations

import codecs
import email.message
import ipaddress
import mimetypes
import os
import re
import time
import typing
from pathlib import Path
from urllib.request import getproxies

import sniffio

from ._types import PrimitiveData

if typing.TYPE_CHECKING:  # pragma: no cover
    from ._urls import URL


_HTML5_FORM_ENCODING_REPLACEMENTS = {'"': "%22", "\\": "\\\\"}
_HTML5_FORM_ENCODING_REPLACEMENTS.update(
    {chr(c): "%{:02X}".format(c) for c in range(0x1F + 1) if c != 0x1B}
)
_HTML5_FORM_ENCODING_RE = re.compile(
    r"|".join([re.escape(c) for c in _HTML5_FORM_ENCODING_REPLACEMENTS.keys()])
)

def normalize_header_key(
    value: str | bytes,
    lower: bool,
    encoding: str | None = None,
) -> bytes:
    """
    Coerce str/bytes into a strictly byte-wise HTTP header key.
    """
    if isinstance(value, bytes):
        bytes_value = value
    else:
        bytes_value = value.encode(encoding or "ascii")

    return bytes_value.lower() if lower else bytes_value



def normalize_header_value(value: str | bytes, encoding: str | None = None) -> bytes:
    """
    Coerce str/bytes into a strictly byte-wise HTTP header value.
    """
    if isinstance(value, bytes):
        return value
    return value.encode(encoding or "ascii")
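# A quick sanity check of the two normalizers above (illustrative comments
# only, not part of the original module):
#
#     normalize_header_key("Accept", lower=True)    # -> b"accept"
#     normalize_header_key(b"Accept", lower=False)  # -> b"Accept"
#     normalize_header_value("gzip")                # -> b"gzip"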



def primitive_value_to_str(value: PrimitiveData) -> str:
    """
    Coerce a primitive data type into a string value.

    Note that we prefer JSON-style 'true'/'false' for boolean values here.
    """
    if value is True:
        return "true"
    elif value is False:
        return "false"
    elif value is None:
        return ""
    return str(value)
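# For example (illustrative only):
#
#     primitive_value_to_str(True)  # -> "true"
#     primitive_value_to_str(None)  # -> ""
#     primitive_value_to_str(23)    # -> "23"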



def is_known_encoding(encoding: str) -> bool:
    """
    Return `True` if `encoding` is a known codec.
    """
    try:
        codecs.lookup(encoding)
    except LookupError:
        return False
    return True
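# For example (illustrative only):
#
#     is_known_encoding("utf-8")          # -> True
#     is_known_encoding("latin-1")        # -> True
#     is_known_encoding("no-such-codec")  # -> False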



def format_form_param(name: str, value: str) -> bytes:
    """
    Encode a name/value pair within a multipart form.
    """

    def replacer(match: typing.Match[str]) -> str:
        return _HTML5_FORM_ENCODING_REPLACEMENTS[match.group(0)]

    value = _HTML5_FORM_ENCODING_RE.sub(replacer, value)
    return f'{name}="{value}"'.encode()
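# The HTML5 escaping above replaces double quotes, backslashes, and most
# control characters. For example (illustrative only):
#
#     format_form_param("filename", 'my"file.txt')
#     # -> b'filename="my%22file.txt"'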



def get_ca_bundle_from_env() -> str | None:
    if "SSL_CERT_FILE" in os.environ:
        ssl_file = Path(os.environ["SSL_CERT_FILE"])
        if ssl_file.is_file():
            return str(ssl_file)
    if "SSL_CERT_DIR" in os.environ:
        ssl_path = Path(os.environ["SSL_CERT_DIR"])
        if ssl_path.is_dir():
            return str(ssl_path)
    return None



def parse_header_links(value: str) -> list[dict[str, str]]:
    """
    Returns a list of parsed link headers, for more info see:
    https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Link
    The generic syntax of those is:
    Link: < uri-reference >; param1=value1; param2="value2"
    So for instance:
    Link: '<http:/.../front.jpeg>; type="image/jpeg",<http://.../back.jpeg>;'
    would return
        [
            {"url": "http:/.../front.jpeg", "type": "image/jpeg"},
            {"url": "http://.../back.jpeg"},
        ]
    :param value: HTTP Link entity-header field
    :return: list of parsed link headers
    """
    links: list[dict[str, str]] = []
    replace_chars = " '\""
    value = value.strip(replace_chars)
    if not value:
        return links
    for val in re.split(", *<", value):
        try:
            url, params = val.split(";", 1)
        except ValueError:
            url, params = val, ""
        link = {"url": url.strip("<> '\"")}
        for param in params.split(";"):
            try:
                key, value = param.split("=")
            except ValueError:
                break
            link[key.strip(replace_chars)] = value.strip(replace_chars)
        links.append(link)
    return links
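# For example (illustrative only):
#
#     parse_header_links('<https://example.com/next>; rel="next"')
#     # -> [{"url": "https://example.com/next", "rel": "next"}]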



def parse_content_type_charset(content_type: str) -> str | None:
    # We used to use `cgi.parse_header()` here, but `cgi` became a dead battery.
    # See: https://peps.python.org/pep-0594/#cgi
    msg = email.message.Message()
    msg["content-type"] = content_type
    return msg.get_content_charset(failobj=None)
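# For example (illustrative only; note that `get_content_charset()` lowercases
# the result):
#
#     parse_content_type_charset("text/html; charset=UTF-8")  # -> "utf-8"
#     parse_content_type_charset("text/html")                 # -> None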



SENSITIVE_HEADERS = {"authorization", "proxy-authorization"}


def obfuscate_sensitive_headers(
    items: typing.Iterable[tuple[typing.AnyStr, typing.AnyStr]],
) -> typing.Iterator[tuple[typing.AnyStr, typing.AnyStr]]:
    for k, v in items:
        if to_str(k.lower()) in SENSITIVE_HEADERS:
            v = to_bytes_or_str("[secure]", match_type_of=v)
        yield k, v
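# For example (illustrative only):
#
#     list(obfuscate_sensitive_headers([(b"Authorization", b"Bearer abc123")]))
#     # -> [(b"Authorization", b"[secure]")]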



def port_or_default(url: URL) -> int | None:
    if url.port is not None:
        return url.port
    return {"http": 80, "https": 443}.get(url.scheme)


def same_origin(url: URL, other: URL) -> bool:
    """
    Return 'True' if the given URLs share the same origin.
    """
    return (
        url.scheme == other.scheme
        and url.host == other.host
        and port_or_default(url) == port_or_default(other)
    )
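# Because of `port_or_default`, an explicit default port compares as equal to
# no port at all (illustrative only):
#
#     same_origin(URL("http://example.com"), URL("http://example.com:80"))
#     # -> True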



def is_https_redirect(url: URL, location: URL) -> bool:
    """
    Return 'True' if 'location' is an HTTPS upgrade of 'url'.
    """
    if url.host != location.host:
        return False

    return (
        url.scheme == "http"
        and port_or_default(url) == 80
        and location.scheme == "https"
        and port_or_default(location) == 443
    )
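# For example (illustrative only):
#
#     is_https_redirect(URL("http://example.com/"), URL("https://example.com/"))
#     # -> True
#     is_https_redirect(URL("http://example.com/"), URL("https://other.com/"))
#     # -> False (different host, so not an upgrade)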



def get_environment_proxies() -> dict[str, str | None]:
    """Gets proxy information from the environment"""

    # urllib.request.getproxies() falls back on System
    # Registry and Config for proxies on Windows and macOS.
    # We don't want to propagate non-HTTP proxies into
    # our configuration such as 'TRAVIS_APT_PROXY'.
    proxy_info = getproxies()
    mounts: dict[str, str | None] = {}

    for scheme in ("http", "https", "all"):
        if proxy_info.get(scheme):
            hostname = proxy_info[scheme]
            mounts[f"{scheme}://"] = (
                hostname if "://" in hostname else f"http://{hostname}"
            )

    no_proxy_hosts = [host.strip() for host in proxy_info.get("no", "").split(",")]
    for hostname in no_proxy_hosts:
        # See https://curl.haxx.se/libcurl/c/CURLOPT_NOPROXY.html for details
        # on how names in `NO_PROXY` are handled.
        if hostname == "*":
            # If NO_PROXY=* is used or if "*" occurs as any one of the comma
            # separated hostnames, then we should just bypass any information
            # from HTTP_PROXY, HTTPS_PROXY, ALL_PROXY, and always ignore
            # proxies.
            return {}
        elif hostname:
            # NO_PROXY=.google.com is marked as "all://*.google.com",
            # which disables "www.google.com" but not "google.com".
            # NO_PROXY=google.com is marked as "all://*google.com",
            # which disables "www.google.com" and "google.com".
            # (But not "wwwgoogle.com".)
            # NO_PROXY can include domains, IPv6 and IPv4 addresses, and "localhost":
            # NO_PROXY=example.com,::1,localhost,192.168.0.0/16
            if "://" in hostname:
                mounts[hostname] = None
            elif is_ipv4_hostname(hostname):
                mounts[f"all://{hostname}"] = None
            elif is_ipv6_hostname(hostname):
                mounts[f"all://[{hostname}]"] = None
            elif hostname.lower() == "localhost":
                mounts[f"all://{hostname}"] = None
            else:
                mounts[f"all://*{hostname}"] = None

    return mounts
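# For example (illustrative only, assuming HTTP_PROXY=http://127.0.0.1:3128
# and NO_PROXY=localhost are set in the environment), this returns:
#
#     {"http://": "http://127.0.0.1:3128", "all://localhost": None}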



def to_bytes(value: str | bytes, encoding: str = "utf-8") -> bytes:
    return value.encode(encoding) if isinstance(value, str) else value


def to_str(value: str | bytes, encoding: str = "utf-8") -> str:
    return value if isinstance(value, str) else value.decode(encoding)


def to_bytes_or_str(value: str, match_type_of: typing.AnyStr) -> typing.AnyStr:
    return value if isinstance(match_type_of, str) else value.encode()


def unquote(value: str) -> str:
    return value[1:-1] if value[0] == value[-1] == '"' else value
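# A quick illustration of the small conversion helpers above (illustrative
# only):
#
#     to_bytes("abc")                          # -> b"abc"
#     to_str(b"abc")                           # -> "abc"
#     to_bytes_or_str("x", match_type_of=b"")  # -> b"x"
#     unquote('"quoted"')                      # -> "quoted"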



def guess_content_type(filename: str | None) -> str | None:
    if filename:
        return mimetypes.guess_type(filename)[0] or "application/octet-stream"
    return None
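# For example (illustrative only):
#
#     guess_content_type("photo.jpeg")   # -> "image/jpeg"
#     guess_content_type("blob.xyz123")  # -> "application/octet-stream"
#     guess_content_type(None)           # -> None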



def peek_filelike_length(stream: typing.Any) -> int | None:
    """
    Given a file-like stream object, return its length in number of bytes
    without reading it into memory.
    """
    try:
        # Is it an actual file?
        fd = stream.fileno()
        # Yup, seems to be an actual file.
        length = os.fstat(fd).st_size
    except (AttributeError, OSError):
        # No... Maybe it's something that supports random access, like `io.BytesIO`?
        try:
            # Assuming so, go to end of stream to figure out its length,
            # then put it back in place.
            offset = stream.tell()
            length = stream.seek(0, os.SEEK_END)
            stream.seek(offset)
        except (AttributeError, OSError):
            # Not even that? Sorry, we're doomed...
            return None

    return length
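# For example (illustrative only). `io.BytesIO` has no real file descriptor,
# so the fallback seek/tell path is used; a plain iterator supports neither
# strategy:
#
#     import io
#     peek_filelike_length(io.BytesIO(b"hello"))  # -> 5
#     peek_filelike_length(iter([b"hello"]))      # -> None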



class Timer:
    async def _get_time(self) -> float:
        library = sniffio.current_async_library()
        if library == "trio":
            import trio

            return trio.current_time()
        else:
            import asyncio

            return asyncio.get_event_loop().time()

    def sync_start(self) -> None:
        self.started = time.perf_counter()

    async def async_start(self) -> None:
        self.started = await self._get_time()

    def sync_elapsed(self) -> float:
        now = time.perf_counter()
        return now - self.started

    async def async_elapsed(self) -> float:
        now = await self._get_time()
        return now - self.started
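# Typical synchronous usage (an illustrative sketch, not from the original
# module):
#
#     timer = Timer()
#     timer.sync_start()
#     ...  # do some work
#     seconds = timer.sync_elapsed()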



class URLPattern:
    """
    A utility class currently used for making lookups against proxy keys...

    # Wildcard matching...
    >>> pattern = URLPattern("all://")
    >>> pattern.matches(httpx.URL("http://example.com"))
    True

    # With scheme matching...
    >>> pattern = URLPattern("https://")
    >>> pattern.matches(httpx.URL("https://example.com"))
    True
    >>> pattern.matches(httpx.URL("http://example.com"))
    False

    # With domain matching...
    >>> pattern = URLPattern("https://example.com")
    >>> pattern.matches(httpx.URL("https://example.com"))
    True
    >>> pattern.matches(httpx.URL("http://example.com"))
    False
    >>> pattern.matches(httpx.URL("https://other.com"))
    False

    # Wildcard scheme, with domain matching...
    >>> pattern = URLPattern("all://example.com")
    >>> pattern.matches(httpx.URL("https://example.com"))
    True
    >>> pattern.matches(httpx.URL("http://example.com"))
    True
    >>> pattern.matches(httpx.URL("https://other.com"))
    False

    # With port matching...
    >>> pattern = URLPattern("https://example.com:1234")
    >>> pattern.matches(httpx.URL("https://example.com:1234"))
    True
    >>> pattern.matches(httpx.URL("https://example.com"))
    False
    """


    def __init__(self, pattern: str) -> None:
        from ._urls import URL

        if pattern and ":" not in pattern:
            raise ValueError(
                f"Proxy keys should use proper URL forms rather "
                f"than plain scheme strings. "
                f'Instead of "{pattern}", use "{pattern}://"'
            )

        url = URL(pattern)
        self.pattern = pattern
        self.scheme = "" if url.scheme == "all" else url.scheme
        self.host = "" if url.host == "*" else url.host
        self.port = url.port
        if not url.host or url.host == "*":
            self.host_regex: typing.Pattern[str] | None = None
        elif url.host.startswith("*."):
            # *.example.com should match "www.example.com", but not "example.com"
            domain = re.escape(url.host[2:])
            self.host_regex = re.compile(f"^.+\\.{domain}$")
        elif url.host.startswith("*"):
            # *example.com should match "www.example.com" and "example.com"
            domain = re.escape(url.host[1:])
            self.host_regex = re.compile(f"^(.+\\.)?{domain}$")
        else:
            # example.com should match "example.com" but not "www.example.com"
            domain = re.escape(url.host)
            self.host_regex = re.compile(f"^{domain}$")

    def matches(self, other: URL) -> bool:
        if self.scheme and self.scheme != other.scheme:
            return False
        if (
            self.host
            and self.host_regex is not None
            and not self.host_regex.match(other.host)
        ):
            return False
        if self.port is not None and self.port != other.port:
            return False
        return True

    @property
    def priority(self) -> tuple[int, int, int]:
        """
        The priority allows URLPattern instances to be sortable, so that
        we can match from most specific to least specific.
        """
        # URLs with a port should take priority over URLs without a port.
        port_priority = 0 if self.port is not None else 1
        # Longer hostnames should match first.
        host_priority = -len(self.host)
        # Longer schemes should match first.
        scheme_priority = -len(self.scheme)
        return (port_priority, host_priority, scheme_priority)

    def __hash__(self) -> int:
        return hash(self.pattern)

    def __lt__(self, other: URLPattern) -> bool:
        return self.priority < other.priority

    def __eq__(self, other: typing.Any) -> bool:
        return isinstance(other, URLPattern) and self.pattern == other.pattern
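# Since `__lt__` delegates to `priority`, a list of patterns sorts
# most-specific-first (illustrative only):
#
#     patterns = [URLPattern("all://"), URLPattern("https://example.com:443")]
#     sorted(patterns)[0].pattern  # -> "https://example.com:443"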



def is_ipv4_hostname(hostname: str) -> bool:
    try:
        ipaddress.IPv4Address(hostname.split("/")[0])
    except Exception:
        return False
    return True



def is_ipv6_hostname(hostname: str) -> bool:
    try:
        ipaddress.IPv6Address(hostname.split("/")[0])
    except Exception:
        return False
    return True
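# For example (illustrative only); any "/prefix" is dropped before parsing, so
# CIDR-style entries from NO_PROXY are accepted:
#
#     is_ipv4_hostname("192.168.0.0/16")  # -> True
#     is_ipv6_hostname("::1")             # -> True
#     is_ipv6_hostname("example.com")     # -> False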