Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_config.py: 35%

150 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:12 +0000

1import os 

2import ssl 

3import sys 

4import typing 

5from pathlib import Path 

6 

7import certifi 

8 

9from ._compat import set_minimum_tls_version_1_2 

10from ._models import Headers 

11from ._types import CertTypes, HeaderTypes, TimeoutTypes, URLTypes, VerifyTypes 

12from ._urls import URL 

13from ._utils import get_ca_bundle_from_env, get_logger 

14 

# Colon-separated cipher string, in OpenSSL cipher-list syntax, that
# SSLConfig passes to `SSLContext.set_ciphers()`. Entries are listed in
# preference order; the trailing "!"-prefixed entries are exclusions.
DEFAULT_CIPHERS = (
    "ECDHE+AESGCM:"
    "ECDHE+CHACHA20:"
    "DHE+AESGCM:"
    "DHE+CHACHA20:"
    "ECDH+AESGCM:"
    "DH+AESGCM:"
    "ECDH+AES:"
    "DH+AES:"
    "RSA+AESGCM:"
    "RSA+AES:"
    "!aNULL:"
    "!eNULL:"
    "!MD5:"
    "!DSS"
)

33 

34 

# Module-level logger named after this module; SSLConfig uses it to emit
# trace messages while building SSL contexts.
logger = get_logger(__name__)

36 

37 

class UnsetType:
    """
    Type of the `UNSET` sentinel.

    Lets `Timeout` tell "argument not supplied" apart from an explicit
    `None`, which is itself a meaningful value (no timeout).
    """


# Singleton sentinel used as the default for optional `Timeout` arguments.
UNSET = UnsetType()

43 

44 

def create_ssl_context(
    cert: typing.Optional[CertTypes] = None,
    verify: VerifyTypes = True,
    trust_env: bool = True,
    http2: bool = False,
) -> ssl.SSLContext:
    """
    Build an `ssl.SSLContext` from the given options.

    Convenience wrapper that constructs an `SSLConfig` with the same
    arguments and returns the context it created.

    **Parameters:**

    * **cert** - Optional client certificate, forwarded to `SSLConfig`.
    * **verify** - Verification setting, forwarded to `SSLConfig`.
    * **trust_env** - Whether environment settings may be consulted.
    * **http2** - Whether HTTP/2 should be offered via ALPN.
    """
    config = SSLConfig(cert=cert, verify=verify, trust_env=trust_env, http2=http2)
    return config.ssl_context

54 

55 

