Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_config.py: 35%

153 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 07:19 +0000

1import logging 

2import os 

3import ssl 

4import sys 

5import typing 

6from pathlib import Path 

7 

8import certifi 

9 

10from ._compat import set_minimum_tls_version_1_2 

11from ._models import Headers 

12from ._types import CertTypes, HeaderTypes, TimeoutTypes, URLTypes, VerifyTypes 

13from ._urls import URL 

14from ._utils import get_ca_bundle_from_env 

15 

16DEFAULT_CIPHERS = ":".join( 

17 [ 

18 "ECDHE+AESGCM", 

19 "ECDHE+CHACHA20", 

20 "DHE+AESGCM", 

21 "DHE+CHACHA20", 

22 "ECDH+AESGCM", 

23 "DH+AESGCM", 

24 "ECDH+AES", 

25 "DH+AES", 

26 "RSA+AESGCM", 

27 "RSA+AES", 

28 "!aNULL", 

29 "!eNULL", 

30 "!MD5", 

31 "!DSS", 

32 ] 

33) 

34 

35 

36logger = logging.getLogger("httpx") 

37 

38 

39class UnsetType: 

40 pass # pragma: no cover 

41 

42 

43UNSET = UnsetType() 

44 

45 

46def create_ssl_context( 

47 cert: typing.Optional[CertTypes] = None, 

48 verify: VerifyTypes = True, 

49 trust_env: bool = True, 

50 http2: bool = False, 

51) -> ssl.SSLContext: 

52 return SSLConfig( 

53 cert=cert, verify=verify, trust_env=trust_env, http2=http2 

54 ).ssl_context 

55 

56 

57class SSLConfig: 

58 """ 

59 SSL Configuration. 

60 """ 

61 

62 DEFAULT_CA_BUNDLE_PATH = Path(certifi.where()) 

63 

64 def __init__( 

65 self, 

66 *, 

67 cert: typing.Optional[CertTypes] = None, 

68 verify: VerifyTypes = True, 

69 trust_env: bool = True, 

70 http2: bool = False, 

71 ): 

72 self.cert = cert 

73 self.verify = verify 

74 self.trust_env = trust_env 

75 self.http2 = http2 

76 self.ssl_context = self.load_ssl_context() 

77 

78 def load_ssl_context(self) -> ssl.SSLContext: 

79 logger.debug( 

80 "load_ssl_context verify=%r cert=%r trust_env=%r http2=%r", 

81 self.verify, 

82 self.cert, 

83 self.trust_env, 

84 self.http2, 

85 ) 

86 

87 if self.verify: 

88 return self.load_ssl_context_verify() 

89 return self.load_ssl_context_no_verify() 

90 

91 def load_ssl_context_no_verify(self) -> ssl.SSLContext: 

92 """ 

93 Return an SSL context for unverified connections. 

94 """ 

95 context = self._create_default_ssl_context() 

96 context.check_hostname = False 

97 context.verify_mode = ssl.CERT_NONE 

98 self._load_client_certs(context) 

99 return context 

100 

101 def load_ssl_context_verify(self) -> ssl.SSLContext: 

102 """ 

103 Return an SSL context for verified connections. 

104 """ 

105 if self.trust_env and self.verify is True: 

106 ca_bundle = get_ca_bundle_from_env() 

107 if ca_bundle is not None: 

108 self.verify = ca_bundle 

109 

110 if isinstance(self.verify, ssl.SSLContext): 

111 # Allow passing in our own SSLContext object that's pre-configured. 

112 context = self.verify 

113 self._load_client_certs(context) 

114 return context 

115 elif isinstance(self.verify, bool): 

116 ca_bundle_path = self.DEFAULT_CA_BUNDLE_PATH 

117 elif Path(self.verify).exists(): 

118 ca_bundle_path = Path(self.verify) 

119 else: 

120 raise IOError( 

121 "Could not find a suitable TLS CA certificate bundle, " 

122 "invalid path: {}".format(self.verify) 

123 ) 

124 

125 context = self._create_default_ssl_context() 

126 context.verify_mode = ssl.CERT_REQUIRED 

127 context.check_hostname = True 

128 

129 # Signal to server support for PHA in TLS 1.3. Raises an 

130 # AttributeError if only read-only access is implemented. 

131 if sys.version_info >= (3, 8): # pragma: no cover 

132 try: 

133 context.post_handshake_auth = True 

134 except AttributeError: # pragma: no cover 

135 pass 

136 

137 # Disable using 'commonName' for SSLContext.check_hostname 

138 # when the 'subjectAltName' extension isn't available. 

