Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_config.py: 34%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

160 statements  

1from __future__ import annotations 

2 

3import logging 

4import os 

5import ssl 

6import typing 

7from pathlib import Path 

8 

9import certifi 

10 

11from ._compat import set_minimum_tls_version_1_2 

12from ._models import Headers 

13from ._types import CertTypes, HeaderTypes, TimeoutTypes, VerifyTypes 

14from ._urls import URL 

15from ._utils import get_ca_bundle_from_env 

16 

17__all__ = ["Limits", "Proxy", "Timeout", "create_ssl_context"] 

18 

# OpenSSL cipher string: colon-separated selectors, with the trailing
# "!"-prefixed entries excluding anonymous (aNULL), unencrypted (eNULL),
# MD5 and DSS suites. Handed to `SSLContext.set_ciphers()`.
DEFAULT_CIPHERS = (
    "ECDHE+AESGCM:ECDHE+CHACHA20:"
    "DHE+AESGCM:DHE+CHACHA20:"
    "ECDH+AESGCM:DH+AESGCM:"
    "ECDH+AES:DH+AES:"
    "RSA+AESGCM:RSA+AES:"
    "!aNULL:!eNULL:!MD5:!DSS"
)


# All log records emitted by this module go to the "httpx" logger.
logger = logging.getLogger("httpx")

40 

41 

class UnsetType:
    """Type of the `UNSET` sentinel; carries no state or behaviour."""


# Sentinel used as a default argument value so that "argument omitted" can be
# told apart from an explicit `None`.
UNSET = UnsetType()

47 

48 

def create_ssl_context(
    cert: CertTypes | None = None,
    verify: VerifyTypes = True,
    trust_env: bool = True,
    http2: bool = False,
) -> ssl.SSLContext:
    """
    Build an `ssl.SSLContext` from the given TLS settings.

    All four arguments are forwarded unchanged to `SSLConfig`; the context
    that `SSLConfig` creates during initialisation is returned.
    """
    config = SSLConfig(cert=cert, verify=verify, trust_env=trust_env, http2=http2)
    return config.ssl_context

58 

59 

class SSLConfig:
    """
    SSL configuration.

    Stores the TLS options it is given and eagerly builds the matching
    `ssl.SSLContext`, which is exposed as the `ssl_context` attribute.
    """

    # CA bundle used when `verify` is a plain boolean.
    DEFAULT_CA_BUNDLE_PATH = Path(certifi.where())

    def __init__(
        self,
        *,
        cert: CertTypes | None = None,
        verify: VerifyTypes = True,
        trust_env: bool = True,
        http2: bool = False,
    ) -> None:
        self.cert = cert
        self.verify = verify
        self.trust_env = trust_env
        self.http2 = http2
        # Built last: the loaders read the four attributes assigned above.
        self.ssl_context = self.load_ssl_context()

    def load_ssl_context(self) -> ssl.SSLContext:
        """
        Log the current settings and build the context, verified or
        unverified depending on the truthiness of `self.verify`.
        """
        logger.debug(
            "load_ssl_context verify=%r cert=%r trust_env=%r http2=%r",
            self.verify,
            self.cert,
            self.trust_env,
            self.http2,
        )

        if not self.verify:
            return self.load_ssl_context_no_verify()
        return self.load_ssl_context_verify()

    def load_ssl_context_no_verify(self) -> ssl.SSLContext:
        """
        Return an SSL context that performs no certificate verification.
        """
        unverified = self._create_default_ssl_context()
        # Hostname checking is switched off before the verify mode is relaxed.
        unverified.check_hostname = False
        unverified.verify_mode = ssl.CERT_NONE
        self._load_client_certs(unverified)
        return unverified

    def load_ssl_context_verify(self) -> ssl.SSLContext:
        """
        Return an SSL context that verifies the peer certificate.

        `self.verify` may be `True`, a pre-configured `ssl.SSLContext`, or a
        path to a CA file or directory. Raises `IOError` when a path is given
        that does not exist.
        """
        if self.trust_env and self.verify is True:
            # Let the environment override the default bundle location.
            env_bundle = get_ca_bundle_from_env()
            if env_bundle is not None:
                self.verify = env_bundle

        verify = self.verify

        if isinstance(verify, ssl.SSLContext):
            # A caller-supplied context is used as-is, apart from having the
            # client certificate (if any) loaded into it.
            self._load_client_certs(verify)
            return verify

        if isinstance(verify, bool):
            bundle = self.DEFAULT_CA_BUNDLE_PATH
        elif Path(verify).exists():
            bundle = Path(verify)
        else:
            raise IOError(
                "Could not find a suitable TLS CA certificate bundle, "
                "invalid path: {}".format(verify)
            )

        context = self._create_default_ssl_context()
        context.verify_mode = ssl.CERT_REQUIRED
        context.check_hostname = True

        # Two optional hardening switches. Either assignment may raise
        # AttributeError where the attribute is read-only or unavailable, in
        # which case it is skipped:
        #   post_handshake_auth         - signal support for PHA in TLS 1.3.
        #   hostname_checks_common_name - don't fall back to 'commonName'
        #                                 when 'subjectAltName' is missing.
        for attribute, value in (
            ("post_handshake_auth", True),
            ("hostname_checks_common_name", False),
        ):
            try:
                setattr(context, attribute, value)
            except AttributeError:  # pragma: no cover
                pass

        # A file is loaded as `cafile`, a directory as `capath`; anything
        # else loads nothing.
        if bundle.is_file():
            cafile = str(bundle)
            logger.debug("load_verify_locations cafile=%r", cafile)
            context.load_verify_locations(cafile=cafile)
        elif bundle.is_dir():
            capath = str(bundle)
            logger.debug("load_verify_locations capath=%r", capath)
            context.load_verify_locations(capath=capath)

        self._load_client_certs(context)
        return context

    def _create_default_ssl_context(self) -> ssl.SSLContext:
        """
        Create the base client-side SSLContext shared by the verified and
        unverified code paths.
        """
        base = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        set_minimum_tls_version_1_2(base)
        base.options |= ssl.OP_NO_COMPRESSION
        base.set_ciphers(DEFAULT_CIPHERS)

        if ssl.HAS_ALPN:
            protocols = ["http/1.1"]
            if self.http2:
                protocols.append("h2")
            base.set_alpn_protocols(protocols)

        # SSLKEYLOGFILE is only honoured when environment settings are trusted.
        keylog_path = os.environ.get("SSLKEYLOGFILE")
        if keylog_path and self.trust_env:
            base.keylog_filename = keylog_path

        return base

    def _load_client_certs(self, ssl_context: ssl.SSLContext) -> None:
        """
        Load the client certificate from `self.cert` into `ssl_context`.

        Accepts a certificate path string, a `(certfile, keyfile)` tuple or a
        `(certfile, keyfile, password)` tuple. `None`, and any other shape,
        loads nothing.
        """
        cert = self.cert
        if cert is None:
            return

        if isinstance(cert, str):
            ssl_context.load_cert_chain(certfile=cert)
        elif isinstance(cert, tuple):
            if len(cert) == 2:
                certfile, keyfile = cert
                ssl_context.load_cert_chain(certfile=certfile, keyfile=keyfile)
            elif len(cert) == 3:
                certfile, keyfile, password = cert
                ssl_context.load_cert_chain(
                    certfile=certfile,
                    keyfile=keyfile,
                    password=password,
                )

