Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_config.py: 35%
153 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:19 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:19 +0000
1import logging
2import os
3import ssl
4import sys
5import typing
6from pathlib import Path
8import certifi
10from ._compat import set_minimum_tls_version_1_2
11from ._models import Headers
12from ._types import CertTypes, HeaderTypes, TimeoutTypes, URLTypes, VerifyTypes
13from ._urls import URL
14from ._utils import get_ca_bundle_from_env
16DEFAULT_CIPHERS = ":".join(
17 [
18 "ECDHE+AESGCM",
19 "ECDHE+CHACHA20",
20 "DHE+AESGCM",
21 "DHE+CHACHA20",
22 "ECDH+AESGCM",
23 "DH+AESGCM",
24 "ECDH+AES",
25 "DH+AES",
26 "RSA+AESGCM",
27 "RSA+AES",
28 "!aNULL",
29 "!eNULL",
30 "!MD5",
31 "!DSS",
32 ]
33)
36logger = logging.getLogger("httpx")
39class UnsetType:
40 pass # pragma: no cover
43UNSET = UnsetType()
46def create_ssl_context(
47 cert: typing.Optional[CertTypes] = None,
48 verify: VerifyTypes = True,
49 trust_env: bool = True,
50 http2: bool = False,
51) -> ssl.SSLContext:
52 return SSLConfig(
53 cert=cert, verify=verify, trust_env=trust_env, http2=http2
54 ).ssl_context
57class SSLConfig:
58 """
59 SSL Configuration.
60 """
62 DEFAULT_CA_BUNDLE_PATH = Path(certifi.where())
64 def __init__(
65 self,
66 *,
67 cert: typing.Optional[CertTypes] = None,
68 verify: VerifyTypes = True,
69 trust_env: bool = True,
70 http2: bool = False,
71 ):
72 self.cert = cert
73 self.verify = verify
74 self.trust_env = trust_env
75 self.http2 = http2
76 self.ssl_context = self.load_ssl_context()
78 def load_ssl_context(self) -> ssl.SSLContext:
79 logger.debug(
80 "load_ssl_context verify=%r cert=%r trust_env=%r http2=%r",
81 self.verify,
82 self.cert,
83 self.trust_env,
84 self.http2,
85 )
87 if self.verify:
88 return self.load_ssl_context_verify()
89 return self.load_ssl_context_no_verify()
91 def load_ssl_context_no_verify(self) -> ssl.SSLContext:
92 """
93 Return an SSL context for unverified connections.
94 """
95 context = self._create_default_ssl_context()
96 context.check_hostname = False
97 context.verify_mode = ssl.CERT_NONE
98 self._load_client_certs(context)
99 return context
101 def load_ssl_context_verify(self) -> ssl.SSLContext:
102 """
103 Return an SSL context for verified connections.
104 """
105 if self.trust_env and self.verify is True:
106 ca_bundle = get_ca_bundle_from_env()
107 if ca_bundle is not None:
108 self.verify = ca_bundle
110 if isinstance(self.verify, ssl.SSLContext):
111 # Allow passing in our own SSLContext object that's pre-configured.
112 context = self.verify
113 self._load_client_certs(context)
114 return context
115 elif isinstance(self.verify, bool):
116 ca_bundle_path = self.DEFAULT_CA_BUNDLE_PATH
117 elif Path(self.verify).exists():
118 ca_bundle_path = Path(self.verify)
119 else:
120 raise IOError(
121 "Could not find a suitable TLS CA certificate bundle, "
122 "invalid path: {}".format(self.verify)
123 )
125 context = self._create_default_ssl_context()
126 context.verify_mode = ssl.CERT_REQUIRED
127 context.check_hostname = True
129 # Signal to server support for PHA in TLS 1.3. Raises an
130 # AttributeError if only read-only access is implemented.
131 if sys.version_info >= (3, 8): # pragma: no cover
132 try:
133 context.post_handshake_auth = True
134 except AttributeError: # pragma: no cover
135 pass
137 # Disable using 'commonName' for SSLContext.check_hostname
138 # when the 'subjectAltName' extension isn't available.
139 try:
140 context.hostname_checks_common_name = False
141 except AttributeError: # pragma: no cover
142 pass
144 if ca_bundle_path.is_file():
145 cafile = str(ca_bundle_path)
146 logger.debug("load_verify_locations cafile=%r", cafile)
147 context.load_verify_locations(cafile=cafile)
148 elif ca_bundle_path.is_dir():
149 capath = str(ca_bundle_path)
150 logger.debug("load_verify_locations capath=%r", capath)
151 context.load_verify_locations(capath=capath)
153 self._load_client_certs(context)
155 return context
157 def _create_default_ssl_context(self) -> ssl.SSLContext:
158 """
159 Creates the default SSLContext object that's used for both verified
160 and unverified connections.
161 """
162 context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
163 set_minimum_tls_version_1_2(context)
164 context.options |= ssl.OP_NO_COMPRESSION
165 context.set_ciphers(DEFAULT_CIPHERS)
167 if ssl.HAS_ALPN:
168 alpn_idents = ["http/1.1", "h2"] if self.http2 else ["http/1.1"]
169 context.set_alpn_protocols(alpn_idents)
171 if sys.version_info >= (3, 8): # pragma: no cover
172 keylogfile = os.environ.get("SSLKEYLOGFILE")
173 if keylogfile and self.trust_env:
174 context.keylog_filename = keylogfile
176 return context
178 def _load_client_certs(self, ssl_context: ssl.SSLContext) -> None:
179 """
180 Loads client certificates into our SSLContext object
181 """
182 if self.cert is not None:
183 if isinstance(self.cert, str):
184 ssl_context.load_cert_chain(certfile=self.cert)
185 elif isinstance(self.cert, tuple) and len(self.cert) == 2:
186 ssl_context.load_cert_chain(certfile=self.cert[0], keyfile=self.cert[1])
187 elif isinstance(self.cert, tuple) and len(self.cert) == 3:
188 ssl_context.load_cert_chain(
189 certfile=self.cert[0],
190 keyfile=self.cert[1],
191 password=self.cert[2], # type: ignore
192 )
195class Timeout:
196 """
197 Timeout configuration.
199 **Usage**:
201 Timeout(None) # No timeouts.
202 Timeout(5.0) # 5s timeout on all operations.
203 Timeout(None, connect=5.0) # 5s timeout on connect, no other timeouts.
204 Timeout(5.0, connect=10.0) # 10s timeout on connect. 5s timeout elsewhere.
205 Timeout(5.0, pool=None) # No timeout on acquiring connection from pool.
206 # 5s timeout elsewhere.
207 """
209 def __init__(
210 self,
211 timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
212 *,
213 connect: typing.Union[None, float, UnsetType] = UNSET,
214 read: typing.Union[None, float, UnsetType] = UNSET,
215 write: typing.Union[None, float, UnsetType] = UNSET,
216 pool: typing.Union[None, float, UnsetType] = UNSET,
217 ):
218 if isinstance(timeout, Timeout):
219 # Passed as a single explicit Timeout.
220 assert connect is UNSET
221 assert read is UNSET
222 assert write is UNSET
223 assert pool is UNSET
224 self.connect = timeout.connect # type: typing.Optional[float]
225 self.read = timeout.read # type: typing.Optional[float]
226 self.write = timeout.write # type: typing.Optional[float]
227 self.pool = timeout.pool # type: typing.Optional[float]
228 elif isinstance(timeout, tuple):
229 # Passed as a tuple.
230 self.connect = timeout[0]
231 self.read = timeout[1]
232 self.write = None if len(timeout) < 3 else timeout[2]
233 self.pool = None if len(timeout) < 4 else timeout[3]
234 elif not (
235 isinstance(connect, UnsetType)
236 or isinstance(read, UnsetType)
237 or isinstance(write, UnsetType)
238 or isinstance(pool, UnsetType)
239 ):
240 self.connect = connect
241 self.read = read
242 self.write = write
243 self.pool = pool
244 else:
245 if isinstance(timeout, UnsetType):
246 raise ValueError(
247 "httpx.Timeout must either include a default, or set all "
248 "four parameters explicitly."
249 )
250 self.connect = timeout if isinstance(connect, UnsetType) else connect
251 self.read = timeout if isinstance(read, UnsetType) else read
252 self.write = timeout if isinstance(write, UnsetType) else write
253 self.pool = timeout if isinstance(pool, UnsetType) else pool
255 def as_dict(self) -> typing.Dict[str, typing.Optional[float]]:
256 return {
257 "connect": self.connect,
258 "read": self.read,
259 "write": self.write,
260 "pool": self.pool,
261 }
263 def __eq__(self, other: typing.Any) -> bool:
264 return (
265 isinstance(other, self.__class__)
266 and self.connect == other.connect
267 and self.read == other.read
268 and self.write == other.write
269 and self.pool == other.pool
270 )
272 def __repr__(self) -> str:
273 class_name = self.__class__.__name__
274 if len({self.connect, self.read, self.write, self.pool}) == 1:
275 return f"{class_name}(timeout={self.connect})"
276 return (
277 f"{class_name}(connect={self.connect}, "
278 f"read={self.read}, write={self.write}, pool={self.pool})"
279 )
282class Limits:
283 """
284 Configuration for limits to various client behaviors.
286 **Parameters:**
288 * **max_connections** - The maximum number of concurrent connections that may be
289 established.
290 * **max_keepalive_connections** - Allow the connection pool to maintain
291 keep-alive connections below this point. Should be less than or equal
292 to `max_connections`.
293 * **keepalive_expiry** - Time limit on idle keep-alive connections in seconds.
294 """
296 def __init__(
297 self,
298 *,
299 max_connections: typing.Optional[int] = None,
300 max_keepalive_connections: typing.Optional[int] = None,
301 keepalive_expiry: typing.Optional[float] = 5.0,
302 ):
303 self.max_connections = max_connections
304 self.max_keepalive_connections = max_keepalive_connections
305 self.keepalive_expiry = keepalive_expiry
307 def __eq__(self, other: typing.Any) -> bool:
308 return (
309 isinstance(other, self.__class__)
310 and self.max_connections == other.max_connections
311 and self.max_keepalive_connections == other.max_keepalive_connections
312 and self.keepalive_expiry == other.keepalive_expiry
313 )
315 def __repr__(self) -> str:
316 class_name = self.__class__.__name__
317 return (
318 f"{class_name}(max_connections={self.max_connections}, "
319 f"max_keepalive_connections={self.max_keepalive_connections}, "
320 f"keepalive_expiry={self.keepalive_expiry})"
321 )
324class Proxy:
325 def __init__(
326 self,
327 url: URLTypes,
328 *,
329 auth: typing.Optional[typing.Tuple[str, str]] = None,
330 headers: typing.Optional[HeaderTypes] = None,
331 ):
332 url = URL(url)
333 headers = Headers(headers)
335 if url.scheme not in ("http", "https", "socks5"):
336 raise ValueError(f"Unknown scheme for proxy URL {url!r}")
338 if url.username or url.password:
339 # Remove any auth credentials from the URL.
340 auth = (url.username, url.password)
341 url = url.copy_with(username=None, password=None)
343 self.url = url
344 self.auth = auth
345 self.headers = headers
347 @property
348 def raw_auth(self) -> typing.Optional[typing.Tuple[bytes, bytes]]:
349 # The proxy authentication as raw bytes.
350 return (
351 None
352 if self.auth is None
353 else (self.auth[0].encode("utf-8"), self.auth[1].encode("utf-8"))
354 )
356 def __repr__(self) -> str:
357 # The authentication is represented with the password component masked.
358 auth = (self.auth[0], "********") if self.auth else None
360 # Build a nice concise representation.
361 url_str = f"{str(self.url)!r}"
362 auth_str = f", auth={auth!r}" if auth else ""
363 headers_str = f", headers={dict(self.headers)!r}" if self.headers else ""
364 return f"Proxy({url_str}{auth_str}{headers_str})"
367DEFAULT_TIMEOUT_CONFIG = Timeout(timeout=5.0)
368DEFAULT_LIMITS = Limits(max_connections=100, max_keepalive_connections=20)
369DEFAULT_MAX_REDIRECTS = 20