1import ssl
2import sys
3from types import TracebackType
4from typing import Iterable, Iterator, Iterable, List, Optional, Type
5
6from .._backends.sync import SyncBackend
7from .._backends.base import SOCKET_OPTION, NetworkBackend
8from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol
9from .._models import Origin, Request, Response
10from .._synchronization import Event, ShieldCancellation, ThreadLock
11from .connection import HTTPConnection
12from .interfaces import ConnectionInterface, RequestInterface
13
14
class PoolRequest:
    """
    Pairs a single request with the connection chosen to service it.

    A pool request counts as "queued" for as long as no connection has been
    assigned to it.
    """

    def __init__(self, request: Request) -> None:
        # The event is signalled by `assign_to_connection()` and waited on by
        # `wait_for_connection()`.
        self._connection_acquired = Event()
        self.connection: Optional[ConnectionInterface] = None
        self.request = request

    def assign_to_connection(
        self, connection: Optional[ConnectionInterface]
    ) -> None:
        """
        Record `connection` as the one servicing this request, then signal
        the internal event so a pending `wait_for_connection()` can proceed.
        """
        self.connection = connection
        self._connection_acquired.set()

    def clear_connection(self) -> None:
        """
        Drop the current assignment, putting the request back in the queued
        state. A fresh event is installed so the next wait blocks again.
        """
        self.connection = None
        self._connection_acquired = Event()

    def wait_for_connection(
        self, timeout: Optional[float] = None
    ) -> ConnectionInterface:
        """
        Return the assigned connection.

        If no connection is assigned yet, first wait on the internal event,
        passing `timeout` straight through to it.
        """
        if self.is_queued():
            self._connection_acquired.wait(timeout=timeout)

        connection = self.connection
        assert connection is not None
        return connection

    def is_queued(self) -> bool:
        """Return True while no connection has been assigned."""
        return self.connection is None
