Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/httpx/_config.py: 42%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import os
4import typing
6from ._models import Headers
7from ._types import CertTypes, HeaderTypes, TimeoutTypes
8from ._urls import URL
10if typing.TYPE_CHECKING:
11 import ssl # pragma: no cover
13__all__ = ["Limits", "Proxy", "Timeout", "create_ssl_context"]
16class UnsetType:
17 pass # pragma: no cover
20UNSET = UnsetType()
23def create_ssl_context(
24 verify: ssl.SSLContext | str | bool = True,
25 cert: CertTypes | None = None,
26 trust_env: bool = True,
27) -> ssl.SSLContext:
28 import ssl
29 import warnings
31 import certifi
33 if verify is True:
34 if trust_env and os.environ.get("SSL_CERT_FILE"): # pragma: nocover
35 ctx = ssl.create_default_context(cafile=os.environ["SSL_CERT_FILE"])
36 elif trust_env and os.environ.get("SSL_CERT_DIR"): # pragma: nocover
37 ctx = ssl.create_default_context(capath=os.environ["SSL_CERT_DIR"])
38 else:
39 # Default case...
40 ctx = ssl.create_default_context(cafile=certifi.where())
41 elif verify is False:
42 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
43 ctx.check_hostname = False
44 ctx.verify_mode = ssl.CERT_NONE
45 elif isinstance(verify, str): # pragma: nocover
46 message = (
47 "`verify=<str>` is deprecated. "
48 "Use `verify=ssl.create_default_context(cafile=...)` "
49 "or `verify=ssl.create_default_context(capath=...)` instead."
50 )
51 warnings.warn(message, DeprecationWarning)
52 if os.path.isdir(verify):
53 return ssl.create_default_context(capath=verify)
54 return ssl.create_default_context(cafile=verify)
55 else:
56 ctx = verify
58 if cert: # pragma: nocover
59 message = (
60 "`cert=...` is deprecated. Use `verify=<ssl_context>` instead,"
61 "with `.load_cert_chain()` to configure the certificate chain."
62 )
63 warnings.warn(message, DeprecationWarning)
64 if isinstance(cert, str):
65 ctx.load_cert_chain(cert)
66 else:
67 ctx.load_cert_chain(*cert)
69 return ctx
72class Timeout:
73 """
74 Timeout configuration.
76 **Usage**:
78 Timeout(None) # No timeouts.
79 Timeout(5.0) # 5s timeout on all operations.
80 Timeout(None, connect=5.0) # 5s timeout on connect, no other timeouts.
81 Timeout(5.0, connect=10.0) # 10s timeout on connect. 5s timeout elsewhere.
82 Timeout(5.0, pool=None) # No timeout on acquiring connection from pool.
83 # 5s timeout elsewhere.
84 """
86 def __init__(
87 self,
88 timeout: TimeoutTypes | UnsetType = UNSET,
89 *,
90 connect: None | float | UnsetType = UNSET,
91 read: None | float | UnsetType = UNSET,
92 write: None | float | UnsetType = UNSET,
93 pool: None | float | UnsetType = UNSET,
94 ) -> None:
95 if isinstance(timeout, Timeout):
96 # Passed as a single explicit Timeout.
97 assert connect is UNSET
98 assert read is UNSET
99 assert write is UNSET
100 assert pool is UNSET
101 self.connect = timeout.connect # type: typing.Optional[float]
102 self.read = timeout.read # type: typing.Optional[float]
103 self.write = timeout.write # type: typing.Optional[float]
104 self.pool = timeout.pool # type: typing.Optional[float]
105 elif isinstance(timeout, tuple):
106 # Passed as a tuple.
107 self.connect = timeout[0]
108 self.read = timeout[1]
109 self.write = None if len(timeout) < 3 else timeout[2]
110 self.pool = None if len(timeout) < 4 else timeout[3]
111 elif not (
112 isinstance(connect, UnsetType)
113 or isinstance(read, UnsetType)
114 or isinstance(write, UnsetType)
115 or isinstance(pool, UnsetType)
116 ):
117 self.connect = connect
118 self.read = read
119 self.write = write
120 self.pool = pool
121 else:
122 if isinstance(timeout, UnsetType):
123 raise ValueError(
124 "httpx.Timeout must either include a default, or set all "
125 "four parameters explicitly."
126 )
127 self.connect = timeout if isinstance(connect, UnsetType) else connect
128 self.read = timeout if isinstance(read, UnsetType) else read
129 self.write = timeout if isinstance(write, UnsetType) else write
130 self.pool = timeout if isinstance(pool, UnsetType) else pool
132 def as_dict(self) -> dict[str, float | None]:
133 return {
134 "connect": self.connect,
135 "read": self.read,
136 "write": self.write,
137 "pool": self.pool,
138 }
140 def __eq__(self, other: typing.Any) -> bool:
141 return (
142 isinstance(other, self.__class__)
143 and self.connect == other.connect
144 and self.read == other.read
145 and self.write == other.write
146 and self.pool == other.pool
147 )
149 def __repr__(self) -> str:
150 class_name = self.__class__.__name__
151 if len({self.connect, self.read, self.write, self.pool}) == 1:
152 return f"{class_name}(timeout={self.connect})"
153 return (
154 f"{class_name}(connect={self.connect}, "
155 f"read={self.read}, write={self.write}, pool={self.pool})"
156 )
159class Limits:
160 """
161 Configuration for limits to various client behaviors.
163 **Parameters:**
165 * **max_connections** - The maximum number of concurrent connections that may be
166 established.
167 * **max_keepalive_connections** - Allow the connection pool to maintain
168 keep-alive connections below this point. Should be less than or equal
169 to `max_connections`.
170 * **keepalive_expiry** - Time limit on idle keep-alive connections in seconds.
171 """
173 def __init__(
174 self,
175 *,
176 max_connections: int | None = None,
177 max_keepalive_connections: int | None = None,
178 keepalive_expiry: float | None = 5.0,
179 ) -> None:
180 self.max_connections = max_connections
181 self.max_keepalive_connections = max_keepalive_connections
182 self.keepalive_expiry = keepalive_expiry
184 def __eq__(self, other: typing.Any) -> bool:
185 return (
186 isinstance(other, self.__class__)
187 and self.max_connections == other.max_connections
188 and self.max_keepalive_connections == other.max_keepalive_connections
189 and self.keepalive_expiry == other.keepalive_expiry
190 )
192 def __repr__(self) -> str:
193 class_name = self.__class__.__name__
194 return (
195 f"{class_name}(max_connections={self.max_connections}, "
196 f"max_keepalive_connections={self.max_keepalive_connections}, "
197 f"keepalive_expiry={self.keepalive_expiry})"
198 )
201class Proxy:
202 def __init__(
203 self,
204 url: URL | str,
205 *,
206 ssl_context: ssl.SSLContext | None = None,
207 auth: tuple[str, str] | None = None,
208 headers: HeaderTypes | None = None,
209 ) -> None:
210 url = URL(url)
211 headers = Headers(headers)
213 if url.scheme not in ("http", "https", "socks5", "socks5h"):
214 raise ValueError(f"Unknown scheme for proxy URL {url!r}")
216 if url.username or url.password:
217 # Remove any auth credentials from the URL.
218 auth = (url.username, url.password)
219 url = url.copy_with(username=None, password=None)
221 self.url = url
222 self.auth = auth
223 self.headers = headers
224 self.ssl_context = ssl_context
226 @property
227 def raw_auth(self) -> tuple[bytes, bytes] | None:
228 # The proxy authentication as raw bytes.
229 return (
230 None
231 if self.auth is None
232 else (self.auth[0].encode("utf-8"), self.auth[1].encode("utf-8"))
233 )
235 def __repr__(self) -> str:
236 # The authentication is represented with the password component masked.
237 auth = (self.auth[0], "********") if self.auth else None
239 # Build a nice concise representation.
240 url_str = f"{str(self.url)!r}"
241 auth_str = f", auth={auth!r}" if auth else ""
242 headers_str = f", headers={dict(self.headers)!r}" if self.headers else ""
243 return f"Proxy({url_str}{auth_str}{headers_str})"
246DEFAULT_TIMEOUT_CONFIG = Timeout(timeout=5.0)
247DEFAULT_LIMITS = Limits(max_connections=100, max_keepalive_connections=20)
248DEFAULT_MAX_REDIRECTS = 20