Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_config.py: 33%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

159 statements  

1from __future__ import annotations 

2 

3import logging 

4import os 

5import ssl 

6import typing 

7from pathlib import Path 

8 

9import certifi 

10 

11from ._compat import set_minimum_tls_version_1_2 

12from ._models import Headers 

13from ._types import CertTypes, HeaderTypes, TimeoutTypes, URLTypes, VerifyTypes 

14from ._urls import URL 

15from ._utils import get_ca_bundle_from_env 

16 

# Cipher selectors in preference order, joined with ":" into the single
# cipher string handed to `SSLContext.set_ciphers()` further down.
# The trailing "!"-prefixed entries are exclusions in OpenSSL cipher-list
# syntax (anonymous, unencrypted, MD5 and DSS suites).
DEFAULT_CIPHERS = ":".join(
    [
        "ECDHE+AESGCM",
        "ECDHE+CHACHA20",
        "DHE+AESGCM",
        "DHE+CHACHA20",
        "ECDH+AESGCM",
        "DH+AESGCM",
        "ECDH+AES",
        "DH+AES",
        "RSA+AESGCM",
        "RSA+AES",
        "!aNULL",
        "!eNULL",
        "!MD5",
        "!DSS",
    ]
)


# Named "httpx" logger used for the debug output emitted by this module.
logger = logging.getLogger("httpx")

38 

39 

class UnsetType:
    """
    Type of the `UNSET` sentinel.

    `Timeout` uses the sentinel as the default for its arguments so that
    "argument not supplied" can be told apart from an explicit `None`.
    """

    pass  # pragma: no cover


# The single shared sentinel instance; `Timeout` checks for it with
# `isinstance(..., UnsetType)` / `is UNSET`.
UNSET = UnsetType()

45 

46 

def create_ssl_context(
    cert: CertTypes | None = None,
    verify: VerifyTypes = True,
    trust_env: bool = True,
    http2: bool = False,
) -> ssl.SSLContext:
    """
    Build an `ssl.SSLContext` for the given settings.

    Convenience wrapper: constructs an `SSLConfig` with the same four
    arguments and returns its `ssl_context` attribute.

    **Parameters:**

    * **cert** - Optional client certificate, forwarded to `SSLConfig`.
    * **verify** - Verification setting, forwarded to `SSLConfig`.
    * **trust_env** - Whether environment variables may influence the
      context, forwarded to `SSLConfig`.
    * **http2** - Whether "h2" should be advertised via ALPN, forwarded to
      `SSLConfig`.
    """
    return SSLConfig(
        cert=cert, verify=verify, trust_env=trust_env, http2=http2
    ).ssl_context

56 

57 