41
42
class ConnectionPool(RequestInterface):
    """
    A connection pool for making HTTP requests.

    The pool keeps a queue of incoming requests and a list of connections,
    and assigns each queued request to an existing, newly created, or
    recycled connection, subject to `max_connections` and
    `max_keepalive_connections`.
    """

    def __init__(
        self,
        ssl_context: Optional[ssl.SSLContext] = None,
        max_connections: Optional[int] = 10,
        max_keepalive_connections: Optional[int] = None,
        keepalive_expiry: Optional[float] = None,
        http1: bool = True,
        http2: bool = False,
        retries: int = 0,
        local_address: Optional[str] = None,
        uds: Optional[str] = None,
        network_backend: Optional[NetworkBackend] = None,
        socket_options: Optional[Iterable[SOCKET_OPTION]] = None,
    ) -> None:
        """
        A connection pool for making HTTP requests.

        Parameters:
            ssl_context: An SSL context to use for verifying connections.
                If not specified, the default `httpcore.default_ssl_context()`
                will be used.
            max_connections: The maximum number of concurrent HTTP connections that
                the pool should allow. Any attempt to send a request on a pool that
                would exceed this amount will block until a connection is available.
                `None` means no limit.
            max_keepalive_connections: The maximum number of idle HTTP connections
                that will be maintained in the pool. `None` means no limit beyond
                `max_connections`; the effective value never exceeds
                `max_connections`.
            keepalive_expiry: The duration in seconds that an idle HTTP connection
                may be maintained for before being expired from the pool.
            http1: A boolean indicating if HTTP/1.1 requests should be supported
                by the connection pool. Defaults to True.
            http2: A boolean indicating if HTTP/2 requests should be supported by
                the connection pool. Defaults to False.
            retries: The maximum number of retries when trying to establish a
                connection.
            local_address: Local address to connect from. Can also be used to connect
                using a particular address family. Using `local_address="0.0.0.0"`
                will connect using an `AF_INET` address (IPv4), while using
                `local_address="::"` will connect using an `AF_INET6` address (IPv6).
            uds: Path to a Unix Domain Socket to use instead of TCP sockets.
            network_backend: A backend instance to use for handling network I/O.
                Defaults to a new `SyncBackend()`.
            socket_options: Socket options that have to be included
                in the TCP socket when the connection was established.
        """
        self._ssl_context = ssl_context

        # `None` means "unlimited", represented internally as `sys.maxsize`.
        self._max_connections = (
            sys.maxsize if max_connections is None else max_connections
        )
        self._max_keepalive_connections = (
            sys.maxsize
            if max_keepalive_connections is None
            else max_keepalive_connections
        )
        # The keep-alive limit can never exceed the overall connection limit.
        self._max_keepalive_connections = min(
            self._max_connections, self._max_keepalive_connections
        )

        self._keepalive_expiry = keepalive_expiry
        self._http1 = http1
        self._http2 = http2
        self._retries = retries
        self._local_address = local_address
        self._uds = uds

        self._network_backend = (
            SyncBackend() if network_backend is None else network_backend
        )
        self._socket_options = socket_options

        # The mutable state on a connection pool is the queue of incoming requests,
        # and the set of connections that are servicing those requests.
        self._connections: List[ConnectionInterface] = []
        self._requests: List[PoolRequest] = []

        # We only mutate the state of the connection pool within an 'optional_thread_lock'
        # context. This holds a threading lock unless we're running in async mode,
        # in which case it is a no-op.
        self._optional_thread_lock = ThreadLock()

    def create_connection(self, origin: Origin) -> ConnectionInterface:
        """
        Build a new `HTTPConnection` to `origin`, configured with the
        settings this pool was created with.
        """
        return HTTPConnection(
            origin=origin,
            ssl_context=self._ssl_context,
            keepalive_expiry=self._keepalive_expiry,
            http1=self._http1,
            http2=self._http2,
            retries=self._retries,
            local_address=self._local_address,
            uds=self._uds,
            network_backend=self._network_backend,
            socket_options=self._socket_options,
        )

    @property
    def connections(self) -> List[ConnectionInterface]:
        """
        Return a list of the connections currently in the pool.

        The returned list is a copy, so mutating it does not affect the pool.

        For example:

        ```python
        >>> pool.connections
        [
            <HTTPConnection ['https://example.com:443', HTTP/1.1, ACTIVE, Request Count: 6]>,
            <HTTPConnection ['https://example.com:443', HTTP/1.1, IDLE, Request Count: 9]>,
            <HTTPConnection ['http://example.com:80', HTTP/1.1, IDLE, Request Count: 1]>,
        ]
        ```
        """
        return list(self._connections)

    def handle_request(self, request: Request) -> Response:
        """
        Send an HTTP request, and return an HTTP response.

        This is the core implementation that is called into by `.request()` or `.stream()`.

        Raises:
            UnsupportedProtocol: If the request URL has no scheme, or a scheme
                other than `http`, `https`, `ws` or `wss`.

        Any exception raised while waiting for or using a connection is
        re-raised after the request has been removed from the queue.
        """
        scheme = request.url.scheme.decode()
        if scheme == "":
            raise UnsupportedProtocol(
                "Request URL is missing an 'http://' or 'https://' protocol."
            )
        if scheme not in ("http", "https", "ws", "wss"):
            raise UnsupportedProtocol(
                f"Request URL has an unsupported protocol '{scheme}://'."
            )

        # The "pool" timeout bounds how long we wait for a connection to be
        # assigned to this request.
        timeouts = request.extensions.get("timeout", {})
        timeout = timeouts.get("pool", None)

        with self._optional_thread_lock:
            # Add the incoming request to our request queue.
            pool_request = PoolRequest(request)
            self._requests.append(pool_request)

        try:
            while True:
                with self._optional_thread_lock:
                    # Assign incoming requests to available connections,
                    # closing or creating new connections as required.
                    closing = self._assign_requests_to_connections()
                # Connection close I/O happens outside the lock.
                self._close_connections(closing)

                # Wait until this request has an assigned connection.
                connection = pool_request.wait_for_connection(timeout=timeout)

                try:
                    # Send the request on the assigned connection.
                    response = connection.handle_request(
                        pool_request.request
                    )
                except ConnectionNotAvailable:
                    # In some cases a connection may initially be available to
                    # handle a request, but then become unavailable.
                    #
                    # In this case we clear the connection and try again.
                    pool_request.clear_connection()
                else:
                    break  # pragma: nocover

        except BaseException as exc:
            with self._optional_thread_lock:
                # For any exception or cancellation we remove the request from
                # the queue, and then re-assign requests to connections.
                self._requests.remove(pool_request)
                closing = self._assign_requests_to_connections()

            self._close_connections(closing)
            raise exc from None

        # Return the response. Note that in this case we still have to manage
        # the point at which the response is closed.
        assert isinstance(response.stream, Iterable)
        return Response(
            status=response.status,
            headers=response.headers,
            content=PoolByteStream(
                stream=response.stream, pool_request=pool_request, pool=self
            ),
            extensions=response.extensions,
        )

    def _assign_requests_to_connections(self) -> List[ConnectionInterface]:
        """
        Manage the state of the connection pool, assigning incoming
        requests to connections as available.

        Called whenever a new request is added or removed from the pool.

        Any closing connections are returned, allowing the I/O for closing
        those connections to be handled separately.
        """
        closing_connections: List[ConnectionInterface] = []

        # First we handle cleaning up any connections that are closed,
        # have expired their keep-alive, or surplus idle connections.
        for connection in list(self._connections):
            if connection.is_closed():
                # log: "removing closed connection"
                self._connections.remove(connection)
            elif connection.has_expired():
                # log: "closing expired connection"
                self._connections.remove(connection)
                closing_connections.append(connection)
            elif (
                connection.is_idle()
                # Count only the connections that are actually idle. Taking
                # the length of a list of `is_idle()` results would count
                # every connection and close idle ones too eagerly.
                and sum(1 for conn in self._connections if conn.is_idle())
                > self._max_keepalive_connections
            ):
                # log: "closing idle connection"
                self._connections.remove(connection)
                closing_connections.append(connection)

        # Assign queued requests to connections.
        queued_requests = [request for request in self._requests if request.is_queued()]
        for pool_request in queued_requests:
            origin = pool_request.request.url.origin
            available_connections = [
                connection
                for connection in self._connections
                if connection.can_handle_request(origin) and connection.is_available()
            ]
            idle_connections = [
                connection for connection in self._connections if connection.is_idle()
            ]

            # There are three cases for how we may be able to handle the request:
            #
            # 1. There is an existing connection that can handle the request.
            # 2. We can create a new connection to handle the request.
            # 3. We can close an idle connection and then create a new connection
            #    to handle the request.
            #
            # If none of these apply the request simply stays queued.
            if available_connections:
                # log: "reusing existing connection"
                connection = available_connections[0]
                pool_request.assign_to_connection(connection)
            elif len(self._connections) < self._max_connections:
                # log: "creating new connection"
                connection = self.create_connection(origin)
                self._connections.append(connection)
                pool_request.assign_to_connection(connection)
            elif idle_connections:
                # log: "closing idle connection"
                connection = idle_connections[0]
                self._connections.remove(connection)
                closing_connections.append(connection)
                # log: "creating new connection"
                connection = self.create_connection(origin)
                self._connections.append(connection)
                pool_request.assign_to_connection(connection)

        return closing_connections

    def _close_connections(self, closing: List[ConnectionInterface]) -> None:
        """Close each connection in `closing` inside a `ShieldCancellation` block."""
        # Close connections which have been removed from the pool.
        with ShieldCancellation():
            for connection in closing:
                connection.close()

    def close(self) -> None:
        """
        Explicitly close the connection pool.

        Every connection is removed from the pool under the lock and then
        closed. The request queue is not modified here.
        """
        with self._optional_thread_lock:
            closing_connections = list(self._connections)
            self._connections = []
        self._close_connections(closing_connections)

    def __enter__(self) -> "ConnectionPool":
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_value: Optional[BaseException] = None,
        traceback: Optional[TracebackType] = None,
    ) -> None:
        # Leaving the `with` block closes the pool, whether or not an
        # exception occurred.
        self.close()

    def __repr__(self) -> str:
        """
        Summarise the pool as counts of active/queued requests and
        active/idle connections.
        """
        class_name = self.__class__.__name__
        # Snapshot the state under the lock; format the string outside it.
        with self._optional_thread_lock:
            request_is_queued = [request.is_queued() for request in self._requests]
            connection_is_idle = [
                connection.is_idle() for connection in self._connections
            ]

            num_active_requests = request_is_queued.count(False)
            num_queued_requests = request_is_queued.count(True)
            num_active_connections = connection_is_idle.count(False)
            num_idle_connections = connection_is_idle.count(True)

        requests_info = (
            f"Requests: {num_active_requests} active, {num_queued_requests} queued"
        )
        connection_info = (
            f"Connections: {num_active_connections} active, {num_idle_connections} idle"
        )

        return f"<{class_name} [{requests_info} | {connection_info}]>"
