1"""
Custom transports, with nicely configured defaults.

The following additional keyword arguments are currently supported by httpcore...

* uds: str
* local_address: str
* retries: int

Example usages...

# Disable HTTP/2 on a single specific domain.
mounts = {
    "all://": httpx.HTTPTransport(http2=True),
    "all://*example.org": httpx.HTTPTransport()
}

# Using advanced httpcore configuration, with connection retries.
transport = httpx.HTTPTransport(retries=1)
client = httpx.Client(transport=transport)

# Using advanced httpcore configuration, with unix domain sockets.
transport = httpx.HTTPTransport(uds="socket.uds")
client = httpx.Client(transport=transport)
25"""
26
27from __future__ import annotations
28
29import contextlib
30import typing
31from types import TracebackType
32
33if typing.TYPE_CHECKING:
34 import ssl # pragma: no cover
35
36 import httpx # pragma: no cover
37
38from .._config import DEFAULT_LIMITS, Limits, Proxy, create_ssl_context
39from .._exceptions import (
40 ConnectError,
41 ConnectTimeout,
42 LocalProtocolError,
43 NetworkError,
44 PoolTimeout,
45 ProtocolError,
46 ProxyError,
47 ReadError,
48 ReadTimeout,
49 RemoteProtocolError,
50 TimeoutException,
51 UnsupportedProtocol,
52 WriteError,
53 WriteTimeout,
54)
55from .._models import Request, Response
56from .._types import AsyncByteStream, CertTypes, ProxyTypes, SyncByteStream
57from .._urls import URL
58from .base import AsyncBaseTransport, BaseTransport
59
# Self-type variables: they let `__enter__` / `__aenter__` be annotated as
# returning the same (sub)class type as the instance they are called on.
T = typing.TypeVar("T", bound="HTTPTransport")
A = typing.TypeVar("A", bound="AsyncHTTPTransport")

# Tuple shapes accepted for a single entry of the `socket_options` argument
# taken by the transports in this module.
SOCKET_OPTION = typing.Union[
    typing.Tuple[int, int, int],
    typing.Tuple[int, int, typing.Union[bytes, bytearray]],
    typing.Tuple[int, int, None, int],
]

__all__ = ["AsyncHTTPTransport", "HTTPTransport"]

# Starts out empty; `map_httpcore_exceptions()` fills it on first use with the
# result of `_load_httpcore_exceptions()`.
HTTPCORE_EXC_MAP: dict[type[Exception], type[httpx.HTTPError]] = {}
72
73
def _load_httpcore_exceptions() -> dict[type[Exception], type[httpx.HTTPError]]:
    """
    Build the table that pairs each httpcore exception class with the httpx
    exception class used in its place.

    `httpcore` is imported inside the function body, so the import only
    happens when the table is actually requested.
    """
    import httpcore

    # Listed as ordered pairs; the resulting dict keeps this insertion order.
    pairs: list[tuple[type[Exception], type[httpx.HTTPError]]] = [
        (httpcore.TimeoutException, TimeoutException),
        (httpcore.ConnectTimeout, ConnectTimeout),
        (httpcore.ReadTimeout, ReadTimeout),
        (httpcore.WriteTimeout, WriteTimeout),
        (httpcore.PoolTimeout, PoolTimeout),
        (httpcore.NetworkError, NetworkError),
        (httpcore.ConnectError, ConnectError),
        (httpcore.ReadError, ReadError),
        (httpcore.WriteError, WriteError),
        (httpcore.ProxyError, ProxyError),
        (httpcore.UnsupportedProtocol, UnsupportedProtocol),
        (httpcore.ProtocolError, ProtocolError),
        (httpcore.LocalProtocolError, LocalProtocolError),
        (httpcore.RemoteProtocolError, RemoteProtocolError),
    ]
    return dict(pairs)