194 

195 

class Timeout:
    """
    Timeout configuration.

    Holds four independent values - `connect`, `read`, `write` and `pool` -
    each of which is either a number or `None`.

    **Usage**:

    Timeout(None)               # No timeouts.
    Timeout(5.0)                # 5s timeout on all operations.
    Timeout(None, connect=5.0)  # 5s timeout on connect, no other timeouts.
    Timeout(5.0, connect=10.0)  # 10s timeout on connect. 5s timeout elsewhere.
    Timeout(5.0, pool=None)     # No timeout on acquiring connection from pool.
                                # 5s timeout elsewhere.
    """

    def __init__(
        self,
        timeout: TimeoutTypes | UnsetType = UNSET,
        *,
        connect: None | float | UnsetType = UNSET,
        read: None | float | UnsetType = UNSET,
        write: None | float | UnsetType = UNSET,
        pool: None | float | UnsetType = UNSET,
    ) -> None:
        """
        Accepts another `Timeout`, a tuple, a single default value, or all
        four keyword values.

        Raises `ValueError` when no default is given and at least one of the
        four keyword values is left unset.
        """
        if isinstance(timeout, Timeout):
            # Copy of an existing instance: keyword overrides are not allowed.
            assert connect is UNSET
            assert read is UNSET
            assert write is UNSET
            assert pool is UNSET
            self.connect = timeout.connect  # type: typing.Optional[float]
            self.read = timeout.read  # type: typing.Optional[float]
            self.write = timeout.write  # type: typing.Optional[float]
            self.pool = timeout.pool  # type: typing.Optional[float]
            return

        if isinstance(timeout, tuple):
            # Ordered as (connect, read, write, pool); the last two entries
            # may be left off and then become None.
            self.connect = timeout[0]
            self.read = timeout[1]
            self.write = timeout[2] if len(timeout) >= 3 else None
            self.pool = timeout[3] if len(timeout) >= 4 else None
            return

        keyword_values = (connect, read, write, pool)
        if not any(isinstance(value, UnsetType) for value in keyword_values):
            # Every value was given explicitly, so no default is needed.
            self.connect = connect
            self.read = read
            self.write = write
            self.pool = pool
            return

        if isinstance(timeout, UnsetType):
            raise ValueError(
                "httpx.Timeout must either include a default, or set all "
                "four parameters explicitly."
            )

        # `timeout` is the default for whichever values were left unset.
        self.connect = connect if not isinstance(connect, UnsetType) else timeout
        self.read = read if not isinstance(read, UnsetType) else timeout
        self.write = write if not isinstance(write, UnsetType) else timeout
        self.pool = pool if not isinstance(pool, UnsetType) else timeout

    def as_dict(self) -> dict[str, float | None]:
        """Return the four values keyed by "connect", "read", "write", "pool"."""
        return dict(
            connect=self.connect,
            read=self.read,
            write=self.write,
            pool=self.pool,
        )

    def __eq__(self, other: typing.Any) -> bool:
        """Equal when `other` is an instance of this class with the same four values."""
        if not isinstance(other, self.__class__):
            return False
        return (
            self.connect == other.connect
            and self.read == other.read
            and self.write == other.write
            and self.pool == other.pool
        )

    def __repr__(self) -> str:
        """Short form when all four values coincide, full form otherwise."""
        name = self.__class__.__name__
        distinct = {self.connect, self.read, self.write, self.pool}
        if len(distinct) == 1:
            return f"{name}(timeout={self.connect})"
        return (
            f"{name}(connect={self.connect}, "
            f"read={self.read}, write={self.write}, pool={self.pool})"
        )

