Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpcore/_sync/connection_pool.py: 21%
154 statements
import ssl
import sys
from types import TracebackType
from typing import Iterable, Iterator, List, Optional, Type

from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol
from .._models import Origin, Request, Response
from .._synchronization import Event, Lock
from .._backends.sync import SyncBackend
from .._backends.base import SOCKET_OPTION, NetworkBackend
from .connection import HTTPConnection
from .interfaces import ConnectionInterface, RequestInterface


class RequestStatus:
    """
    Tracks a single queued request and the connection assigned to serve it.
    """

    def __init__(self, request: Request):
        self.request = request
        self.connection: Optional[ConnectionInterface] = None
        self._connection_acquired = Event()

    def set_connection(self, connection: ConnectionInterface) -> None:
        assert self.connection is None
        self.connection = connection
        self._connection_acquired.set()

    def unset_connection(self) -> None:
        assert self.connection is not None
        self.connection = None
        self._connection_acquired = Event()

    def wait_for_connection(
        self, timeout: Optional[float] = None
    ) -> ConnectionInterface:
        if self.connection is None:
            self._connection_acquired.wait(timeout=timeout)
        assert self.connection is not None
        return self.connection


class ConnectionPool(RequestInterface):
    """
    A connection pool for making HTTP requests.
    """

    def __init__(
        self,
        ssl_context: Optional[ssl.SSLContext] = None,
        max_connections: Optional[int] = 10,
        max_keepalive_connections: Optional[int] = None,
        keepalive_expiry: Optional[float] = None,
        http1: bool = True,
        http2: bool = False,
        retries: int = 0,
        local_address: Optional[str] = None,
        uds: Optional[str] = None,
        network_backend: Optional[NetworkBackend] = None,
        socket_options: Optional[Iterable[SOCKET_OPTION]] = None,
    ) -> None:
59 """
60 A connection pool for making HTTP requests.
62 Parameters:
63 ssl_context: An SSL context to use for verifying connections.
64 If not specified, the default `httpcore.default_ssl_context()`
65 will be used.
66 max_connections: The maximum number of concurrent HTTP connections that
67 the pool should allow. Any attempt to send a request on a pool that
68 would exceed this amount will block until a connection is available.
69 max_keepalive_connections: The maximum number of idle HTTP connections
70 that will be maintained in the pool.
71 keepalive_expiry: The duration in seconds that an idle HTTP connection
72 may be maintained for before being expired from the pool.
73 http1: A boolean indicating if HTTP/1.1 requests should be supported
74 by the connection pool. Defaults to True.
75 http2: A boolean indicating if HTTP/2 requests should be supported by
76 the connection pool. Defaults to False.
77 retries: The maximum number of retries when trying to establish a
78 connection.
79 local_address: Local address to connect from. Can also be used to connect
80 using a particular address family. Using `local_address="0.0.0.0"`
81 will connect using an `AF_INET` address (IPv4), while using
82 `local_address="::"` will connect using an `AF_INET6` address (IPv6).
83 uds: Path to a Unix Domain Socket to use instead of TCP sockets.
84 network_backend: A backend instance to use for handling network I/O.
85 socket_options: Socket options that have to be included
86 in the TCP socket when the connection was established.
87 """
        self._ssl_context = ssl_context

        self._max_connections = (
            sys.maxsize if max_connections is None else max_connections
        )
        self._max_keepalive_connections = (
            sys.maxsize
            if max_keepalive_connections is None
            else max_keepalive_connections
        )
        self._max_keepalive_connections = min(
            self._max_connections, self._max_keepalive_connections
        )

        self._keepalive_expiry = keepalive_expiry
        self._http1 = http1
        self._http2 = http2
        self._retries = retries
        self._local_address = local_address
        self._uds = uds
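
        # Pool state: open connections (most recently used first) and the
        # statuses of in-flight requests, guarded by a single pool lock.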
        self._pool: List[ConnectionInterface] = []
        self._requests: List[RequestStatus] = []
        self._pool_lock = Lock()
        self._network_backend = (
            SyncBackend() if network_backend is None else network_backend
        )
        self._socket_options = socket_options

    def create_connection(self, origin: Origin) -> ConnectionInterface:
        return HTTPConnection(
            origin=origin,
            ssl_context=self._ssl_context,
            keepalive_expiry=self._keepalive_expiry,
            http1=self._http1,
            http2=self._http2,
            retries=self._retries,
            local_address=self._local_address,
            uds=self._uds,
            network_backend=self._network_backend,
            socket_options=self._socket_options,
        )

    @property
    def connections(self) -> List[ConnectionInterface]:
        """
        Return a list of the connections currently in the pool.

        For example:

        ```python
        >>> pool.connections
        [
            <HTTPConnection ['https://example.com:443', HTTP/1.1, ACTIVE, Request Count: 6]>,
            <HTTPConnection ['https://example.com:443', HTTP/1.1, IDLE, Request Count: 9]>,
            <HTTPConnection ['http://example.com:80', HTTP/1.1, IDLE, Request Count: 1]>,
        ]
        ```
        """
        return list(self._pool)

    def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool:
        """
        Attempt to provide a connection that can handle the given origin.
        """
        origin = status.request.url.origin

        # If there are queued requests in front of us, then don't acquire a
        # connection. We handle requests strictly in order.
        waiting = [s for s in self._requests if s.connection is None]
        if waiting and waiting[0] is not status:
            return False

        # Reuse an existing connection if one is currently available.
        for idx, connection in enumerate(self._pool):
            if connection.can_handle_request(origin) and connection.is_available():
                self._pool.pop(idx)
                self._pool.insert(0, connection)
                status.set_connection(connection)
                return True

        # If the pool is currently full, attempt to close one idle connection.
        if len(self._pool) >= self._max_connections:
            for idx, connection in reversed(list(enumerate(self._pool))):
                if connection.is_idle():
                    connection.close()
                    self._pool.pop(idx)
                    break

        # If the pool is still full, then we cannot acquire a connection.
        if len(self._pool) >= self._max_connections:
            return False

        # Otherwise create a new connection.
        connection = self.create_connection(origin)
        self._pool.insert(0, connection)
        status.set_connection(connection)
        return True

    def _close_expired_connections(self) -> None:
        """
        Clean up the connection pool by closing off any connections that have expired.
        """
        # Close any connections that have expired their keep-alive time.
        for idx, connection in reversed(list(enumerate(self._pool))):
            if connection.has_expired():
                connection.close()
                self._pool.pop(idx)

        # If the pool size exceeds the maximum number of allowed keep-alive connections,
        # then close off idle connections as required.
        pool_size = len(self._pool)
        for idx, connection in reversed(list(enumerate(self._pool))):
            if connection.is_idle() and pool_size > self._max_keepalive_connections:
                connection.close()
                self._pool.pop(idx)
                pool_size -= 1

    def handle_request(self, request: Request) -> Response:
        """
        Send an HTTP request, and return an HTTP response.

        This is the core implementation that is called into by `.request()` or `.stream()`.
        """
        scheme = request.url.scheme.decode()
        if scheme == "":
            raise UnsupportedProtocol(
                "Request URL is missing an 'http://' or 'https://' protocol."
            )
        if scheme not in ("http", "https", "ws", "wss"):
            raise UnsupportedProtocol(
                f"Request URL has an unsupported protocol '{scheme}://'."
            )
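
        # Queue the request, evict any expired connections, and try to assign
        # a connection straight away. If none can be acquired now, the request
        # is picked up later when `response_closed()` frees a connection.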
        status = RequestStatus(request)

        with self._pool_lock:
            self._requests.append(status)
            self._close_expired_connections()
            self._attempt_to_acquire_connection(status)
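
        # Wait until a connection has been assigned, then attempt to send the
        # request on it. If that connection turns out to be unusable we re-queue
        # and wait for another one; otherwise we break out with a response.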
        while True:
            timeouts = request.extensions.get("timeout", {})
            timeout = timeouts.get("pool", None)
            try:
                connection = status.wait_for_connection(timeout=timeout)
            except BaseException as exc:
                # If we timeout here, or if the task is cancelled, then make
                # sure to remove the request from the queue before bubbling
                # up the exception.
                with self._pool_lock:
                    # Only remove the request if it is still queued.
                    if status in self._requests:
                        self._requests.remove(status)
                raise exc
            try:
                response = connection.handle_request(request)
            except ConnectionNotAvailable:
                # The ConnectionNotAvailable exception is a special case that
                # indicates we need to retry the request on a new connection.
                #
                # The most common case where this can occur is when multiple
                # requests are queued waiting for a single connection, which
                # might end up as an HTTP/2 connection, but which actually ends
                # up as HTTP/1.1.
                with self._pool_lock:
                    # Maintain our position in the request queue, but reset the
                    # status so that the request becomes queued again.
                    status.unset_connection()
                    self._attempt_to_acquire_connection(status)
            except BaseException as exc:
                self.response_closed(status)
                raise exc
            else:
                break

        # When we return the response, we wrap the stream in a special class
        # that handles notifying the connection pool once the response
        # has been released.
        assert isinstance(response.stream, Iterable)
        return Response(
            status=response.status,
            headers=response.headers,
            content=ConnectionPoolByteStream(response.stream, self, status),
            extensions=response.extensions,
        )

    def response_closed(self, status: RequestStatus) -> None:
        """
        This method acts as a callback once the request/response cycle is complete.

        It is called into from the `ConnectionPoolByteStream.close()` method.
        """
        assert status.connection is not None
        connection = status.connection

        with self._pool_lock:
            # Update the state of the connection pool.
            if status in self._requests:
                self._requests.remove(status)

            if connection.is_closed() and connection in self._pool:
                self._pool.remove(connection)

            # Since we've had a response closed, it's possible we'll now be able
            # to service one or more requests that are currently pending.
            for status in self._requests:
                if status.connection is None:
                    acquired = self._attempt_to_acquire_connection(status)
                    # If we could not acquire a connection for a queued request
                    # then we don't need to check any more requests that are
                    # queued later behind it.
                    if not acquired:
                        break

            # Housekeeping.
            self._close_expired_connections()

    def close(self) -> None:
        """
        Close any connections in the pool.
        """
        with self._pool_lock:
            for connection in self._pool:
                connection.close()
            self._pool = []
            self._requests = []

    def __enter__(self) -> "ConnectionPool":
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_value: Optional[BaseException] = None,
        traceback: Optional[TracebackType] = None,
    ) -> None:
        self.close()


class ConnectionPoolByteStream:
    """
    A wrapper around the response byte stream that additionally handles
    notifying the connection pool when the response has been closed.
    """

    def __init__(
        self,
        stream: Iterable[bytes],
        pool: ConnectionPool,
        status: RequestStatus,
    ) -> None:
        self._stream = stream
        self._pool = pool
        self._status = status

    def __iter__(self) -> Iterator[bytes]:
        for part in self._stream:
            yield part

    def close(self) -> None:
        try:
            if hasattr(self._stream, "close"):
                self._stream.close()
        finally:
            self._pool.response_closed(self._status)
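
For orientation, here is a minimal usage sketch of this pool through the public API. It assumes the top-level `httpcore` package re-exports this class as `httpcore.ConnectionPool` and uses `example.com` purely as an illustrative host; it is not part of the module above.

```python
import httpcore

# A pool with the default limits (max_connections=10). The context manager
# calls ConnectionPool.close() on exit, closing any pooled connections.
with httpcore.ConnectionPool(http2=False) as pool:
    response = pool.request("GET", "https://example.com/")
    print(response.status)        # e.g. 200
    print(len(response.content))  # body bytes, fully read by .request()
    print(pool.connections)       # e.g. [<HTTPConnection [... IDLE ...]>]
```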