class SSLConfig:
    """
    SSL Configuration.

    The `ssl.SSLContext` is built once, during `__init__`, and stored on the
    `ssl_context` attribute.

    **Parameters:**

    * **cert** - Optional client certificate: a certificate file path, a
      `(certfile, keyfile)` tuple, or a `(certfile, keyfile, password)` tuple.
    * **verify** - A falsy value disables verification; `True` uses the
      default CA bundle; an `ssl.SSLContext` is used as-is; anything else is
      treated as the path of a CA bundle file or directory.
    * **trust_env** - Allow environment settings (CA bundle lookup and
      `SSLKEYLOGFILE`) to influence the context.
    * **http2** - Advertise "h2" in addition to "http/1.1" via ALPN.
    """

    # CA bundle used when `verify` is simply `True`.
    DEFAULT_CA_BUNDLE_PATH = Path(certifi.where())

    def __init__(
        self,
        *,
        cert: CertTypes | None = None,
        verify: VerifyTypes = True,
        trust_env: bool = True,
        http2: bool = False,
    ) -> None:
        self.cert = cert
        self.verify = verify
        self.trust_env = trust_env
        self.http2 = http2
        # Built eagerly, so configuration errors surface at construction time.
        self.ssl_context = self.load_ssl_context()

    def load_ssl_context(self) -> ssl.SSLContext:
        """
        Return a verified or unverified SSL context, depending on the
        truthiness of `self.verify`.
        """
        logger.debug(
            "load_ssl_context verify=%r cert=%r trust_env=%r http2=%r",
            self.verify,
            self.cert,
            self.trust_env,
            self.http2,
        )

        if self.verify:
            return self.load_ssl_context_verify()
        return self.load_ssl_context_no_verify()

    def load_ssl_context_no_verify(self) -> ssl.SSLContext:
        """
        Return an SSL context for unverified connections.
        """
        context = self._create_default_ssl_context()
        # `check_hostname` has to be switched off before `verify_mode` may be
        # set to CERT_NONE; the ssl module rejects the reverse order.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        self._load_client_certs(context)
        return context

    def load_ssl_context_verify(self) -> ssl.SSLContext:
        """
        Return an SSL context for verified connections.

        Raises `IOError` when `self.verify` is a path that does not exist.
        """
        if self.trust_env and self.verify is True:
            # Only the plain `verify=True` default may be replaced by a bundle
            # taken from the environment. Note that this rebinds `self.verify`.
            ca_bundle = get_ca_bundle_from_env()
            if ca_bundle is not None:
                self.verify = ca_bundle

        if isinstance(self.verify, ssl.SSLContext):
            # Allow passing in our own SSLContext object that's pre-configured.
            context = self.verify
            self._load_client_certs(context)
            return context
        elif isinstance(self.verify, bool):
            ca_bundle_path = self.DEFAULT_CA_BUNDLE_PATH
        elif Path(self.verify).exists():
            ca_bundle_path = Path(self.verify)
        else:
            raise IOError(
                "Could not find a suitable TLS CA certificate bundle, "
                f"invalid path: {self.verify}"
            )

        context = self._create_default_ssl_context()
        context.verify_mode = ssl.CERT_REQUIRED
        context.check_hostname = True

        # Signal to server support for PHA in TLS 1.3. Raises an
        # AttributeError if only read-only access is implemented.
        try:
            context.post_handshake_auth = True
        except AttributeError:  # pragma: no cover
            pass

        # Disable using 'commonName' for SSLContext.check_hostname
        # when the 'subjectAltName' extension isn't available.
        try:
            context.hostname_checks_common_name = False
        except AttributeError:  # pragma: no cover
            pass

        # A single file is loaded as `cafile`, a directory as `capath`.
        if ca_bundle_path.is_file():
            cafile = str(ca_bundle_path)
            logger.debug("load_verify_locations cafile=%r", cafile)
            context.load_verify_locations(cafile=cafile)
        elif ca_bundle_path.is_dir():
            capath = str(ca_bundle_path)
            logger.debug("load_verify_locations capath=%r", capath)
            context.load_verify_locations(capath=capath)

        self._load_client_certs(context)

        return context

    def _create_default_ssl_context(self) -> ssl.SSLContext:
        """
        Creates the default SSLContext object that's used for both verified
        and unverified connections.
        """
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        # Helper from `._compat`; going by its name it enforces TLS 1.2 as
        # the minimum protocol version.
        set_minimum_tls_version_1_2(context)
        context.options |= ssl.OP_NO_COMPRESSION
        context.set_ciphers(DEFAULT_CIPHERS)

        if ssl.HAS_ALPN:
            alpn_idents = ["http/1.1", "h2"] if self.http2 else ["http/1.1"]
            context.set_alpn_protocols(alpn_idents)

        # TLS key logging is only honoured when the environment is trusted.
        keylogfile = os.environ.get("SSLKEYLOGFILE")
        if keylogfile and self.trust_env:
            context.keylog_filename = keylogfile

        return context

    def _load_client_certs(self, ssl_context: ssl.SSLContext) -> None:
        """
        Loads client certificates into our SSLContext object

        `self.cert` may be a certificate file path, a `(certfile, keyfile)`
        tuple or a `(certfile, keyfile, password)` tuple. `None`, and any
        other shape, loads nothing.
        """
        if self.cert is not None:
            if isinstance(self.cert, str):
                ssl_context.load_cert_chain(certfile=self.cert)
            elif isinstance(self.cert, tuple) and len(self.cert) == 2:
                ssl_context.load_cert_chain(certfile=self.cert[0], keyfile=self.cert[1])
            elif isinstance(self.cert, tuple) and len(self.cert) == 3:
                ssl_context.load_cert_chain(
                    certfile=self.cert[0],
                    keyfile=self.cert[1],
                    password=self.cert[2],
                )