class SSLConfig:
    """
    SSL Configuration.

    Stores the TLS-related options and eagerly builds the matching
    `ssl.SSLContext`, which is exposed as the `ssl_context` attribute.

    **Parameters:**

    * **cert** - Optional client certificate: a certificate file path, a
    `(certfile, keyfile)` tuple, or a `(certfile, keyfile, password)` tuple.
    * **verify** - `True` to verify against the default CA bundle, a falsy
    value to disable verification, a path to a CA bundle file or directory,
    or a pre-configured `ssl.SSLContext`.
    * **trust_env** - When true, a CA bundle supplied by the environment and
    the `SSLKEYLOGFILE` environment variable are honoured.
    * **http2** - When true, "h2" is offered in addition to "http/1.1" via ALPN.
    """

    DEFAULT_CA_BUNDLE_PATH = Path(certifi.where())

    def __init__(
        self,
        *,
        cert: typing.Optional[CertTypes] = None,
        verify: VerifyTypes = True,
        trust_env: bool = True,
        http2: bool = False,
    ):
        self.cert = cert
        self.verify = verify
        self.trust_env = trust_env
        self.http2 = http2
        # Built last: the loader reads every attribute assigned above.
        self.ssl_context = self.load_ssl_context()

    def load_ssl_context(self) -> ssl.SSLContext:
        """
        Return a verified or unverified SSL context, depending on `verify`.
        """
        logger.trace(
            f"load_ssl_context verify={self.verify!r} cert={self.cert!r} "
            f"trust_env={self.trust_env!r} http2={self.http2!r}"
        )

        if self.verify:
            loader = self.load_ssl_context_verify
        else:
            loader = self.load_ssl_context_no_verify
        return loader()

    def load_ssl_context_no_verify(self) -> ssl.SSLContext:
        """
        Return an SSL context for unverified connections.
        """
        ctx = self._create_default_ssl_context()
        # Hostname checking is switched off before verification is disabled.
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        self._load_client_certs(ctx)
        return ctx

    def load_ssl_context_verify(self) -> ssl.SSLContext:
        """
        Return an SSL context for verified connections.

        Raises `IOError` if `verify` is a path that does not exist.
        """
        # Only the plain `verify=True` default may be replaced by a CA bundle
        # taken from the environment; the replacement is stored on `self.verify`.
        may_use_env = self.trust_env and self.verify is True
        env_bundle = get_ca_bundle_from_env() if may_use_env else None
        if env_bundle is not None:
            self.verify = env_bundle

        verify = self.verify

        if isinstance(verify, ssl.SSLContext):
            # Allow passing in our own SSLContext object that's pre-configured.
            self._load_client_certs(verify)
            return verify

        if isinstance(verify, bool):
            bundle_path = self.DEFAULT_CA_BUNDLE_PATH
        else:
            bundle_path = Path(verify)
            if not bundle_path.exists():
                raise IOError(
                    "Could not find a suitable TLS CA certificate bundle, "
                    f"invalid path: {verify}"
                )

        ctx = self._create_default_ssl_context()
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.check_hostname = True

        # Signal to server support for PHA in TLS 1.3. Raises an
        # AttributeError if only read-only access is implemented.
        if sys.version_info >= (3, 8):  # pragma: no cover
            try:
                ctx.post_handshake_auth = True
            except AttributeError:  # pragma: no cover
                pass

        # Disable using 'commonName' for SSLContext.check_hostname
        # when the 'subjectAltName' extension isn't available.
        try:
            ctx.hostname_checks_common_name = False
        except AttributeError:  # pragma: no cover
            pass

        # A file is loaded as `cafile`, a directory as `capath`; anything
        # else loads no verify locations at all.
        if bundle_path.is_file():
            logger.trace(f"load_verify_locations cafile={bundle_path!s}")
            ctx.load_verify_locations(cafile=str(bundle_path))
        elif bundle_path.is_dir():
            logger.trace(f"load_verify_locations capath={bundle_path!s}")
            ctx.load_verify_locations(capath=str(bundle_path))

        self._load_client_certs(ctx)

        return ctx

    def _create_default_ssl_context(self) -> ssl.SSLContext:
        """
        Creates the default SSLContext object that's used for both verified
        and unverified connections.
        """
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        set_minimum_tls_version_1_2(ctx)
        ctx.options |= ssl.OP_NO_COMPRESSION
        ctx.set_ciphers(DEFAULT_CIPHERS)

        if ssl.HAS_ALPN:
            protocols = ["http/1.1"]
            if self.http2:
                protocols.append("h2")
            ctx.set_alpn_protocols(protocols)

        if sys.version_info >= (3, 8):  # pragma: no cover
            # Key logging is enabled only when the environment is trusted
            # and SSLKEYLOGFILE is set to a non-empty value.
            keylog_path = os.environ.get("SSLKEYLOGFILE")
            if self.trust_env and keylog_path:
                ctx.keylog_filename = keylog_path

        return ctx

    def _load_client_certs(self, ssl_context: ssl.SSLContext) -> None:
        """
        Loads client certificates into our SSLContext object

        `self.cert` may be a certificate file path, a `(certfile, keyfile)`
        tuple or a `(certfile, keyfile, password)` tuple. `None` and any
        other shape leave the context untouched.
        """
        cert = self.cert
        if cert is None:
            return

        if isinstance(cert, str):
            ssl_context.load_cert_chain(certfile=cert)
            return

        if not isinstance(cert, tuple):
            return

        if len(cert) == 2:
            ssl_context.load_cert_chain(certfile=cert[0], keyfile=cert[1])
        elif len(cert) == 3:
            ssl_context.load_cert_chain(
                certfile=cert[0],
                keyfile=cert[1],
                password=cert[2],  # type: ignore
            )

