1"""
2Custom transports, with nicely configured defaults.
3
4The following additional keyword arguments are currently supported by httpcore...
5
6* uds: str
7* local_address: str
8* retries: int
9
10Example usages...
11
12# Disable HTTP/2 on a single specific domain.
13mounts = {
14 "all://": httpx.HTTPTransport(http2=True),
15 "all://*example.org": httpx.HTTPTransport()
16}
17
18# Using advanced httpcore configuration, with connection retries.
19transport = httpx.HTTPTransport(retries=1)
20client = httpx.Client(transport=transport)
21
22# Using advanced httpcore configuration, with unix domain sockets.
23transport = httpx.HTTPTransport(uds="socket.uds")
24client = httpx.Client(transport=transport)
25"""
26from __future__ import annotations
27
28import contextlib
29import typing
30from types import TracebackType
31
32import httpcore
33
34from .._config import DEFAULT_LIMITS, Limits, Proxy, create_ssl_context
35from .._exceptions import (
36 ConnectError,
37 ConnectTimeout,
38 LocalProtocolError,
39 NetworkError,
40 PoolTimeout,
41 ProtocolError,
42 ProxyError,
43 ReadError,
44 ReadTimeout,
45 RemoteProtocolError,
46 TimeoutException,
47 UnsupportedProtocol,
48 WriteError,
49 WriteTimeout,
50)
51from .._models import Request, Response
52from .._types import AsyncByteStream, CertTypes, ProxyTypes, SyncByteStream, VerifyTypes
53from .._urls import URL
54from .base import AsyncBaseTransport, BaseTransport
55
# Self-type variables used to annotate `__enter__` / `__aenter__`, so that a
# subclass entering the context manager is typed as itself.
T = typing.TypeVar("T", bound="HTTPTransport")
A = typing.TypeVar("A", bound="AsyncHTTPTransport")

# A single socket option entry, in one of three tuple layouts.
# NOTE(review): presumably these mirror the argument forms accepted by
# `socket.setsockopt` - confirm against httpcore.
SOCKET_OPTION = typing.Union[
    # (int, int, int)
    typing.Tuple[int, int, int],
    # (int, int, bytes-like)
    typing.Tuple[int, int, typing.Union[bytes, bytearray]],
    # (int, int, None, int)
    typing.Tuple[int, int, None, int],
]
64
65
@contextlib.contextmanager
def map_httpcore_exceptions() -> typing.Iterator[None]:
    """
    Re-raise exceptions from the wrapped block as their mapped counterparts.

    Any `Exception` raised inside the `with` body is checked against the keys
    of `HTTPCORE_EXC_MAP`. When at least one key matches, the most specific
    mapped class is raised instead, built from `str(exc)` and chained to the
    original with `from exc`. When nothing matches, the original exception
    propagates unchanged.
    """
    try:
        yield
    except Exception as exc:
        chosen = None

        for core_type, mapped_type in HTTPCORE_EXC_MAP.items():
            # Prefer the most specific mapping available. Eg. an
            # `httpcore.ReadTimeout` matches both the `ReadTimeout` and the
            # `TimeoutException` entries; the subclass check keeps the
            # narrower `ReadTimeout` whichever entry is visited first.
            if isinstance(exc, core_type) and (
                chosen is None or issubclass(mapped_type, chosen)
            ):
                chosen = mapped_type

        if chosen is None:  # pragma: no cover
            raise

        raise chosen(str(exc)) from exc