192 

193 

class Timeout:
    """
    Timeout configuration.

    **Usage**:

    Timeout(None)               # No timeouts.
    Timeout(5.0)                # 5s timeout on all operations.
    Timeout(None, connect=5.0)  # 5s timeout on connect, no other timeouts.
    Timeout(5.0, connect=10.0)  # 10s timeout on connect. 5s timeout elsewhere.
    Timeout(5.0, pool=None)     # No timeout on acquiring connection from pool.
                                # 5s timeout elsewhere.

    `timeout` may also be an existing `Timeout` instance (copied; no keyword
    overrides allowed) or a `(connect, read[, write[, pool]])` tuple, where
    omitted trailing entries become `None`.

    Raises `ValueError` when no default is given and not all four of
    `connect`, `read`, `write` and `pool` are set explicitly.
    """

    def __init__(
        self,
        timeout: TimeoutTypes | UnsetType = UNSET,
        *,
        connect: None | float | UnsetType = UNSET,
        read: None | float | UnsetType = UNSET,
        write: None | float | UnsetType = UNSET,
        pool: None | float | UnsetType = UNSET,
    ) -> None:
        if isinstance(timeout, Timeout):
            # Passed as a single explicit Timeout.
            assert connect is UNSET
            assert read is UNSET
            assert write is UNSET
            assert pool is UNSET
            self.connect: typing.Optional[float] = timeout.connect
            self.read: typing.Optional[float] = timeout.read
            self.write: typing.Optional[float] = timeout.write
            self.pool: typing.Optional[float] = timeout.pool
        elif isinstance(timeout, tuple):
            # Passed as a tuple.
            self.connect = timeout[0]
            self.read = timeout[1]
            self.write = None if len(timeout) < 3 else timeout[2]
            self.pool = None if len(timeout) < 4 else timeout[3]
        elif not (
            isinstance(connect, UnsetType)
            or isinstance(read, UnsetType)
            or isinstance(write, UnsetType)
            or isinstance(pool, UnsetType)
        ):
            # All four values were given explicitly; `timeout` is not consulted.
            self.connect = connect
            self.read = read
            self.write = write
            self.pool = pool
        else:
            # At least one value is missing, so a default is required to
            # fill the gaps.
            if isinstance(timeout, UnsetType):
                raise ValueError(
                    "httpx.Timeout must either include a default, or set all "
                    "four parameters explicitly."
                )
            self.connect = timeout if isinstance(connect, UnsetType) else connect
            self.read = timeout if isinstance(read, UnsetType) else read
            self.write = timeout if isinstance(write, UnsetType) else write
            self.pool = timeout if isinstance(pool, UnsetType) else pool

    def as_dict(self) -> dict[str, float | None]:
        """Return the four timeouts keyed by "connect", "read", "write", "pool"."""
        return {
            "connect": self.connect,
            "read": self.read,
            "write": self.write,
            "pool": self.pool,
        }

    def __eq__(self, other: typing.Any) -> bool:
        """Equal when `other` is an instance of this class with the same four values."""
        return (
            isinstance(other, self.__class__)
            and self.connect == other.connect
            and self.read == other.read
            and self.write == other.write
            and self.pool == other.pool
        )

    def __repr__(self) -> str:
        class_name = self.__class__.__name__
        # Collapse to the short form when all four values are identical.
        if len({self.connect, self.read, self.write, self.pool}) == 1:
            return f"{class_name}(timeout={self.connect})"
        return (
            f"{class_name}(connect={self.connect}, "
            f"read={self.read}, write={self.write}, pool={self.pool})"
        )