93
94
@contextlib.contextmanager
def map_httpcore_exceptions() -> typing.Iterator[None]:
    """
    Context manager that re-raises httpcore exceptions as httpx exceptions.

    The lookup table is loaded the first time the context manager is used.
    An exception matching no entry in the table propagates unchanged.
    Otherwise the matching httpx exception is raised with the same message,
    chained to the original via `raise ... from`.
    """
    global HTTPCORE_EXC_MAP
    if not HTTPCORE_EXC_MAP:
        HTTPCORE_EXC_MAP = _load_httpcore_exceptions()

    try:
        yield
    except Exception as exc:
        best_match = None

        for core_type, httpx_type in HTTPCORE_EXC_MAP.items():
            # Prefer the most specific target class. For example an
            # `httpcore.ReadTimeout` should become `httpx.ReadTimeout`
            # rather than the broader `httpx.TimeoutException`.
            if isinstance(exc, core_type) and (
                best_match is None or issubclass(httpx_type, best_match)
            ):
                best_match = httpx_type

        if best_match is None:  # pragma: no cover
            raise

        raise best_match(str(exc)) from exc
119
120
class ResponseStream(SyncByteStream):
    """Synchronous byte stream that wraps the body stream of an httpcore response."""

    def __init__(self, httpcore_stream: typing.Iterable[bytes]) -> None:
        self._httpcore_stream = httpcore_stream

    def __iter__(self) -> typing.Iterator[bytes]:
        """Yield the wrapped stream's chunks, mapping httpcore errors to httpx ones."""
        source = self._httpcore_stream
        with map_httpcore_exceptions():
            for chunk in source:
                yield chunk

    def close(self) -> None:
        """Close the wrapped stream, if it exposes a `close` attribute."""
        source = self._httpcore_stream
        if not hasattr(source, "close"):
            return
        source.close()
133
134
class HTTPTransport(BaseTransport):
    """
    Synchronous transport that sends requests through an httpcore pool.

    The pool class is chosen from the `proxy` argument:
    `httpcore.ConnectionPool` when there is no proxy, `httpcore.HTTPProxy`
    for "http" / "https" proxy URLs, and `httpcore.SOCKSProxy` for
    "socks5" / "socks5h" proxy URLs.
    """

    def __init__(
        self,
        verify: ssl.SSLContext | str | bool = True,
        cert: CertTypes | None = None,
        trust_env: bool = True,
        http1: bool = True,
        http2: bool = False,
        limits: Limits = DEFAULT_LIMITS,
        proxy: ProxyTypes | None = None,
        uds: str | None = None,
        local_address: str | None = None,
        retries: int = 0,
        socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
    ) -> None:
        """
        Create the underlying httpcore pool.

        `verify`, `cert` and `trust_env` are passed to `create_ssl_context()`,
        and `limits` supplies the connection / keep-alive settings. `uds`,
        `local_address` and `retries` are forwarded only when no proxy is
        configured; `socket_options` is not forwarded for SOCKS proxies.

        Raises `ImportError` when a SOCKS proxy is requested but `socksio`
        cannot be imported, and `ValueError` for any other proxy scheme.
        """
        import httpcore

        # A proxy given as a plain string or URL is wrapped in a `Proxy`.
        if isinstance(proxy, (str, URL)):
            proxy = Proxy(url=proxy)
        ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env)

        if proxy is None:
            self._pool = httpcore.ConnectionPool(
                ssl_context=ssl_context,
                max_connections=limits.max_connections,
                max_keepalive_connections=limits.max_keepalive_connections,
                keepalive_expiry=limits.keepalive_expiry,
                http1=http1,
                http2=http2,
                uds=uds,
                local_address=local_address,
                retries=retries,
                socket_options=socket_options,
            )
            return

        proxy_scheme = proxy.url.scheme
        if proxy_scheme in ("http", "https"):
            self._pool = httpcore.HTTPProxy(
                proxy_url=httpcore.URL(
                    scheme=proxy.url.raw_scheme,
                    host=proxy.url.raw_host,
                    port=proxy.url.port,
                    target=proxy.url.raw_path,
                ),
                proxy_auth=proxy.raw_auth,
                proxy_headers=proxy.headers.raw,
                ssl_context=ssl_context,
                proxy_ssl_context=proxy.ssl_context,
                max_connections=limits.max_connections,
                max_keepalive_connections=limits.max_keepalive_connections,
                keepalive_expiry=limits.keepalive_expiry,
                http1=http1,
                http2=http2,
                socket_options=socket_options,
            )
        elif proxy_scheme in ("socks5", "socks5h"):
            # Probe for `socksio` up front so that a missing optional
            # dependency produces an install hint.
            try:
                import socksio  # noqa
            except ImportError:  # pragma: no cover
                raise ImportError(
                    "Using SOCKS proxy, but the 'socksio' package is not installed. "
                    "Make sure to install httpx using `pip install httpx[socks]`."
                ) from None

            self._pool = httpcore.SOCKSProxy(
                proxy_url=httpcore.URL(
                    scheme=proxy.url.raw_scheme,
                    host=proxy.url.raw_host,
                    port=proxy.url.port,
                    target=proxy.url.raw_path,
                ),
                proxy_auth=proxy.raw_auth,
                ssl_context=ssl_context,
                max_connections=limits.max_connections,
                max_keepalive_connections=limits.max_keepalive_connections,
                keepalive_expiry=limits.keepalive_expiry,
                http1=http1,
                http2=http2,
            )
        else:  # pragma: no cover
            raise ValueError(
                "Proxy protocol must be either 'http', 'https', 'socks5', or 'socks5h',"
                f" but got {proxy.url.scheme!r}."
            )

    def __enter__(self: T) -> T:  # Use generics for subclass support.
        """Enter the underlying pool's context and return this transport."""
        self._pool.__enter__()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        """Exit the underlying pool's context, mapping httpcore errors to httpx ones."""
        with map_httpcore_exceptions():
            self._pool.__exit__(exc_type, exc_value, traceback)

    def handle_request(
        self,
        request: Request,
    ) -> Response:
        """
        Send `request` through the pool and return the result as a `Response`.

        The request is translated into an `httpcore.Request`; the response
        body is wrapped in a `ResponseStream`.
        """
        assert isinstance(request.stream, SyncByteStream)
        import httpcore

        url = request.url
        core_request = httpcore.Request(
            method=request.method,
            url=httpcore.URL(
                scheme=url.raw_scheme,
                host=url.raw_host,
                port=url.port,
                target=url.raw_path,
            ),
            headers=request.headers.raw,
            content=request.stream,
            extensions=request.extensions,
        )
        with map_httpcore_exceptions():
            core_response = self._pool.handle_request(core_request)

        assert isinstance(core_response.stream, typing.Iterable)

        return Response(
            status_code=core_response.status,
            headers=core_response.headers,
            stream=ResponseStream(core_response.stream),
            extensions=core_response.extensions,
        )

    def close(self) -> None:
        """Close the underlying pool."""
        self._pool.close()
