1"""
2Custom transports, with nicely configured defaults.
3
4The following additional keyword arguments are currently supported by httpcore...
5
6* uds: str
7* local_address: str
8* retries: int
9
10Example usages...
11
12# Disable HTTP/2 on a single specific domain.
13mounts = {
14 "all://": httpx.HTTPTransport(http2=True),
15 "all://*example.org": httpx.HTTPTransport()
16}
17
18# Using advanced httpcore configuration, with connection retries.
19transport = httpx.HTTPTransport(retries=1)
20client = httpx.Client(transport=transport)
21
22# Using advanced httpcore configuration, with unix domain sockets.
23transport = httpx.HTTPTransport(uds="socket.uds")
24client = httpx.Client(transport=transport)
25"""
26
27from __future__ import annotations
28
29import contextlib
30import typing
31from types import TracebackType
32
33import httpcore
34
35from .._config import DEFAULT_LIMITS, Limits, Proxy, create_ssl_context
36from .._exceptions import (
37 ConnectError,
38 ConnectTimeout,
39 LocalProtocolError,
40 NetworkError,
41 PoolTimeout,
42 ProtocolError,
43 ProxyError,
44 ReadError,
45 ReadTimeout,
46 RemoteProtocolError,
47 TimeoutException,
48 UnsupportedProtocol,
49 WriteError,
50 WriteTimeout,
51)
52from .._models import Request, Response
53from .._types import AsyncByteStream, CertTypes, ProxyTypes, SyncByteStream, VerifyTypes
54from .._urls import URL
55from .base import AsyncBaseTransport, BaseTransport
56
57T = typing.TypeVar("T", bound="HTTPTransport")
58A = typing.TypeVar("A", bound="AsyncHTTPTransport")
59
60SOCKET_OPTION = typing.Union[
61 typing.Tuple[int, int, int],
62 typing.Tuple[int, int, typing.Union[bytes, bytearray]],
63 typing.Tuple[int, int, None, int],
64]
65
66__all__ = ["AsyncHTTPTransport", "HTTPTransport"]
67
68
69@contextlib.contextmanager
70def map_httpcore_exceptions() -> typing.Iterator[None]:
71 try:
72 yield
73 except Exception as exc:
74 mapped_exc = None
75
76 for from_exc, to_exc in HTTPCORE_EXC_MAP.items():
77 if not isinstance(exc, from_exc):
78 continue
79 # We want to map to the most specific exception we can find.
80 # Eg if `exc` is an `httpcore.ReadTimeout`, we want to map to
81 # `httpx.ReadTimeout`, not just `httpx.TimeoutException`.
82 if mapped_exc is None or issubclass(to_exc, mapped_exc):
83 mapped_exc = to_exc
84
85 if mapped_exc is None: # pragma: no cover
86 raise
87
88 message = str(exc)
89 raise mapped_exc(message) from exc
90
91
92HTTPCORE_EXC_MAP = {
93 httpcore.TimeoutException: TimeoutException,
94 httpcore.ConnectTimeout: ConnectTimeout,
95 httpcore.ReadTimeout: ReadTimeout,
96 httpcore.WriteTimeout: WriteTimeout,
97 httpcore.PoolTimeout: PoolTimeout,
98 httpcore.NetworkError: NetworkError,
99 httpcore.ConnectError: ConnectError,
100 httpcore.ReadError: ReadError,
101 httpcore.WriteError: WriteError,
102 httpcore.ProxyError: ProxyError,
103 httpcore.UnsupportedProtocol: UnsupportedProtocol,
104 httpcore.ProtocolError: ProtocolError,
105 httpcore.LocalProtocolError: LocalProtocolError,
106 httpcore.RemoteProtocolError: RemoteProtocolError,
107}
108
109
110class ResponseStream(SyncByteStream):
111 def __init__(self, httpcore_stream: typing.Iterable[bytes]) -> None:
112 self._httpcore_stream = httpcore_stream
113
114 def __iter__(self) -> typing.Iterator[bytes]:
115 with map_httpcore_exceptions():
116 for part in self._httpcore_stream:
117 yield part
118
119 def close(self) -> None:
120 if hasattr(self._httpcore_stream, "close"):
121 self._httpcore_stream.close()
122
123
124class HTTPTransport(BaseTransport):
125 def __init__(
126 self,
127 verify: VerifyTypes = True,
128 cert: CertTypes | None = None,
129 http1: bool = True,
130 http2: bool = False,
131 limits: Limits = DEFAULT_LIMITS,
132 trust_env: bool = True,
133 proxy: ProxyTypes | None = None,
134 uds: str | None = None,
135 local_address: str | None = None,
136 retries: int = 0,
137 socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
138 ) -> None:
139 ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env)
140 proxy = Proxy(url=proxy) if isinstance(proxy, (str, URL)) else proxy
141
142 if proxy is None:
143 self._pool = httpcore.ConnectionPool(
144 ssl_context=ssl_context,
145 max_connections=limits.max_connections,
146 max_keepalive_connections=limits.max_keepalive_connections,
147 keepalive_expiry=limits.keepalive_expiry,
148 http1=http1,
149 http2=http2,
150 uds=uds,
151 local_address=local_address,
152 retries=retries,
153 socket_options=socket_options,
154 )
155 elif proxy.url.scheme in ("http", "https"):
156 self._pool = httpcore.HTTPProxy(
157 proxy_url=httpcore.URL(
158 scheme=proxy.url.raw_scheme,
159 host=proxy.url.raw_host,
160 port=proxy.url.port,
161 target=proxy.url.raw_path,
162 ),
163 proxy_auth=proxy.raw_auth,
164 proxy_headers=proxy.headers.raw,
165 ssl_context=ssl_context,
166 proxy_ssl_context=proxy.ssl_context,
167 max_connections=limits.max_connections,
168 max_keepalive_connections=limits.max_keepalive_connections,
169 keepalive_expiry=limits.keepalive_expiry,
170 http1=http1,
171 http2=http2,
172 socket_options=socket_options,
173 )
174 elif proxy.url.scheme == "socks5":
175 try:
176 import socksio # noqa
177 except ImportError: # pragma: no cover
178 raise ImportError(
179 "Using SOCKS proxy, but the 'socksio' package is not installed. "
180 "Make sure to install httpx using `pip install httpx[socks]`."
181 ) from None
182
183 self._pool = httpcore.SOCKSProxy(
184 proxy_url=httpcore.URL(
185 scheme=proxy.url.raw_scheme,
186 host=proxy.url.raw_host,
187 port=proxy.url.port,
188 target=proxy.url.raw_path,
189 ),
190 proxy_auth=proxy.raw_auth,
191 ssl_context=ssl_context,
192 max_connections=limits.max_connections,
193 max_keepalive_connections=limits.max_keepalive_connections,
194 keepalive_expiry=limits.keepalive_expiry,
195 http1=http1,
196 http2=http2,
197 )
198 else: # pragma: no cover
199 raise ValueError(
200 "Proxy protocol must be either 'http', 'https', or 'socks5',"
201 f" but got {proxy.url.scheme!r}."
202 )
203
204 def __enter__(self: T) -> T: # Use generics for subclass support.
205 self._pool.__enter__()
206 return self
207
208 def __exit__(
209 self,
210 exc_type: type[BaseException] | None = None,
211 exc_value: BaseException | None = None,
212 traceback: TracebackType | None = None,
213 ) -> None:
214 with map_httpcore_exceptions():
215 self._pool.__exit__(exc_type, exc_value, traceback)
216
217 def handle_request(
218 self,
219 request: Request,
220 ) -> Response:
221 assert isinstance(request.stream, SyncByteStream)
222
223 req = httpcore.Request(
224 method=request.method,
225 url=httpcore.URL(
226 scheme=request.url.raw_scheme,
227 host=request.url.raw_host,
228 port=request.url.port,
229 target=request.url.raw_path,
230 ),
231 headers=request.headers.raw,
232 content=request.stream,
233 extensions=request.extensions,
234 )
235 with map_httpcore_exceptions():
236 resp = self._pool.handle_request(req)
237
238 assert isinstance(resp.stream, typing.Iterable)
239
240 return Response(
241 status_code=resp.status,
242 headers=resp.headers,
243 stream=ResponseStream(resp.stream),
244 extensions=resp.extensions,
245 )
246
247 def close(self) -> None:
248 self._pool.close()
249
250
251class AsyncResponseStream(AsyncByteStream):
252 def __init__(self, httpcore_stream: typing.AsyncIterable[bytes]) -> None:
253 self._httpcore_stream = httpcore_stream
254
255 async def __aiter__(self) -> typing.AsyncIterator[bytes]:
256 with map_httpcore_exceptions():
257 async for part in self._httpcore_stream:
258 yield part
259
260 async def aclose(self) -> None:
261 if hasattr(self._httpcore_stream, "aclose"):
262 await self._httpcore_stream.aclose()
263
264
265class AsyncHTTPTransport(AsyncBaseTransport):
266 def __init__(
267 self,
268 verify: VerifyTypes = True,
269 cert: CertTypes | None = None,
270 http1: bool = True,
271 http2: bool = False,
272 limits: Limits = DEFAULT_LIMITS,
273 trust_env: bool = True,
274 proxy: ProxyTypes | None = None,
275 uds: str | None = None,
276 local_address: str | None = None,
277 retries: int = 0,
278 socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
279 ) -> None:
280 ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env)
281 proxy = Proxy(url=proxy) if isinstance(proxy, (str, URL)) else proxy
282
283 if proxy is None:
284 self._pool = httpcore.AsyncConnectionPool(
285 ssl_context=ssl_context,
286 max_connections=limits.max_connections,
287 max_keepalive_connections=limits.max_keepalive_connections,
288 keepalive_expiry=limits.keepalive_expiry,
289 http1=http1,
290 http2=http2,
291 uds=uds,
292 local_address=local_address,
293 retries=retries,
294 socket_options=socket_options,
295 )
296 elif proxy.url.scheme in ("http", "https"):
297 self._pool = httpcore.AsyncHTTPProxy(
298 proxy_url=httpcore.URL(
299 scheme=proxy.url.raw_scheme,
300 host=proxy.url.raw_host,
301 port=proxy.url.port,
302 target=proxy.url.raw_path,
303 ),
304 proxy_auth=proxy.raw_auth,
305 proxy_headers=proxy.headers.raw,
306 proxy_ssl_context=proxy.ssl_context,
307 ssl_context=ssl_context,
308 max_connections=limits.max_connections,
309 max_keepalive_connections=limits.max_keepalive_connections,
310 keepalive_expiry=limits.keepalive_expiry,
311 http1=http1,
312 http2=http2,
313 socket_options=socket_options,
314 )
315 elif proxy.url.scheme == "socks5":
316 try:
317 import socksio # noqa
318 except ImportError: # pragma: no cover
319 raise ImportError(
320 "Using SOCKS proxy, but the 'socksio' package is not installed. "
321 "Make sure to install httpx using `pip install httpx[socks]`."
322 ) from None
323
324 self._pool = httpcore.AsyncSOCKSProxy(
325 proxy_url=httpcore.URL(
326 scheme=proxy.url.raw_scheme,
327 host=proxy.url.raw_host,
328 port=proxy.url.port,
329 target=proxy.url.raw_path,
330 ),
331 proxy_auth=proxy.raw_auth,
332 ssl_context=ssl_context,
333 max_connections=limits.max_connections,
334 max_keepalive_connections=limits.max_keepalive_connections,
335 keepalive_expiry=limits.keepalive_expiry,
336 http1=http1,
337 http2=http2,
338 )
339 else: # pragma: no cover
340 raise ValueError(
341 "Proxy protocol must be either 'http', 'https', or 'socks5',"
342 " but got {proxy.url.scheme!r}."
343 )
344
345 async def __aenter__(self: A) -> A: # Use generics for subclass support.
346 await self._pool.__aenter__()
347 return self
348
349 async def __aexit__(
350 self,
351 exc_type: type[BaseException] | None = None,
352 exc_value: BaseException | None = None,
353 traceback: TracebackType | None = None,
354 ) -> None:
355 with map_httpcore_exceptions():
356 await self._pool.__aexit__(exc_type, exc_value, traceback)
357
358 async def handle_async_request(
359 self,
360 request: Request,
361 ) -> Response:
362 assert isinstance(request.stream, AsyncByteStream)
363
364 req = httpcore.Request(
365 method=request.method,
366 url=httpcore.URL(
367 scheme=request.url.raw_scheme,
368 host=request.url.raw_host,
369 port=request.url.port,
370 target=request.url.raw_path,
371 ),
372 headers=request.headers.raw,
373 content=request.stream,
374 extensions=request.extensions,
375 )
376 with map_httpcore_exceptions():
377 resp = await self._pool.handle_async_request(req)
378
379 assert isinstance(resp.stream, typing.AsyncIterable)
380
381 return Response(
382 status_code=resp.status,
383 headers=resp.headers,
384 stream=AsyncResponseStream(resp.stream),
385 extensions=resp.extensions,
386 )
387
388 async def aclose(self) -> None:
389 await self._pool.aclose()