Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_utils.py: 25%


212 statements  

from __future__ import annotations

import codecs
import email.message
import ipaddress
import mimetypes
import os
import re
import time
import typing
from pathlib import Path
from urllib.request import getproxies

import sniffio

from ._types import PrimitiveData

if typing.TYPE_CHECKING:  # pragma: no cover
    from ._urls import URL


_HTML5_FORM_ENCODING_REPLACEMENTS = {'"': "%22", "\\": "\\\\"}
_HTML5_FORM_ENCODING_REPLACEMENTS.update(
    {chr(c): "%{:02X}".format(c) for c in range(0x1F + 1) if c != 0x1B}
)
_HTML5_FORM_ENCODING_RE = re.compile(
    r"|".join([re.escape(c) for c in _HTML5_FORM_ENCODING_REPLACEMENTS.keys()])
)


def normalize_header_key(
    value: str | bytes,
    lower: bool,
    encoding: str | None = None,
) -> bytes:
    """
    Coerce str/bytes into a strictly byte-wise HTTP header key.
    """
    if isinstance(value, bytes):
        bytes_value = value
    else:
        bytes_value = value.encode(encoding or "ascii")

    return bytes_value.lower() if lower else bytes_value


def normalize_header_value(value: str | bytes, encoding: str | None = None) -> bytes:
    """
    Coerce str/bytes into a strictly byte-wise HTTP header value.
    """
    if isinstance(value, bytes):
        return value
    if not isinstance(value, str):
        raise TypeError(f"Header value must be str or bytes, not {type(value)}")
    return value.encode(encoding or "ascii")


def primitive_value_to_str(value: PrimitiveData) -> str:
    """
    Coerce a primitive data type into a string value.

    Note that we prefer JSON-style 'true'/'false' for boolean values here.
    """
    if value is True:
        return "true"
    elif value is False:
        return "false"
    elif value is None:
        return ""
    return str(value)


def is_known_encoding(encoding: str) -> bool:
    """
    Return `True` if `encoding` is a known codec.
    """
    try:
        codecs.lookup(encoding)
    except LookupError:
        return False
    return True


def format_form_param(name: str, value: str) -> bytes:
    """
    Encode a name/value pair within a multipart form.
    """

    def replacer(match: typing.Match[str]) -> str:
        return _HTML5_FORM_ENCODING_REPLACEMENTS[match.group(0)]

    value = _HTML5_FORM_ENCODING_RE.sub(replacer, value)
    return f'{name}="{value}"'.encode()
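
# Illustrative usage of format_form_param (a hedged sketch; the field name and
# value below are made up, not taken from the httpx test suite):
#
#     >>> format_form_param("file", 'say "hi"')
#     b'file="say %22hi%22"'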


def get_ca_bundle_from_env() -> str | None:
    if "SSL_CERT_FILE" in os.environ:
        ssl_file = Path(os.environ["SSL_CERT_FILE"])
        if ssl_file.is_file():
            return str(ssl_file)
    if "SSL_CERT_DIR" in os.environ:
        ssl_path = Path(os.environ["SSL_CERT_DIR"])
        if ssl_path.is_dir():
            return str(ssl_path)
    return None


def parse_header_links(value: str) -> list[dict[str, str]]:
    """
    Returns a list of parsed link headers, for more info see:
    https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Link
    The generic syntax of those is:
    Link: < uri-reference >; param1=value1; param2="value2"
    So for instance:
    Link: '<http:/.../front.jpeg>; type="image/jpeg",<http://.../back.jpeg>;'
    would return
    [
        {"url": "http:/.../front.jpeg", "type": "image/jpeg"},
        {"url": "http://.../back.jpeg"},
    ]
    :param value: HTTP Link entity-header field
    :return: list of parsed link headers
    """
    links: list[dict[str, str]] = []
    replace_chars = " '\""
    value = value.strip(replace_chars)
    if not value:
        return links
    for val in re.split(", *<", value):
        try:
            url, params = val.split(";", 1)
        except ValueError:
            url, params = val, ""
        link = {"url": url.strip("<> '\"")}
        for param in params.split(";"):
            try:
                key, value = param.split("=")
            except ValueError:
                break
            link[key.strip(replace_chars)] = value.strip(replace_chars)
        links.append(link)
    return links


def parse_content_type_charset(content_type: str) -> str | None:
    # We used to use `cgi.parse_header()` here, but `cgi` became a dead battery.
    # See: https://peps.python.org/pep-0594/#cgi
    msg = email.message.Message()
    msg["content-type"] = content_type
    return msg.get_content_charset(failobj=None)


SENSITIVE_HEADERS = {"authorization", "proxy-authorization"}


def obfuscate_sensitive_headers(
    items: typing.Iterable[tuple[typing.AnyStr, typing.AnyStr]],
) -> typing.Iterator[tuple[typing.AnyStr, typing.AnyStr]]:
    for k, v in items:
        if to_str(k.lower()) in SENSITIVE_HEADERS:
            v = to_bytes_or_str("[secure]", match_type_of=v)
        yield k, v
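
# Illustrative usage of obfuscate_sensitive_headers (made-up header values):
# the "[secure]" replacement preserves the str/bytes type of the input value.
#
#     >>> list(obfuscate_sensitive_headers([(b"Authorization", b"Bearer abc123")]))
#     [(b'Authorization', b'[secure]')]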


def port_or_default(url: URL) -> int | None:
    if url.port is not None:
        return url.port
    return {"http": 80, "https": 443}.get(url.scheme)


def same_origin(url: URL, other: URL) -> bool:
    """
    Return 'True' if the given URLs share the same origin.
    """
    return (
        url.scheme == other.scheme
        and url.host == other.host
        and port_or_default(url) == port_or_default(other)
    )


def is_https_redirect(url: URL, location: URL) -> bool:
    """
    Return 'True' if 'location' is an HTTPS upgrade of 'url'.
    """
    if url.host != location.host:
        return False

    return (
        url.scheme == "http"
        and port_or_default(url) == 80
        and location.scheme == "https"
        and port_or_default(location) == 443
    )
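
# Illustrative behaviour of is_https_redirect, assuming `URL` is httpx.URL and
# using made-up hosts:
#
#     >>> is_https_redirect(URL("http://example.com/"), URL("https://example.com/"))
#     True
#     >>> is_https_redirect(URL("http://example.com:8080/"), URL("https://example.com/"))
#     False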


def get_environment_proxies() -> dict[str, str | None]:
    """Gets proxy information from the environment"""

    # urllib.request.getproxies() falls back on System
    # Registry and Config for proxies on Windows and macOS.
    # We don't want to propagate non-HTTP proxies into
    # our configuration such as 'TRAVIS_APT_PROXY'.
    proxy_info = getproxies()
    mounts: dict[str, str | None] = {}

    for scheme in ("http", "https", "all"):
        if proxy_info.get(scheme):
            hostname = proxy_info[scheme]
            mounts[f"{scheme}://"] = (
                hostname if "://" in hostname else f"http://{hostname}"
            )

    no_proxy_hosts = [host.strip() for host in proxy_info.get("no", "").split(",")]
    for hostname in no_proxy_hosts:
        # See https://curl.haxx.se/libcurl/c/CURLOPT_NOPROXY.html for details
        # on how names in `NO_PROXY` are handled.
        if hostname == "*":
            # If NO_PROXY=* is used or if "*" occurs as any one of the comma
            # separated hostnames, then we should just bypass any information
            # from HTTP_PROXY, HTTPS_PROXY, ALL_PROXY, and always ignore
            # proxies.
            return {}
        elif hostname:
            # NO_PROXY=.google.com is marked as "all://*.google.com",
            # which disables "www.google.com" but not "google.com"
            # NO_PROXY=google.com is marked as "all://*google.com",
            # which disables "www.google.com" and "google.com".
            # (But not "wwwgoogle.com")
            # NO_PROXY can include domains, IPv6, IPv4 addresses and "localhost"
            # NO_PROXY=example.com,::1,localhost,192.168.0.0/16
            if "://" in hostname:
                mounts[hostname] = None
            elif is_ipv4_hostname(hostname):
                mounts[f"all://{hostname}"] = None
            elif is_ipv6_hostname(hostname):
                mounts[f"all://[{hostname}]"] = None
            elif hostname.lower() == "localhost":
                mounts[f"all://{hostname}"] = None
            else:
                mounts[f"all://*{hostname}"] = None

    return mounts
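
# Illustrative result of get_environment_proxies under the hypothetical
# environment HTTP_PROXY="http://proxy:3128" and NO_PROXY=".internal.example":
#
#     {"http://": "http://proxy:3128", "all://*.internal.example": None}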


def to_bytes(value: str | bytes, encoding: str = "utf-8") -> bytes:
    return value.encode(encoding) if isinstance(value, str) else value


def to_str(value: str | bytes, encoding: str = "utf-8") -> str:
    return value if isinstance(value, str) else value.decode(encoding)


def to_bytes_or_str(value: str, match_type_of: typing.AnyStr) -> typing.AnyStr:
    return value if isinstance(match_type_of, str) else value.encode()


def unquote(value: str) -> str:
    return value[1:-1] if value[0] == value[-1] == '"' else value


def guess_content_type(filename: str | None) -> str | None:
    if filename:
        return mimetypes.guess_type(filename)[0] or "application/octet-stream"
    return None


def peek_filelike_length(stream: typing.Any) -> int | None:
    """
    Given a file-like stream object, return its length in number of bytes
    without reading it into memory.
    """
    try:
        # Is it an actual file?
        fd = stream.fileno()
        # Yup, seems to be an actual file.
        length = os.fstat(fd).st_size
    except (AttributeError, OSError):
        # No... Maybe it's something that supports random access, like `io.BytesIO`?
        try:
            # Assuming so, go to end of stream to figure out its length,
            # then put it back in place.
            offset = stream.tell()
            length = stream.seek(0, os.SEEK_END)
            stream.seek(offset)
        except (AttributeError, OSError):
            # Not even that? Sorry, we're doomed...
            return None

    return length
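
# Illustrative behaviour of peek_filelike_length (made-up inputs): in-memory
# streams are measured via tell()/seek(), and unsupported objects yield None.
#
#     >>> import io
#     >>> peek_filelike_length(io.BytesIO(b"abcde"))
#     5
#     >>> peek_filelike_length(object()) is None
#     True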


class Timer:
    async def _get_time(self) -> float:
        library = sniffio.current_async_library()
        if library == "trio":
            import trio

            return trio.current_time()
        else:
            import asyncio

            return asyncio.get_event_loop().time()

    def sync_start(self) -> None:
        self.started = time.perf_counter()

    async def async_start(self) -> None:
        self.started = await self._get_time()

    def sync_elapsed(self) -> float:
        now = time.perf_counter()
        return now - self.started

    async def async_elapsed(self) -> float:
        now = await self._get_time()
        return now - self.started
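
# Sketch of typical synchronous Timer usage (the async methods mirror this
# with `await`); the variable names here are illustrative only:
#
#     timer = Timer()
#     timer.sync_start()
#     ...  # perform the request
#     elapsed_seconds = timer.sync_elapsed()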


class URLPattern:
    """
    A utility class currently used for making lookups against proxy keys...

    # Wildcard matching...
    >>> pattern = URLPattern("all://")
    >>> pattern.matches(httpx.URL("http://example.com"))
    True

    # With scheme matching...
    >>> pattern = URLPattern("https://")
    >>> pattern.matches(httpx.URL("https://example.com"))
    True
    >>> pattern.matches(httpx.URL("http://example.com"))
    False

    # With domain matching...
    >>> pattern = URLPattern("https://example.com")
    >>> pattern.matches(httpx.URL("https://example.com"))
    True
    >>> pattern.matches(httpx.URL("http://example.com"))
    False
    >>> pattern.matches(httpx.URL("https://other.com"))
    False

    # Wildcard scheme, with domain matching...
    >>> pattern = URLPattern("all://example.com")
    >>> pattern.matches(httpx.URL("https://example.com"))
    True
    >>> pattern.matches(httpx.URL("http://example.com"))
    True
    >>> pattern.matches(httpx.URL("https://other.com"))
    False

    # With port matching...
    >>> pattern = URLPattern("https://example.com:1234")
    >>> pattern.matches(httpx.URL("https://example.com:1234"))
    True
    >>> pattern.matches(httpx.URL("https://example.com"))
    False
    """

    def __init__(self, pattern: str) -> None:
        from ._urls import URL

        if pattern and ":" not in pattern:
            raise ValueError(
                f"Proxy keys should use proper URL forms rather "
                f"than plain scheme strings. "
                f'Instead of "{pattern}", use "{pattern}://"'
            )

        url = URL(pattern)
        self.pattern = pattern
        self.scheme = "" if url.scheme == "all" else url.scheme
        self.host = "" if url.host == "*" else url.host
        self.port = url.port
        if not url.host or url.host == "*":
            self.host_regex: typing.Pattern[str] | None = None
        elif url.host.startswith("*."):
            # *.example.com should match "www.example.com", but not "example.com"
            domain = re.escape(url.host[2:])
            self.host_regex = re.compile(f"^.+\\.{domain}$")
        elif url.host.startswith("*"):
            # *example.com should match "www.example.com" and "example.com"
            domain = re.escape(url.host[1:])
            self.host_regex = re.compile(f"^(.+\\.)?{domain}$")
        else:
            # example.com should match "example.com" but not "www.example.com"
            domain = re.escape(url.host)
            self.host_regex = re.compile(f"^{domain}$")

    def matches(self, other: URL) -> bool:
        if self.scheme and self.scheme != other.scheme:
            return False
        if (
            self.host
            and self.host_regex is not None
            and not self.host_regex.match(other.host)
        ):
            return False
        if self.port is not None and self.port != other.port:
            return False
        return True

    @property
    def priority(self) -> tuple[int, int, int]:
        """
        The priority allows URLPattern instances to be sortable, so that
        we can match from most specific to least specific.
        """
        # URLs with a port should take priority over URLs without a port.
        port_priority = 0 if self.port is not None else 1
        # Longer hostnames should match first.
        host_priority = -len(self.host)
        # Longer schemes should match first.
        scheme_priority = -len(self.scheme)
        return (port_priority, host_priority, scheme_priority)

    def __hash__(self) -> int:
        return hash(self.pattern)

    def __lt__(self, other: URLPattern) -> bool:
        return self.priority < other.priority

    def __eq__(self, other: typing.Any) -> bool:
        return isinstance(other, URLPattern) and self.pattern == other.pattern
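
# Note on ordering (an editorial sketch; the pattern strings are examples):
# because __lt__ compares `priority`, sorting URLPattern instances puts the
# most specific pattern first, e.g. in
#
#     sorted([URLPattern("all://"), URLPattern("https://example.com:1234")])
#
# the host- and port-specific pattern comes before the bare "all://" wildcard.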


def is_ipv4_hostname(hostname: str) -> bool:
    try:
        ipaddress.IPv4Address(hostname.split("/")[0])
    except Exception:
        return False
    return True


def is_ipv6_hostname(hostname: str) -> bool:
    try:
        ipaddress.IPv6Address(hostname.split("/")[0])
    except Exception:
        return False
    return True