87
88
# Lookup table consulted by `map_httpcore_exceptions`: each `httpcore`
# exception class (key) is paired with the exception class raised in its
# place (value).
HTTPCORE_EXC_MAP: typing.Dict[typing.Type[Exception], typing.Type[Exception]] = {
    # Timeouts.
    httpcore.TimeoutException: TimeoutException,
    httpcore.ConnectTimeout: ConnectTimeout,
    httpcore.ReadTimeout: ReadTimeout,
    httpcore.WriteTimeout: WriteTimeout,
    httpcore.PoolTimeout: PoolTimeout,
    # Network failures.
    httpcore.NetworkError: NetworkError,
    httpcore.ConnectError: ConnectError,
    httpcore.ReadError: ReadError,
    httpcore.WriteError: WriteError,
    # Proxy and protocol failures.
    httpcore.ProxyError: ProxyError,
    httpcore.UnsupportedProtocol: UnsupportedProtocol,
    httpcore.ProtocolError: ProtocolError,
    httpcore.LocalProtocolError: LocalProtocolError,
    httpcore.RemoteProtocolError: RemoteProtocolError,
}
105
106
class ResponseStream(SyncByteStream):
    """
    Byte stream wrapper around the stream of an `httpcore` response.

    Iteration runs inside `map_httpcore_exceptions()`, so errors raised while
    reading the body go through that exception mapping.
    """

    def __init__(self, httpcore_stream: typing.Iterable[bytes]) -> None:
        """Store the wrapped stream; nothing is read until iteration."""
        self._httpcore_stream = httpcore_stream

    def __iter__(self) -> typing.Iterator[bytes]:
        """Yield each chunk of the wrapped stream, mapping errors on the way."""
        with map_httpcore_exceptions():
            for chunk in self._httpcore_stream:
                yield chunk

    def close(self) -> None:
        """Close the wrapped stream, if it has a `close` attribute."""
        inner = self._httpcore_stream
        if not hasattr(inner, "close"):
            # A plain iterable has nothing to release.
            return
        inner.close()
119
120
class HTTPTransport(BaseTransport):
    """
    Synchronous transport that sends requests through an `httpcore` pool.

    With no proxy the pool is an `httpcore.ConnectionPool`. An "http" or
    "https" proxy selects `httpcore.HTTPProxy`, and a "socks5" proxy selects
    `httpcore.SOCKSProxy`.
    """

    def __init__(
        self,
        verify: VerifyTypes = True,
        cert: CertTypes | None = None,
        http1: bool = True,
        http2: bool = False,
        limits: Limits = DEFAULT_LIMITS,
        trust_env: bool = True,
        proxy: ProxyTypes | None = None,
        uds: str | None = None,
        local_address: str | None = None,
        retries: int = 0,
        socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
    ) -> None:
        """
        Create the SSL context and the `httpcore` pool backing the transport.

        `verify`, `cert` and `trust_env` are passed to `create_ssl_context`.
        `uds`, `local_address` and `retries` are only forwarded when no proxy
        is configured. `socket_options` is forwarded for direct connections
        and for HTTP/HTTPS proxies, but not for SOCKS proxies.

        Raises `ImportError` when a "socks5" proxy is requested and the
        `socksio` package cannot be imported, and `ValueError` for any other
        unsupported proxy scheme.
        """
        ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env)
        if isinstance(proxy, (str, URL)):
            # A bare proxy URL is shorthand for a `Proxy` instance.
            proxy = Proxy(url=proxy)

        # Settings passed to every pool flavour, direct or proxied.
        shared_options: typing.Dict[str, typing.Any] = {
            "ssl_context": ssl_context,
            "max_connections": limits.max_connections,
            "max_keepalive_connections": limits.max_keepalive_connections,
            "keepalive_expiry": limits.keepalive_expiry,
            "http1": http1,
            "http2": http2,
        }

        if proxy is None:
            self._pool = httpcore.ConnectionPool(
                uds=uds,
                local_address=local_address,
                retries=retries,
                socket_options=socket_options,
                **shared_options,
            )
            return

        scheme = proxy.url.scheme

        if scheme in ("http", "https"):
            proxy_url = httpcore.URL(
                scheme=proxy.url.raw_scheme,
                host=proxy.url.raw_host,
                port=proxy.url.port,
                target=proxy.url.raw_path,
            )
            self._pool = httpcore.HTTPProxy(
                proxy_url=proxy_url,
                proxy_auth=proxy.raw_auth,
                proxy_headers=proxy.headers.raw,
                proxy_ssl_context=proxy.ssl_context,
                socket_options=socket_options,
                **shared_options,
            )
        elif scheme == "socks5":
            # Fail early, with installation advice, when the optional SOCKS
            # dependency is missing.
            try:
                import socksio  # noqa
            except ImportError:  # pragma: no cover
                raise ImportError(
                    "Using SOCKS proxy, but the 'socksio' package is not installed. "
                    "Make sure to install httpx using `pip install httpx[socks]`."
                ) from None

            proxy_url = httpcore.URL(
                scheme=proxy.url.raw_scheme,
                host=proxy.url.raw_host,
                port=proxy.url.port,
                target=proxy.url.raw_path,
            )
            self._pool = httpcore.SOCKSProxy(
                proxy_url=proxy_url,
                proxy_auth=proxy.raw_auth,
                **shared_options,
            )
        else:  # pragma: no cover
            raise ValueError(
                "Proxy protocol must be either 'http', 'https', or 'socks5',"
                f" but got {proxy.url.scheme!r}."
            )

    def __enter__(self: T) -> T:  # Use generics for subclass support.
        """Enter the underlying pool's context and return this transport."""
        self._pool.__enter__()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        """Exit the underlying pool's context, mapping `httpcore` errors."""
        with map_httpcore_exceptions():
            self._pool.__exit__(exc_type, exc_value, traceback)

    def handle_request(
        self,
        request: Request,
    ) -> Response:
        """
        Send `request` through the pool and return the resulting `Response`.

        The request is converted to an `httpcore.Request`, and the response
        body is wrapped in a `ResponseStream`. Errors raised while the pool
        handles the request go through `map_httpcore_exceptions()`.
        """
        assert isinstance(request.stream, SyncByteStream)

        url = request.url
        core_request = httpcore.Request(
            method=request.method,
            url=httpcore.URL(
                scheme=url.raw_scheme,
                host=url.raw_host,
                port=url.port,
                target=url.raw_path,
            ),
            headers=request.headers.raw,
            content=request.stream,
            extensions=request.extensions,
        )
        with map_httpcore_exceptions():
            core_response = self._pool.handle_request(core_request)

        assert isinstance(core_response.stream, typing.Iterable)

        return Response(
            status_code=core_response.status,
            headers=core_response.headers,
            stream=ResponseStream(core_response.stream),
            extensions=core_response.extensions,
        )

    def close(self) -> None:
        """Close the underlying pool."""
        self._pool.close()