190 

191 

class Timeout:
    """
    Timeout configuration.

    Holds four independent timeouts, in seconds: `connect`, `read`, `write`
    and `pool`. A value of `None` means no timeout for that operation.

    **Usage**:

    Timeout(None)               # No timeouts.
    Timeout(5.0)                # 5s timeout on all operations.
    Timeout(None, connect=5.0)  # 5s timeout on connect, no other timeouts.
    Timeout(5.0, connect=10.0)  # 10s timeout on connect. 5s timeout elsewhere.
    Timeout(5.0, pool=None)     # No timeout on acquiring connection from pool.
                                # 5s timeout elsewhere.

    The first argument may also be an existing `Timeout` (copied; no keyword
    overrides are permitted) or a `(connect, read[, write[, pool]])` tuple,
    where omitted trailing items become `None`.

    Raises `ValueError` if no default is given and fewer than all four
    keyword values are set explicitly.
    """

    def __init__(
        self,
        timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
        *,
        connect: typing.Union[None, float, UnsetType] = UNSET,
        read: typing.Union[None, float, UnsetType] = UNSET,
        write: typing.Union[None, float, UnsetType] = UNSET,
        pool: typing.Union[None, float, UnsetType] = UNSET,
    ):
        explicit = (connect, read, write, pool)

        if isinstance(timeout, Timeout):
            # Passed as a single explicit Timeout.
            for value in explicit:
                assert value is UNSET
            self.connect = timeout.connect  # type: typing.Optional[float]
            self.read = timeout.read  # type: typing.Optional[float]
            self.write = timeout.write  # type: typing.Optional[float]
            self.pool = timeout.pool  # type: typing.Optional[float]
            return

        if isinstance(timeout, tuple):
            # Passed as a tuple.
            item_count = len(timeout)
            self.connect = timeout[0]
            self.read = timeout[1]
            self.write = timeout[2] if item_count >= 3 else None
            self.pool = timeout[3] if item_count >= 4 else None
            return

        if not any(isinstance(value, UnsetType) for value in explicit):
            # All four values were given explicitly; no default is needed.
            self.connect, self.read, self.write, self.pool = explicit
            return

        if isinstance(timeout, UnsetType):
            raise ValueError(
                "httpx.Timeout must either include a default, or set all "
                "four parameters explicitly."
            )

        # Fall back to the default for every value that was left unset.
        self.connect = connect if not isinstance(connect, UnsetType) else timeout
        self.read = read if not isinstance(read, UnsetType) else timeout
        self.write = write if not isinstance(write, UnsetType) else timeout
        self.pool = pool if not isinstance(pool, UnsetType) else timeout

    def as_dict(self) -> typing.Dict[str, typing.Optional[float]]:
        """
        Return the four timeouts as a dict keyed by operation name.
        """
        return {
            name: getattr(self, name) for name in ("connect", "read", "write", "pool")
        }

    def __eq__(self, other: typing.Any) -> bool:
        if not isinstance(other, self.__class__):
            return False
        return (
            self.connect == other.connect
            and self.read == other.read
            and self.write == other.write
            and self.pool == other.pool
        )

    def __repr__(self) -> str:
        name = self.__class__.__name__
        distinct = {self.connect, self.read, self.write, self.pool}
        if len(distinct) == 1:
            # All four values agree, so use the short single-value form.
            return f"{name}(timeout={self.connect})"
        details = (
            f"connect={self.connect}, "
            f"read={self.read}, write={self.write}, pool={self.pool}"
        )
        return f"{name}({details})"