139 try: 

140 context.hostname_checks_common_name = False 

141 except AttributeError: # pragma: no cover 

142 pass 

143 

144 if ca_bundle_path.is_file(): 

145 cafile = str(ca_bundle_path) 

146 logger.debug("load_verify_locations cafile=%r", cafile) 

147 context.load_verify_locations(cafile=cafile) 

148 elif ca_bundle_path.is_dir(): 

149 capath = str(ca_bundle_path) 

150 logger.debug("load_verify_locations capath=%r", capath) 

151 context.load_verify_locations(capath=capath) 

152 

153 self._load_client_certs(context) 

154 

155 return context 

156 

157 def _create_default_ssl_context(self) -> ssl.SSLContext: 

158 """ 

159 Creates the default SSLContext object that's used for both verified 

160 and unverified connections. 

161 """ 

162 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 

163 set_minimum_tls_version_1_2(context) 

164 context.options |= ssl.OP_NO_COMPRESSION 

165 context.set_ciphers(DEFAULT_CIPHERS) 

166 

167 if ssl.HAS_ALPN: 

168 alpn_idents = ["http/1.1", "h2"] if self.http2 else ["http/1.1"] 

169 context.set_alpn_protocols(alpn_idents) 

170 

171 if sys.version_info >= (3, 8): # pragma: no cover 

172 keylogfile = os.environ.get("SSLKEYLOGFILE") 

173 if keylogfile and self.trust_env: 

174 context.keylog_filename = keylogfile 

175 

176 return context 

177 

178 def _load_client_certs(self, ssl_context: ssl.SSLContext) -> None: 

179 """ 

180 Loads client certificates into our SSLContext object 

181 """ 

182 if self.cert is not None: 

183 if isinstance(self.cert, str): 

184 ssl_context.load_cert_chain(certfile=self.cert) 

185 elif isinstance(self.cert, tuple) and len(self.cert) == 2: 

186 ssl_context.load_cert_chain(certfile=self.cert[0], keyfile=self.cert[1]) 

187 elif isinstance(self.cert, tuple) and len(self.cert) == 3: 

188 ssl_context.load_cert_chain( 

189 certfile=self.cert[0], 

190 keyfile=self.cert[1], 

191 password=self.cert[2], # type: ignore 

192 ) 

193 

194 

195class Timeout: 

196 """ 

197 Timeout configuration. 

198 

199 **Usage**: 

200 

201 Timeout(None) # No timeouts. 

202 Timeout(5.0) # 5s timeout on all operations. 

203 Timeout(None, connect=5.0) # 5s timeout on connect, no other timeouts. 

204 Timeout(5.0, connect=10.0) # 10s timeout on connect. 5s timeout elsewhere. 

205 Timeout(5.0, pool=None) # No timeout on acquiring connection from pool. 

206 # 5s timeout elsewhere. 

207 """ 

208 

209 def __init__( 

210 self, 

211 timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET, 

212 *, 

213 connect: typing.Union[None, float, UnsetType] = UNSET, 

214 read: typing.Union[None, float, UnsetType] = UNSET, 

215 write: typing.Union[None, float, UnsetType] = UNSET, 

216 pool: typing.Union[None, float, UnsetType] = UNSET, 

217 ): 

218 if isinstance(timeout, Timeout): 

219 # Passed as a single explicit Timeout. 

220 assert connect is UNSET 

221 assert read is UNSET 

222 assert write is UNSET 

223 assert pool is UNSET 

224 self.connect = timeout.connect # type: typing.Optional[float] 

225 self.read = timeout.read # type: typing.Optional[float] 

226 self.write = timeout.write # type: typing.Optional[float] 

227 self.pool = timeout.pool # type: typing.Optional[float] 

228 elif isinstance(timeout, tuple): 

229 # Passed as a tuple. 

230 self.connect = timeout[0] 

231 self.read = timeout[1] 

232 self.write = None if len(timeout) < 3 else timeout[2] 

233 self.pool = None if len(timeout) < 4 else timeout[3] 

234 elif not ( 

235 isinstance(connect, UnsetType) 

236 or isinstance(read, UnsetType) 

237 or isinstance(write, UnsetType) 

238 or isinstance(pool, UnsetType) 

239 ): 

240 self.connect = connect 

241 self.read = read 

242 self.write = write 

243 self.pool = pool 

244 else: 

245 if isinstance(timeout, UnsetType): 

246 raise ValueError( 

247 "httpx.Timeout must either include a default, or set all " 

248 "four parameters explicitly." 

249 ) 