246
247
class AsyncResponseStream(AsyncByteStream):
    """
    Async byte stream wrapper around the stream of an `httpcore` response.

    Iteration runs inside `map_httpcore_exceptions()`, so errors raised while
    reading the body go through that exception mapping.
    """

    def __init__(self, httpcore_stream: typing.AsyncIterable[bytes]) -> None:
        """Store the wrapped stream; nothing is read until iteration."""
        self._httpcore_stream = httpcore_stream

    async def __aiter__(self) -> typing.AsyncIterator[bytes]:
        """Yield each chunk of the wrapped stream, mapping errors on the way."""
        with map_httpcore_exceptions():
            async for chunk in self._httpcore_stream:
                yield chunk

    async def aclose(self) -> None:
        """Close the wrapped stream, if it has an `aclose` attribute."""
        inner = self._httpcore_stream
        if not hasattr(inner, "aclose"):
            # A plain async iterable has nothing to release.
            return
        await inner.aclose()
260
261
class AsyncHTTPTransport(AsyncBaseTransport):
    """
    Asynchronous transport that sends requests through an `httpcore` pool.

    With no proxy the pool is an `httpcore.AsyncConnectionPool`. An "http" or
    "https" proxy selects `httpcore.AsyncHTTPProxy`, and a "socks5" proxy
    selects `httpcore.AsyncSOCKSProxy`.
    """

    def __init__(
        self,
        verify: VerifyTypes = True,
        cert: CertTypes | None = None,
        http1: bool = True,
        http2: bool = False,
        limits: Limits = DEFAULT_LIMITS,
        trust_env: bool = True,
        proxy: ProxyTypes | None = None,
        uds: str | None = None,
        local_address: str | None = None,
        retries: int = 0,
        socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
    ) -> None:
        """
        Create the SSL context and the `httpcore` pool backing the transport.

        `verify`, `cert` and `trust_env` are passed to `create_ssl_context`.
        `uds`, `local_address` and `retries` are only forwarded when no proxy
        is configured. `socket_options` is forwarded for direct connections
        and for HTTP/HTTPS proxies, but not for SOCKS proxies.

        Raises `ImportError` when a "socks5" proxy is requested and the
        `socksio` package cannot be imported, and `ValueError` for any other
        unsupported proxy scheme.
        """
        ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env)
        # A bare proxy URL is shorthand for a `Proxy` instance.
        proxy = Proxy(url=proxy) if isinstance(proxy, (str, URL)) else proxy

        if proxy is None:
            self._pool = httpcore.AsyncConnectionPool(
                ssl_context=ssl_context,
                max_connections=limits.max_connections,
                max_keepalive_connections=limits.max_keepalive_connections,
                keepalive_expiry=limits.keepalive_expiry,
                http1=http1,
                http2=http2,
                uds=uds,
                local_address=local_address,
                retries=retries,
                socket_options=socket_options,
            )
        elif proxy.url.scheme in ("http", "https"):
            self._pool = httpcore.AsyncHTTPProxy(
                proxy_url=httpcore.URL(
                    scheme=proxy.url.raw_scheme,
                    host=proxy.url.raw_host,
                    port=proxy.url.port,
                    target=proxy.url.raw_path,
                ),
                proxy_auth=proxy.raw_auth,
                proxy_headers=proxy.headers.raw,
                ssl_context=ssl_context,
                # Pass on the proxy's own SSL context, as `HTTPTransport`
                # does; without it the context set on the `Proxy` is ignored.
                proxy_ssl_context=proxy.ssl_context,
                max_connections=limits.max_connections,
                max_keepalive_connections=limits.max_keepalive_connections,
                keepalive_expiry=limits.keepalive_expiry,
                http1=http1,
                http2=http2,
                socket_options=socket_options,
            )
        elif proxy.url.scheme == "socks5":
            # Fail early, with installation advice, when the optional SOCKS
            # dependency is missing.
            try:
                import socksio  # noqa
            except ImportError:  # pragma: no cover
                raise ImportError(
                    "Using SOCKS proxy, but the 'socksio' package is not installed. "
                    "Make sure to install httpx using `pip install httpx[socks]`."
                ) from None

            self._pool = httpcore.AsyncSOCKSProxy(
                proxy_url=httpcore.URL(
                    scheme=proxy.url.raw_scheme,
                    host=proxy.url.raw_host,
                    port=proxy.url.port,
                    target=proxy.url.raw_path,
                ),
                proxy_auth=proxy.raw_auth,
                ssl_context=ssl_context,
                max_connections=limits.max_connections,
                max_keepalive_connections=limits.max_keepalive_connections,
                keepalive_expiry=limits.keepalive_expiry,
                http1=http1,
                http2=http2,
            )
        else:  # pragma: no cover
            # The second fragment must be an f-string so that the offending
            # scheme is interpolated into the message.
            raise ValueError(
                "Proxy protocol must be either 'http', 'https', or 'socks5',"
                f" but got {proxy.url.scheme!r}."
            )

    async def __aenter__(self: A) -> A:  # Use generics for subclass support.
        """Enter the underlying pool's async context and return this transport."""
        await self._pool.__aenter__()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        """Exit the underlying pool's async context, mapping `httpcore` errors."""
        with map_httpcore_exceptions():
            await self._pool.__aexit__(exc_type, exc_value, traceback)

    async def handle_async_request(
        self,
        request: Request,
    ) -> Response:
        """
        Send `request` through the pool and return the resulting `Response`.

        The request is converted to an `httpcore.Request`, and the response
        body is wrapped in an `AsyncResponseStream`. Errors raised while the
        pool handles the request go through `map_httpcore_exceptions()`.
        """
        assert isinstance(request.stream, AsyncByteStream)

        req = httpcore.Request(
            method=request.method,
            url=httpcore.URL(
                scheme=request.url.raw_scheme,
                host=request.url.raw_host,
                port=request.url.port,
                target=request.url.raw_path,
            ),
            headers=request.headers.raw,
            content=request.stream,
            extensions=request.extensions,
        )
        with map_httpcore_exceptions():
            resp = await self._pool.handle_async_request(req)

        assert isinstance(resp.stream, typing.AsyncIterable)

        return Response(
            status_code=resp.status,
            headers=resp.headers,
            stream=AsyncResponseStream(resp.stream),
            extensions=resp.extensions,
        )

    async def aclose(self) -> None:
        """Close the underlying pool."""
        await self._pool.aclose()