281 

282 

class Limits:
    """
    Configuration for limits to various client behaviors.

    **Parameters:**

    * **max_connections** - The maximum number of concurrent connections that
            may be established.
    * **max_keepalive_connections** - Allow the connection pool to maintain
            keep-alive connections below this point. Should be less than or
            equal to `max_connections`.
    * **keepalive_expiry** - Time limit on idle keep-alive connections in
            seconds.
    """

    def __init__(
        self,
        *,
        max_connections: int | None = None,
        max_keepalive_connections: int | None = None,
        keepalive_expiry: float | None = 5.0,
    ) -> None:
        # Values are stored as given; no validation is performed here.
        self.max_connections = max_connections
        self.max_keepalive_connections = max_keepalive_connections
        self.keepalive_expiry = keepalive_expiry

    def __eq__(self, other: typing.Any) -> bool:
        """Equal when `other` is an instance of this class with the same three values."""
        if not isinstance(other, self.__class__):
            return False
        return (
            self.max_connections == other.max_connections
            and self.max_keepalive_connections == other.max_keepalive_connections
            and self.keepalive_expiry == other.keepalive_expiry
        )

    def __repr__(self) -> str:
        """Render as `ClassName(field=value, ...)` in declaration order."""
        name = self.__class__.__name__
        fields = (
            ("max_connections", self.max_connections),
            ("max_keepalive_connections", self.max_keepalive_connections),
            ("keepalive_expiry", self.keepalive_expiry),
        )
        rendered = ", ".join(f"{key}={value}" for key, value in fields)
        return f"{name}({rendered})"

323 

324 

class Proxy:
    """
    Proxy configuration: URL, optional credentials, headers and SSL context.
    """

    def __init__(
        self,
        url: URL | str,
        *,
        ssl_context: ssl.SSLContext | None = None,
        auth: tuple[str, str] | None = None,
        headers: HeaderTypes | None = None,
    ) -> None:
        """
        Normalise the arguments and store them on the instance.

        Raises `ValueError` when the URL scheme is not one of "http",
        "https" or "socks5".
        """
        proxy_url = URL(url)
        proxy_headers = Headers(headers)

        if proxy_url.scheme not in ("http", "https", "socks5"):
            raise ValueError(f"Unknown scheme for proxy URL {proxy_url!r}")

        if proxy_url.username or proxy_url.password:
            # Credentials embedded in the URL replace the `auth` argument and
            # are stripped from the stored URL.
            auth = (proxy_url.username, proxy_url.password)
            proxy_url = proxy_url.copy_with(username=None, password=None)

        self.url = proxy_url
        self.auth = auth
        self.headers = proxy_headers
        self.ssl_context = ssl_context

    @property
    def raw_auth(self) -> tuple[bytes, bytes] | None:
        """The proxy credentials as UTF-8 encoded bytes, or `None` if unset."""
        if self.auth is None:
            return None
        username, password = self.auth[0], self.auth[1]
        return (username.encode("utf-8"), password.encode("utf-8"))

    def __repr__(self) -> str:
        """Concise representation; the password is always masked."""
        parts = [repr(str(self.url))]

        if self.auth:
            masked_auth = (self.auth[0], "********")
            parts.append(f"auth={masked_auth!r}")

        # Headers are only shown when there are any.
        if self.headers:
            parts.append(f"headers={dict(self.headers)!r}")

        return "Proxy(" + ", ".join(parts) + ")"

368 

369 

# Module-level default values.

# 5 seconds for each of connect, read, write and pool.
DEFAULT_TIMEOUT_CONFIG = Timeout(5.0)

# At most 100 connections, 20 of them kept alive; the keep-alive expiry keeps
# the `Limits` default.
DEFAULT_LIMITS = Limits(max_connections=100, max_keepalive_connections=20)

DEFAULT_MAX_REDIRECTS = 20