1from __future__ import annotations
2
3import ssl
4import sys
5import types
6import typing
7
8from .._backends.auto import AutoBackend
9from .._backends.base import SOCKET_OPTION, AsyncNetworkBackend
10from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol
11from .._models import Origin, Proxy, Request, Response
12from .._synchronization import AsyncEvent, AsyncShieldCancellation, AsyncThreadLock
13from .connection import AsyncHTTPConnection
14from .interfaces import AsyncConnectionInterface, AsyncRequestInterface
15
16
17class AsyncPoolRequest:
18 def __init__(self, request: Request) -> None:
19 self.request = request
20 self.connection: AsyncConnectionInterface | None = None
21 self._connection_acquired = AsyncEvent()
22
23 def assign_to_connection(self, connection: AsyncConnectionInterface | None) -> None:
24 self.connection = connection
25 self._connection_acquired.set()
26
27 def clear_connection(self) -> None:
28 self.connection = None
29 self._connection_acquired = AsyncEvent()
30
31 async def wait_for_connection(
32 self, timeout: float | None = None
33 ) -> AsyncConnectionInterface:
34 if self.connection is None:
35 await self._connection_acquired.wait(timeout=timeout)
36 assert self.connection is not None
37 return self.connection
38
39 def is_queued(self) -> bool:
40 return self.connection is None
41
42
43class AsyncConnectionPool(AsyncRequestInterface):
44 """
45 A connection pool for making HTTP requests.
46 """
47
48 def __init__(
49 self,
50 ssl_context: ssl.SSLContext | None = None,
51 proxy: Proxy | None = None,
52 max_connections: int | None = 10,
53 max_keepalive_connections: int | None = None,
54 keepalive_expiry: float | None = None,
55 http1: bool = True,
56 http2: bool = False,
57 retries: int = 0,
58 local_address: str | None = None,
59 uds: str | None = None,
60 network_backend: AsyncNetworkBackend | None = None,
61 socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
62 ) -> None:
63 """
64 A connection pool for making HTTP requests.
65
66 Parameters:
67 ssl_context: An SSL context to use for verifying connections.
68 If not specified, the default `httpcore.default_ssl_context()`
69 will be used.
70 max_connections: The maximum number of concurrent HTTP connections that
71 the pool should allow. Any attempt to send a request on a pool that
72 would exceed this amount will block until a connection is available.
73 max_keepalive_connections: The maximum number of idle HTTP connections
74 that will be maintained in the pool.
75 keepalive_expiry: The duration in seconds that an idle HTTP connection
76 may be maintained for before being expired from the pool.
77 http1: A boolean indicating if HTTP/1.1 requests should be supported
78 by the connection pool. Defaults to True.
79 http2: A boolean indicating if HTTP/2 requests should be supported by
80 the connection pool. Defaults to False.
81 retries: The maximum number of retries when trying to establish a
82 connection.
83 local_address: Local address to connect from. Can also be used to connect
84 using a particular address family. Using `local_address="0.0.0.0"`
85 will connect using an `AF_INET` address (IPv4), while using
86 `local_address="::"` will connect using an `AF_INET6` address (IPv6).
87 uds: Path to a Unix Domain Socket to use instead of TCP sockets.
88 network_backend: A backend instance to use for handling network I/O.
89 socket_options: Socket options that have to be included
90 in the TCP socket when the connection was established.
91 """
92 self._ssl_context = ssl_context
93 self._proxy = proxy
94 self._max_connections = (
95 sys.maxsize if max_connections is None else max_connections
96 )
97 self._max_keepalive_connections = (
98 sys.maxsize
99 if max_keepalive_connections is None
100 else max_keepalive_connections
101 )
102 self._max_keepalive_connections = min(
103 self._max_connections, self._max_keepalive_connections
104 )
105
106 self._keepalive_expiry = keepalive_expiry
107 self._http1 = http1
108 self._http2 = http2
109 self._retries = retries
110 self._local_address = local_address
111 self._uds = uds
112
113 self._network_backend = (
114 AutoBackend() if network_backend is None else network_backend
115 )
116 self._socket_options = socket_options
117
118 # The mutable state on a connection pool is the queue of incoming requests,
119 # and the set of connections that are servicing those requests.
120 self._connections: list[AsyncConnectionInterface] = []
121 self._requests: list[AsyncPoolRequest] = []
122
123 # We only mutate the state of the connection pool within an 'optional_thread_lock'
124 # context. This holds a threading lock unless we're running in async mode,
125 # in which case it is a no-op.
126 self._optional_thread_lock = AsyncThreadLock()
127
128 def create_connection(self, origin: Origin) -> AsyncConnectionInterface:
129 if self._proxy is not None:
130 if self._proxy.url.scheme in (b"socks5", b"socks5h"):
131 from .socks_proxy import AsyncSocks5Connection
132
133 return AsyncSocks5Connection(
134 proxy_origin=self._proxy.url.origin,
135 proxy_auth=self._proxy.auth,
136 remote_origin=origin,
137 ssl_context=self._ssl_context,
138 keepalive_expiry=self._keepalive_expiry,
139 http1=self._http1,
140 http2=self._http2,
141 network_backend=self._network_backend,
142 )
143 elif origin.scheme == b"http":
144 from .http_proxy import AsyncForwardHTTPConnection
145
146 return AsyncForwardHTTPConnection(
147 proxy_origin=self._proxy.url.origin,
148 proxy_headers=self._proxy.headers,
149 proxy_ssl_context=self._proxy.ssl_context,
150 remote_origin=origin,
151 keepalive_expiry=self._keepalive_expiry,
152 network_backend=self._network_backend,
153 )
154 from .http_proxy import AsyncTunnelHTTPConnection
155
156 return AsyncTunnelHTTPConnection(
157 proxy_origin=self._proxy.url.origin,
158 proxy_headers=self._proxy.headers,
159 proxy_ssl_context=self._proxy.ssl_context,
160 remote_origin=origin,
161 ssl_context=self._ssl_context,
162 keepalive_expiry=self._keepalive_expiry,
163 http1=self._http1,
164 http2=self._http2,
165 network_backend=self._network_backend,
166 )
167
168 return AsyncHTTPConnection(
169 origin=origin,
170 ssl_context=self._ssl_context,
171 keepalive_expiry=self._keepalive_expiry,
172 http1=self._http1,
173 http2=self._http2,
174 retries=self._retries,
175 local_address=self._local_address,
176 uds=self._uds,
177 network_backend=self._network_backend,
178 socket_options=self._socket_options,
179 )
180
181 @property
182 def connections(self) -> list[AsyncConnectionInterface]:
183 """
184 Return a list of the connections currently in the pool.
185
186 For example:
187
188 ```python
189 >>> pool.connections
190 [
191 <AsyncHTTPConnection ['https://example.com:443', HTTP/1.1, ACTIVE, Request Count: 6]>,
192 <AsyncHTTPConnection ['https://example.com:443', HTTP/1.1, IDLE, Request Count: 9]> ,
193 <AsyncHTTPConnection ['http://example.com:80', HTTP/1.1, IDLE, Request Count: 1]>,
194 ]
195 ```
196 """
197 return list(self._connections)
198
199 async def handle_async_request(self, request: Request) -> Response:
200 """
201 Send an HTTP request, and return an HTTP response.
202
203 This is the core implementation that is called into by `.request()` or `.stream()`.
204 """
205 scheme = request.url.scheme.decode()
206 if scheme == "":
207 raise UnsupportedProtocol(
208 "Request URL is missing an 'http://' or 'https://' protocol."
209 )
210 if scheme not in ("http", "https", "ws", "wss"):
211 raise UnsupportedProtocol(
212 f"Request URL has an unsupported protocol '{scheme}://'."
213 )
214
215 timeouts = request.extensions.get("timeout", {})
216 timeout = timeouts.get("pool", None)
217
218 with self._optional_thread_lock:
219 # Add the incoming request to our request queue.
220 pool_request = AsyncPoolRequest(request)
221 self._requests.append(pool_request)
222
223 try:
224 while True:
225 with self._optional_thread_lock:
226 # Assign incoming requests to available connections,
227 # closing or creating new connections as required.
228 closing = self._assign_requests_to_connections()
229 await self._close_connections(closing)
230
231 # Wait until this request has an assigned connection.
232 connection = await pool_request.wait_for_connection(timeout=timeout)
233
234 try:
235 # Send the request on the assigned connection.
236 response = await connection.handle_async_request(
237 pool_request.request
238 )
239 except ConnectionNotAvailable:
240 # In some cases a connection may initially be available to
241 # handle a request, but then become unavailable.
242 #
243 # In this case we clear the connection and try again.
244 pool_request.clear_connection()
245 else:
246 break # pragma: nocover
247
248 except BaseException as exc:
249 with self._optional_thread_lock:
250 # For any exception or cancellation we remove the request from
251 # the queue, and then re-assign requests to connections.
252 self._requests.remove(pool_request)
253 closing = self._assign_requests_to_connections()
254
255 await self._close_connections(closing)
256 raise exc from None
257
258 # Return the response. Note that in this case we still have to manage
259 # the point at which the response is closed.
260 assert isinstance(response.stream, typing.AsyncIterable)
261 return Response(
262 status=response.status,
263 headers=response.headers,
264 content=PoolByteStream(
265 stream=response.stream, pool_request=pool_request, pool=self
266 ),
267 extensions=response.extensions,
268 )
269
270 def _assign_requests_to_connections(self) -> list[AsyncConnectionInterface]:
271 """
272 Manage the state of the connection pool, assigning incoming
273 requests to connections as available.
274
275 Called whenever a new request is added or removed from the pool.
276
277 Any closing connections are returned, allowing the I/O for closing
278 those connections to be handled seperately.
279 """
280 closing_connections = []
281
282 # First we handle cleaning up any connections that are closed,
283 # have expired their keep-alive, or surplus idle connections.
284 for connection in list(self._connections):
285 if connection.is_closed():
286 # log: "removing closed connection"
287 self._connections.remove(connection)
288 elif connection.has_expired():
289 # log: "closing expired connection"
290 self._connections.remove(connection)
291 closing_connections.append(connection)
292 elif (
293 connection.is_idle()
294 and len([connection.is_idle() for connection in self._connections])
295 > self._max_keepalive_connections
296 ):
297 # log: "closing idle connection"
298 self._connections.remove(connection)
299 closing_connections.append(connection)
300
301 # Assign queued requests to connections.
302 queued_requests = [request for request in self._requests if request.is_queued()]
303 for pool_request in queued_requests:
304 origin = pool_request.request.url.origin
305 available_connections = [
306 connection
307 for connection in self._connections
308 if connection.can_handle_request(origin) and connection.is_available()
309 ]
310 idle_connections = [
311 connection for connection in self._connections if connection.is_idle()
312 ]
313
314 # There are three cases for how we may be able to handle the request:
315 #
316 # 1. There is an existing connection that can handle the request.
317 # 2. We can create a new connection to handle the request.
318 # 3. We can close an idle connection and then create a new connection
319 # to handle the request.
320 if available_connections:
321 # log: "reusing existing connection"
322 connection = available_connections[0]
323 pool_request.assign_to_connection(connection)
324 elif len(self._connections) < self._max_connections:
325 # log: "creating new connection"
326 connection = self.create_connection(origin)
327 self._connections.append(connection)
328 pool_request.assign_to_connection(connection)
329 elif idle_connections:
330 # log: "closing idle connection"
331 connection = idle_connections[0]
332 self._connections.remove(connection)
333 closing_connections.append(connection)
334 # log: "creating new connection"
335 connection = self.create_connection(origin)
336 self._connections.append(connection)
337 pool_request.assign_to_connection(connection)
338
339 return closing_connections
340
341 async def _close_connections(self, closing: list[AsyncConnectionInterface]) -> None:
342 # Close connections which have been removed from the pool.
343 with AsyncShieldCancellation():
344 for connection in closing:
345 await connection.aclose()
346
347 async def aclose(self) -> None:
348 # Explicitly close the connection pool.
349 # Clears all existing requests and connections.
350 with self._optional_thread_lock:
351 closing_connections = list(self._connections)
352 self._connections = []
353 await self._close_connections(closing_connections)
354
355 async def __aenter__(self) -> AsyncConnectionPool:
356 return self
357
358 async def __aexit__(
359 self,
360 exc_type: type[BaseException] | None = None,
361 exc_value: BaseException | None = None,
362 traceback: types.TracebackType | None = None,
363 ) -> None:
364 await self.aclose()
365
366 def __repr__(self) -> str:
367 class_name = self.__class__.__name__
368 with self._optional_thread_lock:
369 request_is_queued = [request.is_queued() for request in self._requests]
370 connection_is_idle = [
371 connection.is_idle() for connection in self._connections
372 ]
373
374 num_active_requests = request_is_queued.count(False)
375 num_queued_requests = request_is_queued.count(True)
376 num_active_connections = connection_is_idle.count(False)
377 num_idle_connections = connection_is_idle.count(True)
378
379 requests_info = (
380 f"Requests: {num_active_requests} active, {num_queued_requests} queued"
381 )
382 connection_info = (
383 f"Connections: {num_active_connections} active, {num_idle_connections} idle"
384 )
385
386 return f"<{class_name} [{requests_info} | {connection_info}]>"
387
388
389class PoolByteStream:
390 def __init__(
391 self,
392 stream: typing.AsyncIterable[bytes],
393 pool_request: AsyncPoolRequest,
394 pool: AsyncConnectionPool,
395 ) -> None:
396 self._stream = stream
397 self._pool_request = pool_request
398 self._pool = pool
399 self._closed = False
400
401 async def __aiter__(self) -> typing.AsyncIterator[bytes]:
402 try:
403 async for part in self._stream:
404 yield part
405 except BaseException as exc:
406 await self.aclose()
407 raise exc from None
408
409 async def aclose(self) -> None:
410 if not self._closed:
411 self._closed = True
412 with AsyncShieldCancellation():
413 if hasattr(self._stream, "aclose"):
414 await self._stream.aclose()
415
416 with self._pool._optional_thread_lock:
417 self._pool._requests.remove(self._pool_request)
418 closing = self._pool._assign_requests_to_connections()
419
420 await self._pool._close_connections(closing)