Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_transports/default.py: 35%
97 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
1"""
2Custom transports, with nicely configured defaults.
4The following additional keyword arguments are currently supported by httpcore...
6* uds: str
7* local_address: str
8* retries: int
10Example usages...
12# Disable HTTP/2 on a single specific domain.
13mounts = {
14 "all://": httpx.HTTPTransport(http2=True),
15 "all://*example.org": httpx.HTTPTransport()
16}
18# Using advanced httpcore configuration, with connection retries.
19transport = httpx.HTTPTransport(retries=1)
20client = httpx.Client(transport=transport)
22# Using advanced httpcore configuration, with unix domain sockets.
23transport = httpx.HTTPTransport(uds="socket.uds")
24client = httpx.Client(transport=transport)
25"""
26import contextlib
27import typing
28from types import TracebackType
30import httpcore
32from .._config import DEFAULT_LIMITS, Limits, Proxy, create_ssl_context
33from .._exceptions import (
34 ConnectError,
35 ConnectTimeout,
36 LocalProtocolError,
37 NetworkError,
38 PoolTimeout,
39 ProtocolError,
40 ProxyError,
41 ReadError,
42 ReadTimeout,
43 RemoteProtocolError,
44 TimeoutException,
45 UnsupportedProtocol,
46 WriteError,
47 WriteTimeout,
48)
49from .._models import Request, Response
50from .._types import AsyncByteStream, CertTypes, SyncByteStream, VerifyTypes
51from .base import AsyncBaseTransport, BaseTransport
53T = typing.TypeVar("T", bound="HTTPTransport")
54A = typing.TypeVar("A", bound="AsyncHTTPTransport")
57@contextlib.contextmanager
58def map_httpcore_exceptions() -> typing.Iterator[None]:
59 try:
60 yield
61 except Exception as exc: # noqa: PIE-786
62 mapped_exc = None
64 for from_exc, to_exc in HTTPCORE_EXC_MAP.items():
65 if not isinstance(exc, from_exc):
66 continue
67 # We want to map to the most specific exception we can find.
68 # Eg if `exc` is an `httpcore.ReadTimeout`, we want to map to
69 # `httpx.ReadTimeout`, not just `httpx.TimeoutException`.
70 if mapped_exc is None or issubclass(to_exc, mapped_exc):
71 mapped_exc = to_exc
73 if mapped_exc is None: # pragma: no cover
74 raise
76 message = str(exc)
77 raise mapped_exc(message) from exc
80HTTPCORE_EXC_MAP = {
81 httpcore.TimeoutException: TimeoutException,
82 httpcore.ConnectTimeout: ConnectTimeout,
83 httpcore.ReadTimeout: ReadTimeout,
84 httpcore.WriteTimeout: WriteTimeout,
85 httpcore.PoolTimeout: PoolTimeout,
86 httpcore.NetworkError: NetworkError,
87 httpcore.ConnectError: ConnectError,
88 httpcore.ReadError: ReadError,
89 httpcore.WriteError: WriteError,
90 httpcore.ProxyError: ProxyError,
91 httpcore.UnsupportedProtocol: UnsupportedProtocol,
92 httpcore.ProtocolError: ProtocolError,
93 httpcore.LocalProtocolError: LocalProtocolError,
94 httpcore.RemoteProtocolError: RemoteProtocolError,
95}
98class ResponseStream(SyncByteStream):
99 def __init__(self, httpcore_stream: typing.Iterable[bytes]):
100 self._httpcore_stream = httpcore_stream
102 def __iter__(self) -> typing.Iterator[bytes]:
103 with map_httpcore_exceptions():
104 for part in self._httpcore_stream:
105 yield part
107 def close(self) -> None:
108 if hasattr(self._httpcore_stream, "close"):
109 self._httpcore_stream.close() # type: ignore
112class HTTPTransport(BaseTransport):
113 def __init__(
114 self,
115 verify: VerifyTypes = True,
116 cert: typing.Optional[CertTypes] = None,
117 http1: bool = True,
118 http2: bool = False,
119 limits: Limits = DEFAULT_LIMITS,
120 trust_env: bool = True,
121 proxy: typing.Optional[Proxy] = None,
122 uds: typing.Optional[str] = None,
123 local_address: typing.Optional[str] = None,
124 retries: int = 0,
125 ) -> None:
126 ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env)
128 if proxy is None:
129 self._pool = httpcore.ConnectionPool(
130 ssl_context=ssl_context,
131 max_connections=limits.max_connections,
132 max_keepalive_connections=limits.max_keepalive_connections,
133 keepalive_expiry=limits.keepalive_expiry,
134 http1=http1,
135 http2=http2,
136 uds=uds,
137 local_address=local_address,
138 retries=retries,
139 )
140 elif proxy.url.scheme in ("http", "https"):
141 self._pool = httpcore.HTTPProxy(
142 proxy_url=httpcore.URL(
143 scheme=proxy.url.raw_scheme,
144 host=proxy.url.raw_host,
145 port=proxy.url.port,
146 target=proxy.url.raw_path,
147 ),
148 proxy_auth=proxy.raw_auth,
149 proxy_headers=proxy.headers.raw,
150 ssl_context=ssl_context,
151 max_connections=limits.max_connections,
152 max_keepalive_connections=limits.max_keepalive_connections,
153 keepalive_expiry=limits.keepalive_expiry,
154 http1=http1,
155 http2=http2,
156 )
157 elif proxy.url.scheme == "socks5":
158 try:
159 import socksio # noqa
160 except ImportError: # pragma: no cover
161 raise ImportError(
162 "Using SOCKS proxy, but the 'socksio' package is not installed. "
163 "Make sure to install httpx using `pip install httpx[socks]`."
164 ) from None
166 self._pool = httpcore.SOCKSProxy(
167 proxy_url=httpcore.URL(
168 scheme=proxy.url.raw_scheme,
169 host=proxy.url.raw_host,
170 port=proxy.url.port,
171 target=proxy.url.raw_path,
172 ),
173 proxy_auth=proxy.raw_auth,
174 ssl_context=ssl_context,
175 max_connections=limits.max_connections,
176 max_keepalive_connections=limits.max_keepalive_connections,
177 keepalive_expiry=limits.keepalive_expiry,
178 http1=http1,
179 http2=http2,
180 )
181 else: # pragma: no cover
182 raise ValueError(
183 f"Proxy protocol must be either 'http', 'https', or 'socks5', but got {proxy.url.scheme!r}."
184 )
186 def __enter__(self: T) -> T: # Use generics for subclass support.
187 self._pool.__enter__()
188 return self
190 def __exit__(
191 self,
192 exc_type: typing.Optional[typing.Type[BaseException]] = None,
193 exc_value: typing.Optional[BaseException] = None,
194 traceback: typing.Optional[TracebackType] = None,
195 ) -> None:
196 with map_httpcore_exceptions():
197 self._pool.__exit__(exc_type, exc_value, traceback)
199 def handle_request(
200 self,
201 request: Request,
202 ) -> Response:
203 assert isinstance(request.stream, SyncByteStream)
205 req = httpcore.Request(
206 method=request.method,
207 url=httpcore.URL(
208 scheme=request.url.raw_scheme,
209 host=request.url.raw_host,
210 port=request.url.port,
211 target=request.url.raw_path,
212 ),
213 headers=request.headers.raw,
214 content=request.stream,
215 extensions=request.extensions,
216 )
217 with map_httpcore_exceptions():
218 resp = self._pool.handle_request(req)
220 assert isinstance(resp.stream, typing.Iterable)
222 return Response(
223 status_code=resp.status,
224 headers=resp.headers,
225 stream=ResponseStream(resp.stream),
226 extensions=resp.extensions,
227 )
229 def close(self) -> None:
230 self._pool.close()
233class AsyncResponseStream(AsyncByteStream):
234 def __init__(self, httpcore_stream: typing.AsyncIterable[bytes]):
235 self._httpcore_stream = httpcore_stream
237 async def __aiter__(self) -> typing.AsyncIterator[bytes]:
238 with map_httpcore_exceptions():
239 async for part in self._httpcore_stream:
240 yield part
242 async def aclose(self) -> None:
243 if hasattr(self._httpcore_stream, "aclose"):
244 await self._httpcore_stream.aclose() # type: ignore
247class AsyncHTTPTransport(AsyncBaseTransport):
248 def __init__(
249 self,
250 verify: VerifyTypes = True,
251 cert: typing.Optional[CertTypes] = None,
252 http1: bool = True,
253 http2: bool = False,
254 limits: Limits = DEFAULT_LIMITS,
255 trust_env: bool = True,
256 proxy: typing.Optional[Proxy] = None,
257 uds: typing.Optional[str] = None,
258 local_address: typing.Optional[str] = None,
259 retries: int = 0,
260 ) -> None:
261 ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env)
263 if proxy is None:
264 self._pool = httpcore.AsyncConnectionPool(
265 ssl_context=ssl_context,
266 max_connections=limits.max_connections,
267 max_keepalive_connections=limits.max_keepalive_connections,
268 keepalive_expiry=limits.keepalive_expiry,
269 http1=http1,
270 http2=http2,
271 uds=uds,
272 local_address=local_address,
273 retries=retries,
274 )
275 elif proxy.url.scheme in ("http", "https"):
276 self._pool = httpcore.AsyncHTTPProxy(
277 proxy_url=httpcore.URL(
278 scheme=proxy.url.raw_scheme,
279 host=proxy.url.raw_host,
280 port=proxy.url.port,
281 target=proxy.url.raw_path,
282 ),
283 proxy_auth=proxy.raw_auth,
284 proxy_headers=proxy.headers.raw,
285 ssl_context=ssl_context,
286 max_connections=limits.max_connections,
287 max_keepalive_connections=limits.max_keepalive_connections,
288 keepalive_expiry=limits.keepalive_expiry,
289 http1=http1,
290 http2=http2,
291 )
292 elif proxy.url.scheme == "socks5":
293 try:
294 import socksio # noqa
295 except ImportError: # pragma: no cover
296 raise ImportError(
297 "Using SOCKS proxy, but the 'socksio' package is not installed. "
298 "Make sure to install httpx using `pip install httpx[socks]`."
299 ) from None
301 self._pool = httpcore.AsyncSOCKSProxy(
302 proxy_url=httpcore.URL(
303 scheme=proxy.url.raw_scheme,
304 host=proxy.url.raw_host,
305 port=proxy.url.port,
306 target=proxy.url.raw_path,
307 ),
308 proxy_auth=proxy.raw_auth,
309 ssl_context=ssl_context,
310 max_connections=limits.max_connections,
311 max_keepalive_connections=limits.max_keepalive_connections,
312 keepalive_expiry=limits.keepalive_expiry,
313 http1=http1,
314 http2=http2,
315 )
316 else: # pragma: no cover
317 raise ValueError(
318 f"Proxy protocol must be either 'http', 'https', or 'socks5', but got {proxy.url.scheme!r}."
319 )
321 async def __aenter__(self: A) -> A: # Use generics for subclass support.
322 await self._pool.__aenter__()
323 return self
325 async def __aexit__(
326 self,
327 exc_type: typing.Optional[typing.Type[BaseException]] = None,
328 exc_value: typing.Optional[BaseException] = None,
329 traceback: typing.Optional[TracebackType] = None,
330 ) -> None:
331 with map_httpcore_exceptions():
332 await self._pool.__aexit__(exc_type, exc_value, traceback)
334 async def handle_async_request(
335 self,
336 request: Request,
337 ) -> Response:
338 assert isinstance(request.stream, AsyncByteStream)
340 req = httpcore.Request(
341 method=request.method,
342 url=httpcore.URL(
343 scheme=request.url.raw_scheme,
344 host=request.url.raw_host,
345 port=request.url.port,
346 target=request.url.raw_path,
347 ),
348 headers=request.headers.raw,
349 content=request.stream,
350 extensions=request.extensions,
351 )
352 with map_httpcore_exceptions():
353 resp = await self._pool.handle_async_request(req)
355 assert isinstance(resp.stream, typing.AsyncIterable)
357 return Response(
358 status_code=resp.status,
359 headers=resp.headers,
360 stream=AsyncResponseStream(resp.stream),
361 extensions=resp.extensions,
362 )
364 async def aclose(self) -> None:
365 await self._pool.aclose()