279 

280 

class Limits:
    """
    Configuration for limits to various client behaviors.

    **Parameters:**

    * **max_connections** - The maximum number of concurrent connections that may be
            established. Defaults to `None`.
    * **max_keepalive_connections** - Allow the connection pool to maintain
            keep-alive connections below this point. Should be less than or equal
            to `max_connections`. Defaults to `None`.
    * **keepalive_expiry** - Time limit on idle keep-alive connections in seconds.
            Defaults to `5.0`.
    """

    def __init__(
        self,
        *,
        max_connections: int | None = None,
        max_keepalive_connections: int | None = None,
        keepalive_expiry: float | None = 5.0,
    ) -> None:
        # Values are stored as given; nothing is validated here.
        self.max_connections = max_connections
        self.max_keepalive_connections = max_keepalive_connections
        self.keepalive_expiry = keepalive_expiry

    def __eq__(self, other: typing.Any) -> bool:
        """Equal when `other` is an instance of this class with the same three values."""
        return (
            isinstance(other, self.__class__)
            and self.max_connections == other.max_connections
            and self.max_keepalive_connections == other.max_keepalive_connections
            and self.keepalive_expiry == other.keepalive_expiry
        )

    def __repr__(self) -> str:
        class_name = self.__class__.__name__
        return (
            f"{class_name}(max_connections={self.max_connections}, "
            f"max_keepalive_connections={self.max_keepalive_connections}, "
            f"keepalive_expiry={self.keepalive_expiry})"
        )

321 

322 

class Proxy:
    """
    Configuration for a single proxy.

    **Parameters:**

    * **url** - The proxy URL. Its scheme must be "http", "https" or "socks5",
      otherwise `ValueError` is raised.
    * **ssl_context** - Optional `ssl.SSLContext`, stored unchanged.
    * **auth** - Optional `(username, password)` pair. Credentials embedded
      in `url` take precedence over this argument.
    * **headers** - Optional headers, stored as a `Headers` instance.
    """

    def __init__(
        self,
        url: URLTypes,
        *,
        ssl_context: ssl.SSLContext | None = None,
        auth: tuple[str, str] | None = None,
        headers: HeaderTypes | None = None,
    ) -> None:
        url = URL(url)
        headers = Headers(headers)

        if url.scheme not in ("http", "https", "socks5"):
            raise ValueError(f"Unknown scheme for proxy URL {url!r}")

        if url.username or url.password:
            # Remove any auth credentials from the URL.
            # They replace whatever was passed as `auth`.
            auth = (url.username, url.password)
            url = url.copy_with(username=None, password=None)

        self.url = url
        self.auth = auth
        self.headers = headers
        self.ssl_context = ssl_context

    @property
    def raw_auth(self) -> tuple[bytes, bytes] | None:
        # The proxy authentication as raw bytes.
        return (
            None
            if self.auth is None
            else (self.auth[0].encode("utf-8"), self.auth[1].encode("utf-8"))
        )

    def __repr__(self) -> str:
        # The authentication is represented with the password component masked.
        auth = (self.auth[0], "********") if self.auth else None

        # Build a nice concise representation.
        # Auth and headers are only included when set / non-empty.
        url_str = f"{str(self.url)!r}"
        auth_str = f", auth={auth!r}" if auth else ""
        headers_str = f", headers={dict(self.headers)!r}" if self.headers else ""
        return f"Proxy({url_str}{auth_str}{headers_str})"

366 

367 

# Module-level defaults: a 5 second timeout for connect, read, write and
# pool, at most 100 connections of which 20 may be kept alive, and a limit
# of 20 redirects.
DEFAULT_TIMEOUT_CONFIG = Timeout(timeout=5.0)
DEFAULT_LIMITS = Limits(max_connections=100, max_keepalive_connections=20)
DEFAULT_MAX_REDIRECTS = 20