Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpcore/_async/connection_pool.py: 21% (151 statements)

import ssl
import sys
from types import TracebackType
from typing import AsyncIterable, AsyncIterator, List, Optional, Type

from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol
from .._models import Origin, Request, Response
from .._synchronization import AsyncEvent, AsyncLock
from ..backends.auto import AutoBackend
from ..backends.base import AsyncNetworkBackend
from .connection import AsyncHTTPConnection
from .interfaces import AsyncConnectionInterface, AsyncRequestInterface


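# `RequestStatus` tracks a single request while it is queued against the pool: the
# pool assigns it a connection via `set_connection()`, and the requesting task blocks
# in `wait_for_connection()` until a connection has been acquired (the optional
# `timeout` bounds that wait).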
class RequestStatus:
    def __init__(self, request: Request):
        self.request = request
        self.connection: Optional[AsyncConnectionInterface] = None
        self._connection_acquired = AsyncEvent()

    def set_connection(self, connection: AsyncConnectionInterface) -> None:
        assert self.connection is None
        self.connection = connection
        self._connection_acquired.set()

    def unset_connection(self) -> None:
        assert self.connection is not None
        self.connection = None
        self._connection_acquired = AsyncEvent()

    async def wait_for_connection(
        self, timeout: Optional[float] = None
    ) -> AsyncConnectionInterface:
        await self._connection_acquired.wait(timeout=timeout)
        assert self.connection is not None
        return self.connection


class AsyncConnectionPool(AsyncRequestInterface):
    """
    A connection pool for making HTTP requests.
    """

    def __init__(
        self,
        ssl_context: Optional[ssl.SSLContext] = None,
        max_connections: Optional[int] = 10,
        max_keepalive_connections: Optional[int] = None,
        keepalive_expiry: Optional[float] = None,
        http1: bool = True,
        http2: bool = False,
        retries: int = 0,
        local_address: Optional[str] = None,
        uds: Optional[str] = None,
        network_backend: Optional[AsyncNetworkBackend] = None,
    ) -> None:
        """
        A connection pool for making HTTP requests.

        Parameters:
            ssl_context: An SSL context to use for verifying connections.
                If not specified, the default `httpcore.default_ssl_context()`
                will be used.
            max_connections: The maximum number of concurrent HTTP connections that
                the pool should allow. Any attempt to send a request on a pool that
                would exceed this amount will block until a connection is available.
            max_keepalive_connections: The maximum number of idle HTTP connections
                that will be maintained in the pool.
            keepalive_expiry: The duration in seconds that an idle HTTP connection
                may be maintained for before being expired from the pool.
            http1: A boolean indicating if HTTP/1.1 requests should be supported
                by the connection pool. Defaults to True.
            http2: A boolean indicating if HTTP/2 requests should be supported by
                the connection pool. Defaults to False.
            retries: The maximum number of retries when trying to establish a
                connection.
            local_address: Local address to connect from. Can also be used to connect
                using a particular address family. Using `local_address="0.0.0.0"`
                will connect using an `AF_INET` address (IPv4), while using
                `local_address="::"` will connect using an `AF_INET6` address (IPv6).
            uds: Path to a Unix Domain Socket to use instead of TCP sockets.
            network_backend: A backend instance to use for handling network I/O.
        """
        self._ssl_context = ssl_context

        self._max_connections = (
            sys.maxsize if max_connections is None else max_connections
        )
        self._max_keepalive_connections = (
            sys.maxsize
            if max_keepalive_connections is None
            else max_keepalive_connections
        )
        self._max_keepalive_connections = min(
            self._max_connections, self._max_keepalive_connections
        )

        self._keepalive_expiry = keepalive_expiry
        self._http1 = http1
        self._http2 = http2
        self._retries = retries
        self._local_address = local_address
        self._uds = uds

        self._pool: List[AsyncConnectionInterface] = []
        self._requests: List[RequestStatus] = []
        self._pool_lock = AsyncLock()
        self._network_backend = (
            AutoBackend() if network_backend is None else network_backend
        )

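    # `create_connection()` is the hook that builds a new connection for an origin;
    # subclasses can override it to customize how connections are created.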
    def create_connection(self, origin: Origin) -> AsyncConnectionInterface:
        return AsyncHTTPConnection(
            origin=origin,
            ssl_context=self._ssl_context,
            keepalive_expiry=self._keepalive_expiry,
            http1=self._http1,
            http2=self._http2,
            retries=self._retries,
            local_address=self._local_address,
            uds=self._uds,
            network_backend=self._network_backend,
        )

    @property
    def connections(self) -> List[AsyncConnectionInterface]:
        """
        Return a list of the connections currently in the pool.

        For example:

        ```python
        >>> pool.connections
        [
            <AsyncHTTPConnection ['https://example.com:443', HTTP/1.1, ACTIVE, Request Count: 6]>,
            <AsyncHTTPConnection ['https://example.com:443', HTTP/1.1, IDLE, Request Count: 9]>,
            <AsyncHTTPConnection ['http://example.com:80', HTTP/1.1, IDLE, Request Count: 1]>,
        ]
        ```
        """
        return list(self._pool)

    async def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool:
        """
        Attempt to provide a connection that can handle the given origin.
        """
        origin = status.request.url.origin

        # If there are queued requests in front of us, then don't acquire a
        # connection. We handle requests strictly in order.
        waiting = [s for s in self._requests if s.connection is None]
        if waiting and waiting[0] is not status:
            return False

        # Reuse an existing connection if one is currently available.
        for idx, connection in enumerate(self._pool):
            if connection.can_handle_request(origin) and connection.is_available():
                self._pool.pop(idx)
                self._pool.insert(0, connection)
                status.set_connection(connection)
                return True
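
        # The pool list is kept in most-recently-used order: reused and newly created
        # connections are moved to the front, so idle connections drift towards the
        # back, where the reverse-iterating eviction loops close them first.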

        # If the pool is currently full, attempt to close one idle connection.
        if len(self._pool) >= self._max_connections:
            for idx, connection in reversed(list(enumerate(self._pool))):
                if connection.is_idle():
                    await connection.aclose()
                    self._pool.pop(idx)
                    break

        # If the pool is still full, then we cannot acquire a connection.
        if len(self._pool) >= self._max_connections:
            return False

        # Otherwise create a new connection.
        connection = self.create_connection(origin)
        self._pool.insert(0, connection)
        status.set_connection(connection)
        return True

    async def _close_expired_connections(self) -> None:
        """
        Clean up the connection pool by closing off any connections that have expired.
        """
        # Close any connections that have expired their keep-alive time.
        for idx, connection in reversed(list(enumerate(self._pool))):
            if connection.has_expired():
                await connection.aclose()
                self._pool.pop(idx)

        # If the pool size exceeds the maximum number of allowed keep-alive connections,
        # then close off idle connections as required.
        pool_size = len(self._pool)
        for idx, connection in reversed(list(enumerate(self._pool))):
            if connection.is_idle() and pool_size > self._max_keepalive_connections:
                await connection.aclose()
                self._pool.pop(idx)
                pool_size -= 1

    async def handle_async_request(self, request: Request) -> Response:
        """
        Send an HTTP request, and return an HTTP response.

        This is the core implementation that is called into by `.request()` or `.stream()`.
        """
        scheme = request.url.scheme.decode()
        if scheme == "":
            raise UnsupportedProtocol(
                "Request URL is missing an 'http://' or 'https://' protocol."
            )
        if scheme not in ("http", "https", "ws", "wss"):
            raise UnsupportedProtocol(
                f"Request URL has an unsupported protocol '{scheme}://'."
            )

        status = RequestStatus(request)

        async with self._pool_lock:
            self._requests.append(status)
            await self._close_expired_connections()
            await self._attempt_to_acquire_connection(status)

        while True:
            timeouts = request.extensions.get("timeout", {})
            timeout = timeouts.get("pool", None)
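            # The pool-acquisition timeout comes from the request's "timeout"
            # extension: a request carrying `extensions={"timeout": {"pool": 5.0}}`
            # waits at most five seconds here for a connection to become available.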
            try:
                connection = await status.wait_for_connection(timeout=timeout)
            except BaseException as exc:
                # If we timeout here, or if the task is cancelled, then make
                # sure to remove the request from the queue before bubbling
                # up the exception.
                async with self._pool_lock:
                    self._requests.remove(status)
                raise exc

            try:
                response = await connection.handle_async_request(request)
            except ConnectionNotAvailable:
                # The ConnectionNotAvailable exception is a special case that
                # indicates we need to retry the request on a new connection.
                #
                # The most common case where this can occur is when multiple
                # requests are queued waiting for a single connection, which
                # might end up as an HTTP/2 connection, but which actually ends
                # up as HTTP/1.1.
                async with self._pool_lock:
                    # Maintain our position in the request queue, but reset the
                    # status so that the request becomes queued again.
                    status.unset_connection()
                    await self._attempt_to_acquire_connection(status)
            except BaseException as exc:
                await self.response_closed(status)
                raise exc
            else:
                break

        # When we return the response, we wrap the stream in a special class
        # that handles notifying the connection pool once the response
        # has been released.
        assert isinstance(response.stream, AsyncIterable)
        return Response(
            status=response.status,
            headers=response.headers,
            content=ConnectionPoolByteStream(response.stream, self, status),
            extensions=response.extensions,
        )

    async def response_closed(self, status: RequestStatus) -> None:
        """
        This method acts as a callback once the request/response cycle is complete.

        It is called into from the `ConnectionPoolByteStream.aclose()` method.
        """
        assert status.connection is not None
        connection = status.connection

        async with self._pool_lock:
            # Update the state of the connection pool.
            if status in self._requests:
                self._requests.remove(status)

            if connection.is_closed() and connection in self._pool:
                self._pool.remove(connection)

            # Since we've had a response closed, it's possible we'll now be able
            # to service one or more requests that are currently pending.
            for status in self._requests:
                if status.connection is None:
                    acquired = await self._attempt_to_acquire_connection(status)
                    # If we could not acquire a connection for a queued request
                    # then we don't need to check any more requests that are
                    # queued later behind it.
                    if not acquired:
                        break

            # Housekeeping.
            await self._close_expired_connections()

    async def aclose(self) -> None:
        """
        Close any connections in the pool.
        """
        async with self._pool_lock:
            for connection in self._pool:
                await connection.aclose()
            self._pool = []
            self._requests = []

    async def __aenter__(self) -> "AsyncConnectionPool":
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_value: Optional[BaseException] = None,
        traceback: Optional[TracebackType] = None,
    ) -> None:
        await self.aclose()


class ConnectionPoolByteStream:
    """
    A wrapper around the response byte stream that additionally handles
    notifying the connection pool when the response has been closed.
    """

    def __init__(
        self,
        stream: AsyncIterable[bytes],
        pool: AsyncConnectionPool,
        status: RequestStatus,
    ) -> None:
        self._stream = stream
        self._pool = pool
        self._status = status

    async def __aiter__(self) -> AsyncIterator[bytes]:
        async for part in self._stream:
            yield part

    async def aclose(self) -> None:
        try:
            if hasattr(self._stream, "aclose"):
                await self._stream.aclose()
        finally:
            await self._pool.response_closed(self._status)
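

# Illustrative streaming sketch. Assumptions: the `.stream()` helper mentioned in the
# `handle_async_request()` docstring is usable as an async context manager, and
# `response.stream` is the async-iterable content asserted in that method. Closing
# the response reaches `ConnectionPoolByteStream.aclose()`, which notifies the pool
# via `response_closed()` so that queued requests can acquire the freed connection.
#
#     async with AsyncConnectionPool() as pool:
#         async with pool.stream("GET", "https://www.example.com/") as response:
#             async for chunk in response.stream:
#                 ...  # process each chunk as it arrives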