263
264
class AsyncResponseStream(AsyncByteStream):
    """Asynchronous byte stream that wraps the body stream of an httpcore response."""

    def __init__(self, httpcore_stream: typing.AsyncIterable[bytes]) -> None:
        self._httpcore_stream = httpcore_stream

    async def __aiter__(self) -> typing.AsyncIterator[bytes]:
        """Yield the wrapped stream's chunks, mapping httpcore errors to httpx ones."""
        source = self._httpcore_stream
        with map_httpcore_exceptions():
            async for chunk in source:
                yield chunk

    async def aclose(self) -> None:
        """Close the wrapped stream, if it exposes an `aclose` attribute."""
        source = self._httpcore_stream
        if not hasattr(source, "aclose"):
            return
        await source.aclose()
277
278
class AsyncHTTPTransport(AsyncBaseTransport):
    """
    Asynchronous transport that sends requests through an httpcore pool.

    The pool class is chosen from the `proxy` argument:
    `httpcore.AsyncConnectionPool` when there is no proxy,
    `httpcore.AsyncHTTPProxy` for "http" / "https" proxy URLs, and
    `httpcore.AsyncSOCKSProxy` for "socks5" / "socks5h" proxy URLs.
    """

    def __init__(
        self,
        verify: ssl.SSLContext | str | bool = True,
        cert: CertTypes | None = None,
        trust_env: bool = True,
        http1: bool = True,
        http2: bool = False,
        limits: Limits = DEFAULT_LIMITS,
        proxy: ProxyTypes | None = None,
        uds: str | None = None,
        local_address: str | None = None,
        retries: int = 0,
        socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
    ) -> None:
        """
        Create the underlying httpcore async pool.

        `verify`, `cert` and `trust_env` are passed to `create_ssl_context()`,
        and `limits` supplies the connection / keep-alive settings. `uds`,
        `local_address` and `retries` are forwarded only when no proxy is
        configured; `socket_options` is not forwarded for SOCKS proxies.

        Raises `ImportError` when a SOCKS proxy is requested but `socksio`
        cannot be imported, and `ValueError` for any other proxy scheme.
        """
        import httpcore

        # A proxy given as a plain string or URL is wrapped in a `Proxy`.
        proxy = Proxy(url=proxy) if isinstance(proxy, (str, URL)) else proxy
        ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env)

        if proxy is None:
            self._pool = httpcore.AsyncConnectionPool(
                ssl_context=ssl_context,
                max_connections=limits.max_connections,
                max_keepalive_connections=limits.max_keepalive_connections,
                keepalive_expiry=limits.keepalive_expiry,
                http1=http1,
                http2=http2,
                uds=uds,
                local_address=local_address,
                retries=retries,
                socket_options=socket_options,
            )
        elif proxy.url.scheme in ("http", "https"):
            self._pool = httpcore.AsyncHTTPProxy(
                proxy_url=httpcore.URL(
                    scheme=proxy.url.raw_scheme,
                    host=proxy.url.raw_host,
                    port=proxy.url.port,
                    target=proxy.url.raw_path,
                ),
                proxy_auth=proxy.raw_auth,
                proxy_headers=proxy.headers.raw,
                proxy_ssl_context=proxy.ssl_context,
                ssl_context=ssl_context,
                max_connections=limits.max_connections,
                max_keepalive_connections=limits.max_keepalive_connections,
                keepalive_expiry=limits.keepalive_expiry,
                http1=http1,
                http2=http2,
                socket_options=socket_options,
            )
        elif proxy.url.scheme in ("socks5", "socks5h"):
            # Probe for `socksio` up front so that a missing optional
            # dependency produces an install hint.
            try:
                import socksio  # noqa
            except ImportError:  # pragma: no cover
                raise ImportError(
                    "Using SOCKS proxy, but the 'socksio' package is not installed. "
                    "Make sure to install httpx using `pip install httpx[socks]`."
                ) from None

            self._pool = httpcore.AsyncSOCKSProxy(
                proxy_url=httpcore.URL(
                    scheme=proxy.url.raw_scheme,
                    host=proxy.url.raw_host,
                    port=proxy.url.port,
                    target=proxy.url.raw_path,
                ),
                proxy_auth=proxy.raw_auth,
                ssl_context=ssl_context,
                max_connections=limits.max_connections,
                max_keepalive_connections=limits.max_keepalive_connections,
                keepalive_expiry=limits.keepalive_expiry,
                http1=http1,
                http2=http2,
            )
        else:  # pragma: no cover
            # The second fragment must be an f-string, otherwise the message
            # shows the literal placeholder instead of the rejected scheme.
            raise ValueError(
                "Proxy protocol must be either 'http', 'https', 'socks5', or 'socks5h',"
                f" but got {proxy.url.scheme!r}."
            )

    async def __aenter__(self: A) -> A:  # Use generics for subclass support.
        """Enter the underlying pool's async context and return this transport."""
        await self._pool.__aenter__()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        """Exit the underlying pool's async context, mapping httpcore errors."""
        with map_httpcore_exceptions():
            await self._pool.__aexit__(exc_type, exc_value, traceback)

    async def handle_async_request(
        self,
        request: Request,
    ) -> Response:
        """
        Send `request` through the pool and return the result as a `Response`.

        The request is translated into an `httpcore.Request`; the response
        body is wrapped in an `AsyncResponseStream`.
        """
        assert isinstance(request.stream, AsyncByteStream)
        import httpcore

        req = httpcore.Request(
            method=request.method,
            url=httpcore.URL(
                scheme=request.url.raw_scheme,
                host=request.url.raw_host,
                port=request.url.port,
                target=request.url.raw_path,
            ),
            headers=request.headers.raw,
            content=request.stream,
            extensions=request.extensions,
        )
        with map_httpcore_exceptions():
            resp = await self._pool.handle_async_request(req)

        assert isinstance(resp.stream, typing.AsyncIterable)

        return Response(
            status_code=resp.status,
            headers=resp.headers,
            stream=AsyncResponseStream(resp.stream),
            extensions=resp.extensions,
        )

    async def aclose(self) -> None:
        """Close the underlying pool."""
        await self._pool.aclose()