347
348
class PoolByteStream:
    """
    Wraps a response byte stream so that closing it also removes the
    originating request from the pool's queue and lets the pool re-assign
    requests to connections.
    """

    def __init__(
        self,
        stream: Iterable[bytes],
        pool_request: PoolRequest,
        pool: ConnectionPool,
    ) -> None:
        self._closed = False
        self._pool = pool
        self._pool_request = pool_request
        self._stream = stream

    def __iter__(self) -> Iterator[bytes]:
        """
        Yield each chunk of the wrapped stream.

        If iteration raises, the stream is closed before the exception is
        re-raised. Normal exhaustion does not close the stream.
        """
        try:
            for chunk in self._stream:
                yield chunk
        except BaseException as exc:
            self.close()
            raise exc from None

    def close(self) -> None:
        """
        Close the wrapped stream and release this request from the pool.

        Only the first call does any work; later calls return immediately.
        """
        if self._closed:
            return
        self._closed = True

        # The wrapped stream is only closed if it exposes a `close` attribute.
        with ShieldCancellation():
            if hasattr(self._stream, "close"):
                self._stream.close()

        pool = self._pool
        with pool._optional_thread_lock:
            # Drop our request, then let the pool rebalance its connections.
            pool._requests.remove(self._pool_request)
            closing = pool._assign_requests_to_connections()

        # Connection close I/O happens outside the pool lock.
        pool._close_connections(closing)