Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_transports/default.py: 36%
98 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:19 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:19 +0000
1"""
2Custom transports, with nicely configured defaults.
4The following additional keyword arguments are currently supported by httpcore...
6* uds: str
7* local_address: str
8* retries: int
10Example usages...
12# Disable HTTP/2 on a single specific domain.
13mounts = {
14 "all://": httpx.HTTPTransport(http2=True),
15 "all://*example.org": httpx.HTTPTransport()
16}
18# Using advanced httpcore configuration, with connection retries.
19transport = httpx.HTTPTransport(retries=1)
20client = httpx.Client(transport=transport)
22# Using advanced httpcore configuration, with unix domain sockets.
23transport = httpx.HTTPTransport(uds="socket.uds")
24client = httpx.Client(transport=transport)
25"""
26import contextlib
27import typing
28from types import TracebackType
30import httpcore
32from .._config import DEFAULT_LIMITS, Limits, Proxy, create_ssl_context
33from .._exceptions import (
34 ConnectError,
35 ConnectTimeout,
36 LocalProtocolError,
37 NetworkError,
38 PoolTimeout,
39 ProtocolError,
40 ProxyError,
41 ReadError,
42 ReadTimeout,
43 RemoteProtocolError,
44 TimeoutException,
45 UnsupportedProtocol,
46 WriteError,
47 WriteTimeout,
48)
49from .._models import Request, Response
50from .._types import AsyncByteStream, CertTypes, SyncByteStream, VerifyTypes
51from .base import AsyncBaseTransport, BaseTransport
53T = typing.TypeVar("T", bound="HTTPTransport")
54A = typing.TypeVar("A", bound="AsyncHTTPTransport")
56SOCKET_OPTION = typing.Union[
57 typing.Tuple[int, int, int],
58 typing.Tuple[int, int, typing.Union[bytes, bytearray]],
59 typing.Tuple[int, int, None, int],
60]
63@contextlib.contextmanager
64def map_httpcore_exceptions() -> typing.Iterator[None]:
65 try:
66 yield
67 except Exception as exc: # noqa: PIE-786
68 mapped_exc = None
70 for from_exc, to_exc in HTTPCORE_EXC_MAP.items():
71 if not isinstance(exc, from_exc):
72 continue
73 # We want to map to the most specific exception we can find.
74 # Eg if `exc` is an `httpcore.ReadTimeout`, we want to map to
75 # `httpx.ReadTimeout`, not just `httpx.TimeoutException`.
76 if mapped_exc is None or issubclass(to_exc, mapped_exc):
77 mapped_exc = to_exc
79 if mapped_exc is None: # pragma: no cover
80 raise
82 message = str(exc)
83 raise mapped_exc(message) from exc
86HTTPCORE_EXC_MAP = {
87 httpcore.TimeoutException: TimeoutException,
88 httpcore.ConnectTimeout: ConnectTimeout,
89 httpcore.ReadTimeout: ReadTimeout,
90 httpcore.WriteTimeout: WriteTimeout,
91 httpcore.PoolTimeout: PoolTimeout,
92 httpcore.NetworkError: NetworkError,
93 httpcore.ConnectError: ConnectError,
94 httpcore.ReadError: ReadError,
95 httpcore.WriteError: WriteError,
96 httpcore.ProxyError: ProxyError,
97 httpcore.UnsupportedProtocol: UnsupportedProtocol,
98 httpcore.ProtocolError: ProtocolError,
99 httpcore.LocalProtocolError: LocalProtocolError,
100 httpcore.RemoteProtocolError: RemoteProtocolError,
101}
104class ResponseStream(SyncByteStream):
105 def __init__(self, httpcore_stream: typing.Iterable[bytes]):
106 self._httpcore_stream = httpcore_stream
108 def __iter__(self) -> typing.Iterator[bytes]:
109 with map_httpcore_exceptions():
110 for part in self._httpcore_stream:
111 yield part
113 def close(self) -> None:
114 if hasattr(self._httpcore_stream, "close"):
115 self._httpcore_stream.close()
118class HTTPTransport(BaseTransport):
119 def __init__(
120 self,
121 verify: VerifyTypes = True,
122 cert: typing.Optional[CertTypes] = None,
123 http1: bool = True,
124 http2: bool = False,
125 limits: Limits = DEFAULT_LIMITS,
126 trust_env: bool = True,
127 proxy: typing.Optional[Proxy] = None,
128 uds: typing.Optional[str] = None,
129 local_address: typing.Optional[str] = None,
130 retries: int = 0,
131 socket_options: typing.Optional[typing.Iterable[SOCKET_OPTION]] = None,
132 ) -> None:
133 ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env)
135 if proxy is None:
136 self._pool = httpcore.ConnectionPool(
137 ssl_context=ssl_context,
138 max_connections=limits.max_connections,
139 max_keepalive_connections=limits.max_keepalive_connections,
140 keepalive_expiry=limits.keepalive_expiry,
141 http1=http1,
142 http2=http2,
143 uds=uds,
144 local_address=local_address,
145 retries=retries,
146 socket_options=socket_options,
147 )
148 elif proxy.url.scheme in ("http", "https"):
149 self._pool = httpcore.HTTPProxy(
150 proxy_url=httpcore.URL(
151 scheme=proxy.url.raw_scheme,
152 host=proxy.url.raw_host,
153 port=proxy.url.port,
154 target=proxy.url.raw_path,
155 ),
156 proxy_auth=proxy.raw_auth,
157 proxy_headers=proxy.headers.raw,
158 ssl_context=ssl_context,
159 max_connections=limits.max_connections,
160 max_keepalive_connections=limits.max_keepalive_connections,
161 keepalive_expiry=limits.keepalive_expiry,
162 http1=http1,
163 http2=http2,
164 socket_options=socket_options,
165 )
166 elif proxy.url.scheme == "socks5":
167 try:
168 import socksio # noqa
169 except ImportError: # pragma: no cover
170 raise ImportError(
171 "Using SOCKS proxy, but the 'socksio' package is not installed. "
172 "Make sure to install httpx using `pip install httpx[socks]`."
173 ) from None
175 self._pool = httpcore.SOCKSProxy(
176 proxy_url=httpcore.URL(
177 scheme=proxy.url.raw_scheme,
178 host=proxy.url.raw_host,
179 port=proxy.url.port,
180 target=proxy.url.raw_path,
181 ),
182 proxy_auth=proxy.raw_auth,
183 ssl_context=ssl_context,
184 max_connections=limits.max_connections,
185 max_keepalive_connections=limits.max_keepalive_connections,
186 keepalive_expiry=limits.keepalive_expiry,
187 http1=http1,
188 http2=http2,
189 )
190 else: # pragma: no cover
191 raise ValueError(
192 f"Proxy protocol must be either 'http', 'https', or 'socks5', but got {proxy.url.scheme!r}."
193 )
195 def __enter__(self: T) -> T: # Use generics for subclass support.
196 self._pool.__enter__()
197 return self
199 def __exit__(
200 self,
201 exc_type: typing.Optional[typing.Type[BaseException]] = None,
202 exc_value: typing.Optional[BaseException] = None,
203 traceback: typing.Optional[TracebackType] = None,
204 ) -> None:
205 with map_httpcore_exceptions():
206 self._pool.__exit__(exc_type, exc_value, traceback)
208 def handle_request(
209 self,
210 request: Request,
211 ) -> Response:
212 assert isinstance(request.stream, SyncByteStream)
214 req = httpcore.Request(
215 method=request.method,
216 url=httpcore.URL(
217 scheme=request.url.raw_scheme,
218 host=request.url.raw_host,
219 port=request.url.port,
220 target=request.url.raw_path,
221 ),
222 headers=request.headers.raw,
223 content=request.stream,
224 extensions=request.extensions,
225 )
226 with map_httpcore_exceptions():
227 resp = self._pool.handle_request(req)
229 assert isinstance(resp.stream, typing.Iterable)
231 return Response(
232 status_code=resp.status,
233 headers=resp.headers,
234 stream=ResponseStream(resp.stream),
235 extensions=resp.extensions,
236 )
238 def close(self) -> None:
239 self._pool.close()
242class AsyncResponseStream(AsyncByteStream):
243 def __init__(self, httpcore_stream: typing.AsyncIterable[bytes]):
244 self._httpcore_stream = httpcore_stream
246 async def __aiter__(self) -> typing.AsyncIterator[bytes]:
247 with map_httpcore_exceptions():
248 async for part in self._httpcore_stream:
249 yield part
251 async def aclose(self) -> None:
252 if hasattr(self._httpcore_stream, "aclose"):
253 await self._httpcore_stream.aclose()
256class AsyncHTTPTransport(AsyncBaseTransport):
257 def __init__(
258 self,
259 verify: VerifyTypes = True,
260 cert: typing.Optional[CertTypes] = None,
261 http1: bool = True,
262 http2: bool = False,
263 limits: Limits = DEFAULT_LIMITS,
264 trust_env: bool = True,
265 proxy: typing.Optional[Proxy] = None,
266 uds: typing.Optional[str] = None,
267 local_address: typing.Optional[str] = None,
268 retries: int = 0,
269 socket_options: typing.Optional[typing.Iterable[SOCKET_OPTION]] = None,
270 ) -> None:
271 ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env)
273 if proxy is None:
274 self._pool = httpcore.AsyncConnectionPool(
275 ssl_context=ssl_context,
276 max_connections=limits.max_connections,
277 max_keepalive_connections=limits.max_keepalive_connections,
278 keepalive_expiry=limits.keepalive_expiry,
279 http1=http1,
280 http2=http2,
281 uds=uds,
282 local_address=local_address,
283 retries=retries,
284 socket_options=socket_options,
285 )
286 elif proxy.url.scheme in ("http", "https"):
287 self._pool = httpcore.AsyncHTTPProxy(
288 proxy_url=httpcore.URL(
289 scheme=proxy.url.raw_scheme,
290 host=proxy.url.raw_host,
291 port=proxy.url.port,
292 target=proxy.url.raw_path,
293 ),
294 proxy_auth=proxy.raw_auth,
295 proxy_headers=proxy.headers.raw,
296 ssl_context=ssl_context,
297 max_connections=limits.max_connections,
298 max_keepalive_connections=limits.max_keepalive_connections,
299 keepalive_expiry=limits.keepalive_expiry,
300 http1=http1,
301 http2=http2,
302 socket_options=socket_options,
303 )
304 elif proxy.url.scheme == "socks5":
305 try:
306 import socksio # noqa
307 except ImportError: # pragma: no cover
308 raise ImportError(
309 "Using SOCKS proxy, but the 'socksio' package is not installed. "
310 "Make sure to install httpx using `pip install httpx[socks]`."
311 ) from None
313 self._pool = httpcore.AsyncSOCKSProxy(
314 proxy_url=httpcore.URL(
315 scheme=proxy.url.raw_scheme,
316 host=proxy.url.raw_host,
317 port=proxy.url.port,
318 target=proxy.url.raw_path,
319 ),
320 proxy_auth=proxy.raw_auth,
321 ssl_context=ssl_context,
322 max_connections=limits.max_connections,
323 max_keepalive_connections=limits.max_keepalive_connections,
324 keepalive_expiry=limits.keepalive_expiry,
325 http1=http1,
326 http2=http2,
327 )
328 else: # pragma: no cover
329 raise ValueError(
330 f"Proxy protocol must be either 'http', 'https', or 'socks5', but got {proxy.url.scheme!r}."
331 )
333 async def __aenter__(self: A) -> A: # Use generics for subclass support.
334 await self._pool.__aenter__()
335 return self
337 async def __aexit__(
338 self,
339 exc_type: typing.Optional[typing.Type[BaseException]] = None,
340 exc_value: typing.Optional[BaseException] = None,
341 traceback: typing.Optional[TracebackType] = None,
342 ) -> None:
343 with map_httpcore_exceptions():
344 await self._pool.__aexit__(exc_type, exc_value, traceback)
346 async def handle_async_request(
347 self,
348 request: Request,
349 ) -> Response:
350 assert isinstance(request.stream, AsyncByteStream)
352 req = httpcore.Request(
353 method=request.method,
354 url=httpcore.URL(
355 scheme=request.url.raw_scheme,
356 host=request.url.raw_host,
357 port=request.url.port,
358 target=request.url.raw_path,
359 ),
360 headers=request.headers.raw,
361 content=request.stream,
362 extensions=request.extensions,
363 )
364 with map_httpcore_exceptions():
365 resp = await self._pool.handle_async_request(req)
367 assert isinstance(resp.stream, typing.AsyncIterable)
369 return Response(
370 status_code=resp.status,
371 headers=resp.headers,
372 stream=AsyncResponseStream(resp.stream),
373 extensions=resp.extensions,
374 )
376 async def aclose(self) -> None:
377 await self._pool.aclose()