250 self.connect = timeout if isinstance(connect, UnsetType) else connect 

251 self.read = timeout if isinstance(read, UnsetType) else read 

252 self.write = timeout if isinstance(write, UnsetType) else write 

253 self.pool = timeout if isinstance(pool, UnsetType) else pool 

254 

255 def as_dict(self) -> typing.Dict[str, typing.Optional[float]]: 

256 return { 

257 "connect": self.connect, 

258 "read": self.read, 

259 "write": self.write, 

260 "pool": self.pool, 

261 } 

262 

263 def __eq__(self, other: typing.Any) -> bool: 

264 return ( 

265 isinstance(other, self.__class__) 

266 and self.connect == other.connect 

267 and self.read == other.read 

268 and self.write == other.write 

269 and self.pool == other.pool 

270 ) 

271 

272 def __repr__(self) -> str: 

273 class_name = self.__class__.__name__ 

274 if len({self.connect, self.read, self.write, self.pool}) == 1: 

275 return f"{class_name}(timeout={self.connect})" 

276 return ( 

277 f"{class_name}(connect={self.connect}, " 

278 f"read={self.read}, write={self.write}, pool={self.pool})" 

279 ) 

280 

281 

282class Limits: 

283 """ 

284 Configuration for limits to various client behaviors. 

285 

286 **Parameters:** 

287 

288 * **max_connections** - The maximum number of concurrent connections that may be 

289 established. 

290 * **max_keepalive_connections** - Allow the connection pool to maintain 

291 keep-alive connections below this point. Should be less than or equal 

292 to `max_connections`. 

293 * **keepalive_expiry** - Time limit on idle keep-alive connections in seconds. 

294 """ 

295 

296 def __init__( 

297 self, 

298 *, 

299 max_connections: typing.Optional[int] = None, 

300 max_keepalive_connections: typing.Optional[int] = None, 

301 keepalive_expiry: typing.Optional[float] = 5.0, 

302 ): 

303 self.max_connections = max_connections 

304 self.max_keepalive_connections = max_keepalive_connections 

305 self.keepalive_expiry = keepalive_expiry 

306 

307 def __eq__(self, other: typing.Any) -> bool: 

308 return ( 

309 isinstance(other, self.__class__) 

310 and self.max_connections == other.max_connections 

311 and self.max_keepalive_connections == other.max_keepalive_connections 

312 and self.keepalive_expiry == other.keepalive_expiry 

313 ) 

314 

315 def __repr__(self) -> str: 

316 class_name = self.__class__.__name__ 

317 return ( 

318 f"{class_name}(max_connections={self.max_connections}, " 

319 f"max_keepalive_connections={self.max_keepalive_connections}, " 

320 f"keepalive_expiry={self.keepalive_expiry})" 

321 ) 

322 

323 

324class Proxy: 

325 def __init__( 

326 self, 

327 url: URLTypes, 

328 *, 

329 auth: typing.Optional[typing.Tuple[str, str]] = None, 

330 headers: typing.Optional[HeaderTypes] = None, 

331 ): 

332 url = URL(url) 

333 headers = Headers(headers) 

334 

335 if url.scheme not in ("http", "https", "socks5"): 

336 raise ValueError(f"Unknown scheme for proxy URL {url!r}") 

337 

338 if url.username or url.password: 

339 # Remove any auth credentials from the URL. 

340 auth = (url.username, url.password) 

341 url = url.copy_with(username=None, password=None) 

342 

343 self.url = url 

344 self.auth = auth 

345 self.headers = headers 

346 

347 @property 

348 def raw_auth(self) -> typing.Optional[typing.Tuple[bytes, bytes]]: 

349 # The proxy authentication as raw bytes. 

350 return ( 

351 None 

352 if self.auth is None 

353 else (self.auth[0].encode("utf-8"), self.auth[1].encode("utf-8")) 

354 ) 

355 

356 def __repr__(self) -> str: 

357 # The authentication is represented with the password component masked. 

358 auth = (self.auth[0], "********") if self.auth else None 

359 

360 # Build a nice concise representation. 

361 url_str = f"{str(self.url)!r}" 

362 auth_str = f", auth={auth!r}" if auth else "" 

363 headers_str = f", headers={dict(self.headers)!r}" if self.headers else "" 

364 return f"Proxy({url_str}{auth_str}{headers_str})" 

365 

366 

367DEFAULT_TIMEOUT_CONFIG = Timeout(timeout=5.0) 

368DEFAULT_LIMITS = Limits(max_connections=100, max_keepalive_connections=20) 

369DEFAULT_MAX_REDIRECTS = 20