277 

278 

class Limits:
    """
    Configuration for limits to various client behaviors.

    **Parameters:**

    * **max_connections** - The maximum number of concurrent connections that may be
            established.
    * **max_keepalive_connections** - Allow the connection pool to maintain
            keep-alive connections below this point. Should be less than or equal
            to `max_connections`.
    * **keepalive_expiry** - Time limit on idle keep-alive connections in seconds.

    The values are stored as given; no validation is performed here.
    Two instances compare equal when all three values are equal.
    """

    def __init__(
        self,
        *,
        max_connections: typing.Optional[int] = None,
        max_keepalive_connections: typing.Optional[int] = None,
        keepalive_expiry: typing.Optional[float] = 5.0,
    ):
        self.max_connections = max_connections
        self.max_keepalive_connections = max_keepalive_connections
        self.keepalive_expiry = keepalive_expiry

    def __eq__(self, other: typing.Any) -> bool:
        if not isinstance(other, self.__class__):
            return False
        return (
            self.max_connections == other.max_connections
            and self.max_keepalive_connections == other.max_keepalive_connections
            and self.keepalive_expiry == other.keepalive_expiry
        )

    def __repr__(self) -> str:
        field_names = (
            "max_connections",
            "max_keepalive_connections",
            "keepalive_expiry",
        )
        rendered = ", ".join(f"{name}={getattr(self, name)}" for name in field_names)
        return f"{self.__class__.__name__}({rendered})"

319 

320 

class Proxy:
    """
    Configuration for a single proxy.

    **Parameters:**

    * **url** - The proxy URL. Its scheme must be "http", "https" or "socks5".
    Credentials embedded in the URL are moved into `auth` and removed from
    the stored URL; they take precedence over an explicit `auth` argument.
    * **auth** - Optional `(username, password)` pair.
    * **headers** - Optional headers, stored as a `Headers` instance.

    Raises `ValueError` for any other URL scheme.
    """

    def __init__(
        self,
        url: URLTypes,
        *,
        auth: typing.Optional[typing.Tuple[str, str]] = None,
        headers: typing.Optional[HeaderTypes] = None,
    ):
        proxy_url = URL(url)
        proxy_headers = Headers(headers)

        if proxy_url.scheme not in ("http", "https", "socks5"):
            raise ValueError(f"Unknown scheme for proxy URL {proxy_url!r}")

        if proxy_url.username or proxy_url.password:
            # Remove any auth credentials from the URL.
            auth = (proxy_url.username, proxy_url.password)
            proxy_url = proxy_url.copy_with(username=None, password=None)

        self.url = proxy_url
        self.auth = auth
        self.headers = proxy_headers

    @property
    def raw_auth(self) -> typing.Optional[typing.Tuple[bytes, bytes]]:
        # The proxy authentication as raw bytes.
        if self.auth is None:
            return None
        username, password = self.auth[0], self.auth[1]
        return (username.encode("utf-8"), password.encode("utf-8"))

    def __repr__(self) -> str:
        # Build a nice concise representation: the URL is always shown,
        # auth and headers only when present.
        parts = [repr(str(self.url))]

        if self.auth:
            # The authentication is represented with the password component masked.
            masked_auth = (self.auth[0], "********")
            parts.append(f"auth={masked_auth!r}")

        if self.headers:
            parts.append(f"headers={dict(self.headers)!r}")

        return f"Proxy({', '.join(parts)})"

362 

363 

# Default timeout: 5 seconds for each of connect, read, write and pool.
DEFAULT_TIMEOUT_CONFIG = Timeout(5.0)

# Default connection limits; keepalive_expiry keeps the Limits default of 5.0.
DEFAULT_LIMITS = Limits(max_keepalive_connections=20, max_connections=100)

# Default redirect limit (not used within this module).
DEFAULT_MAX_REDIRECTS = 20