Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/httpcore/_sync/connection_pool.py: 20%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

170 statements  

1from __future__ import annotations 

2 

3import ssl 

4import sys 

5import types 

6import typing 

7 

8from .._backends.sync import SyncBackend 

9from .._backends.base import SOCKET_OPTION, NetworkBackend 

10from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol 

11from .._models import Origin, Proxy, Request, Response 

12from .._synchronization import Event, ShieldCancellation, ThreadLock 

13from .connection import HTTPConnection 

14from .interfaces import ConnectionInterface, RequestInterface 

15 

16 

class PoolRequest:
    """
    Book-keeping record for a single request handled by a connection pool.

    Holds the request itself, the connection it is currently assigned to
    (or `None` while no assignment exists), and an event that is set at the
    moment an assignment is made.
    """

    def __init__(self, request: Request) -> None:
        self.request = request
        # No connection yet: the request starts out queued.
        self.connection: ConnectionInterface | None = None
        self._connection_acquired = Event()

    def assign_to_connection(self, connection: ConnectionInterface | None) -> None:
        """
        Record `connection` as the one servicing this request, and set the
        event that `wait_for_connection()` blocks on.
        """
        self.connection = connection
        self._connection_acquired.set()

    def clear_connection(self) -> None:
        """
        Forget the current assignment so the request counts as queued again.

        A brand new event replaces the old one, so a later wait blocks
        until the next assignment.
        """
        self.connection = None
        self._connection_acquired = Event()

    def wait_for_connection(
        self, timeout: float | None = None
    ) -> ConnectionInterface:
        """
        Return the assigned connection, waiting on the event first if no
        connection has been assigned yet.

        `timeout` is forwarded unchanged to `Event.wait()`; how an elapsed
        timeout is reported is decided by that helper, which is defined
        outside this file.
        """
        if self.connection is None:
            self._connection_acquired.wait(timeout=timeout)
        assigned = self.connection
        assert assigned is not None
        return assigned

    def is_queued(self) -> bool:
        """
        Return True while the request has no connection assigned.
        """
        return self.connection is None

41 

42 

class ConnectionPool(RequestInterface):
    """
    A connection pool for making HTTP requests.
    """

    def __init__(
        self,
        ssl_context: ssl.SSLContext | None = None,
        proxy: Proxy | None = None,
        max_connections: int | None = 10,
        max_keepalive_connections: int | None = None,
        keepalive_expiry: float | None = None,
        http1: bool = True,
        http2: bool = False,
        retries: int = 0,
        local_address: str | None = None,
        uds: str | None = None,
        network_backend: NetworkBackend | None = None,
        socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
    ) -> None:
        """
        A connection pool for making HTTP requests.

        Parameters:
            ssl_context: An SSL context to use for verifying connections.
                If not specified, the default `httpcore.default_ssl_context()`
                will be used.
            proxy: Optional proxy configuration. When given, every connection
                is created through the proxy; see `create_connection()` for
                how the connection type is selected.
            max_connections: The maximum number of concurrent HTTP connections that
                the pool should allow. Any attempt to send a request on a pool that
                would exceed this amount will block until a connection is available.
                `None` means no limit.
            max_keepalive_connections: The maximum number of idle HTTP connections
                that will be maintained in the pool. `None` means no limit
                other than `max_connections`.
            keepalive_expiry: The duration in seconds that an idle HTTP connection
                may be maintained for before being expired from the pool.
            http1: A boolean indicating if HTTP/1.1 requests should be supported
                by the connection pool. Defaults to True.
            http2: A boolean indicating if HTTP/2 requests should be supported by
                the connection pool. Defaults to False.
            retries: The maximum number of retries when trying to establish a
                connection.
            local_address: Local address to connect from. Can also be used to connect
                using a particular address family. Using `local_address="0.0.0.0"`
                will connect using an `AF_INET` address (IPv4), while using
                `local_address="::"` will connect using an `AF_INET6` address (IPv6).
            uds: Path to a Unix Domain Socket to use instead of TCP sockets.
            network_backend: A backend instance to use for handling network I/O.
            socket_options: Socket options that have to be included
                in the TCP socket when the connection was established.
        """
        self._ssl_context = ssl_context
        self._proxy = proxy

        # `None` means "unlimited", which is represented as `sys.maxsize`.
        self._max_connections = (
            sys.maxsize if max_connections is None else max_connections
        )
        self._max_keepalive_connections = (
            sys.maxsize
            if max_keepalive_connections is None
            else max_keepalive_connections
        )
        # The keep-alive limit can never exceed the overall connection limit.
        self._max_keepalive_connections = min(
            self._max_connections, self._max_keepalive_connections
        )

        self._keepalive_expiry = keepalive_expiry
        self._http1 = http1
        self._http2 = http2
        self._retries = retries
        self._local_address = local_address
        self._uds = uds

        self._network_backend = (
            SyncBackend() if network_backend is None else network_backend
        )
        self._socket_options = socket_options

        # The mutable state on a connection pool is the queue of incoming requests,
        # and the set of connections that are servicing those requests.
        self._connections: list[ConnectionInterface] = []
        self._requests: list[PoolRequest] = []

        # We only mutate the state of the connection pool within an 'optional_thread_lock'
        # context. This holds a threading lock unless we're running in async mode,
        # in which case it is a no-op.
        self._optional_thread_lock = ThreadLock()

    def create_connection(self, origin: Origin) -> ConnectionInterface:
        """
        Build a new, not yet pooled, connection for `origin`.

        With a proxy configured the connection type is chosen as follows:

        * proxy URL scheme `socks5` / `socks5h`: `Socks5Connection`
        * otherwise, `http` origin: `ForwardHTTPConnection`
        * otherwise: `TunnelHTTPConnection`

        Without a proxy a plain `HTTPConnection` is returned. Note that
        `retries`, `local_address`, `uds` and `socket_options` are only
        passed along on that direct, non-proxy path.
        """
        if self._proxy is not None:
            if self._proxy.url.scheme in (b"socks5", b"socks5h"):
                # Imported lazily, only when a SOCKS proxy is actually used.
                from .socks_proxy import Socks5Connection

                return Socks5Connection(
                    proxy_origin=self._proxy.url.origin,
                    proxy_auth=self._proxy.auth,
                    remote_origin=origin,
                    ssl_context=self._ssl_context,
                    keepalive_expiry=self._keepalive_expiry,
                    http1=self._http1,
                    http2=self._http2,
                    network_backend=self._network_backend,
                )
            elif origin.scheme == b"http":
                from .http_proxy import ForwardHTTPConnection

                return ForwardHTTPConnection(
                    proxy_origin=self._proxy.url.origin,
                    proxy_headers=self._proxy.headers,
                    proxy_ssl_context=self._proxy.ssl_context,
                    remote_origin=origin,
                    keepalive_expiry=self._keepalive_expiry,
                    network_backend=self._network_backend,
                )
            from .http_proxy import TunnelHTTPConnection

            return TunnelHTTPConnection(
                proxy_origin=self._proxy.url.origin,
                proxy_headers=self._proxy.headers,
                proxy_ssl_context=self._proxy.ssl_context,
                remote_origin=origin,
                ssl_context=self._ssl_context,
                keepalive_expiry=self._keepalive_expiry,
                http1=self._http1,
                http2=self._http2,
                network_backend=self._network_backend,
            )

        return HTTPConnection(
            origin=origin,
            ssl_context=self._ssl_context,
            keepalive_expiry=self._keepalive_expiry,
            http1=self._http1,
            http2=self._http2,
            retries=self._retries,
            local_address=self._local_address,
            uds=self._uds,
            network_backend=self._network_backend,
            socket_options=self._socket_options,
        )

    @property
    def connections(self) -> list[ConnectionInterface]:
        """
        Return a list of the connections currently in the pool.

        The returned list is a copy, so mutating it does not affect the pool.

        For example:

        ```python
        >>> pool.connections
        [
            <HTTPConnection ['https://example.com:443', HTTP/1.1, ACTIVE, Request Count: 6]>,
            <HTTPConnection ['https://example.com:443', HTTP/1.1, IDLE, Request Count: 9]>,
            <HTTPConnection ['http://example.com:80', HTTP/1.1, IDLE, Request Count: 1]>,
        ]
        ```
        """
        return list(self._connections)

    def handle_request(self, request: Request) -> Response:
        """
        Send an HTTP request, and return an HTTP response.

        This is the core implementation that is called into by `.request()` or `.stream()`.

        Raises:
            UnsupportedProtocol: If the request URL has no scheme, or a scheme
                other than `http`, `https`, `ws` or `wss`.

        Any other exception raised while waiting for, or sending on, a
        connection is re-raised after the request has been removed from the
        pool's queue.
        """
        scheme = request.url.scheme.decode()
        if scheme == "":
            raise UnsupportedProtocol(
                "Request URL is missing an 'http://' or 'https://' protocol."
            )
        if scheme not in ("http", "https", "ws", "wss"):
            raise UnsupportedProtocol(
                f"Request URL has an unsupported protocol '{scheme}://'."
            )

        # The "pool" timeout bounds how long we wait for a connection.
        timeouts = request.extensions.get("timeout", {})
        timeout = timeouts.get("pool", None)

        with self._optional_thread_lock:
            # Add the incoming request to our request queue.
            pool_request = PoolRequest(request)
            self._requests.append(pool_request)

        try:
            while True:
                with self._optional_thread_lock:
                    # Assign incoming requests to available connections,
                    # closing or creating new connections as required.
                    closing = self._assign_requests_to_connections()
                # Closing performs I/O, so it happens outside the lock.
                self._close_connections(closing)

                # Wait until this request has an assigned connection.
                connection = pool_request.wait_for_connection(timeout=timeout)

                try:
                    # Send the request on the assigned connection.
                    response = connection.handle_request(
                        pool_request.request
                    )
                except ConnectionNotAvailable:
                    # In some cases a connection may initially be available to
                    # handle a request, but then become unavailable.
                    #
                    # In this case we clear the connection and try again.
                    pool_request.clear_connection()
                else:
                    break  # pragma: nocover

        except BaseException as exc:
            with self._optional_thread_lock:
                # For any exception or cancellation we remove the request from
                # the queue, and then re-assign requests to connections.
                self._requests.remove(pool_request)
                closing = self._assign_requests_to_connections()

            self._close_connections(closing)
            raise exc from None

        # Return the response. Note that in this case we still have to manage
        # the point at which the response is closed.
        assert isinstance(response.stream, typing.Iterable)
        return Response(
            status=response.status,
            headers=response.headers,
            content=PoolByteStream(
                stream=response.stream, pool_request=pool_request, pool=self
            ),
            extensions=response.extensions,
        )

    def _assign_requests_to_connections(self) -> list[ConnectionInterface]:
        """
        Manage the state of the connection pool, assigning incoming
        requests to connections as available.

        Called whenever a new request is added or removed from the pool.
        Every call site in this file invokes it while holding
        `_optional_thread_lock`.

        Any closing connections are returned, allowing the I/O for closing
        those connections to be handled separately.
        """
        closing_connections = []

        # First we handle cleaning up any connections that are closed,
        # have expired their keep-alive, or surplus idle connections.
        # Iterate over a copy, since the pool list is mutated in the loop.
        for connection in list(self._connections):
            if connection.is_closed():
                # log: "removing closed connection"
                self._connections.remove(connection)
            elif connection.has_expired():
                # log: "closing expired connection"
                self._connections.remove(connection)
                closing_connections.append(connection)
            elif (
                connection.is_idle()
                # Count only the connections that are actually idle. The
                # count is re-evaluated on each pass because connections are
                # removed from the pool as the loop proceeds.
                and sum(1 for conn in self._connections if conn.is_idle())
                > self._max_keepalive_connections
            ):
                # log: "closing idle connection"
                self._connections.remove(connection)
                closing_connections.append(connection)

        # Assign queued requests to connections.
        queued_requests = [request for request in self._requests if request.is_queued()]
        for pool_request in queued_requests:
            origin = pool_request.request.url.origin
            available_connections = [
                connection
                for connection in self._connections
                if connection.can_handle_request(origin) and connection.is_available()
            ]
            idle_connections = [
                connection for connection in self._connections if connection.is_idle()
            ]

            # There are three cases for how we may be able to handle the request:
            #
            # 1. There is an existing connection that can handle the request.
            # 2. We can create a new connection to handle the request.
            # 3. We can close an idle connection and then create a new connection
            #    to handle the request.
            #
            # If none of the cases applies the request simply stays queued.
            if available_connections:
                # log: "reusing existing connection"
                connection = available_connections[0]
                pool_request.assign_to_connection(connection)
            elif len(self._connections) < self._max_connections:
                # log: "creating new connection"
                connection = self.create_connection(origin)
                self._connections.append(connection)
                pool_request.assign_to_connection(connection)
            elif idle_connections:
                # log: "closing idle connection"
                connection = idle_connections[0]
                self._connections.remove(connection)
                closing_connections.append(connection)
                # log: "creating new connection"
                connection = self.create_connection(origin)
                self._connections.append(connection)
                pool_request.assign_to_connection(connection)

        return closing_connections

    def _close_connections(self, closing: list[ConnectionInterface]) -> None:
        # Close connections which have been removed from the pool.
        with ShieldCancellation():
            for connection in closing:
                connection.close()

    def close(self) -> None:
        # Explicitly close the connection pool.
        # Detaches every connection from the pool while holding the lock,
        # then closes them outside of it. The request queue is not modified.
        with self._optional_thread_lock:
            closing_connections = list(self._connections)
            self._connections = []
        self._close_connections(closing_connections)

    def __enter__(self) -> ConnectionPool:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: types.TracebackType | None = None,
    ) -> None:
        # Leaving the `with` block closes the pool.
        self.close()

    def __repr__(self) -> str:
        class_name = self.__class__.__name__
        # Snapshot the state under the lock; format the string outside it.
        with self._optional_thread_lock:
            request_is_queued = [request.is_queued() for request in self._requests]
            connection_is_idle = [
                connection.is_idle() for connection in self._connections
            ]

            num_active_requests = request_is_queued.count(False)
            num_queued_requests = request_is_queued.count(True)
            num_active_connections = connection_is_idle.count(False)
            num_idle_connections = connection_is_idle.count(True)

        requests_info = (
            f"Requests: {num_active_requests} active, {num_queued_requests} queued"
        )
        connection_info = (
            f"Connections: {num_active_connections} active, {num_idle_connections} idle"
        )

        return f"<{class_name} [{requests_info} | {connection_info}]>"

387 

388 

class PoolByteStream:
    """
    Byte stream wrapper that ties a response body to its connection pool.

    Iterating yields the parts of the wrapped stream unchanged. Closing the
    wrapper closes the wrapped stream (when it has a `close` attribute),
    removes the associated request from the pool's request list, and lets
    the pool re-assign its remaining requests.
    """

    def __init__(
        self,
        stream: typing.Iterable[bytes],
        pool_request: PoolRequest,
        pool: ConnectionPool,
    ) -> None:
        self._stream = stream
        self._pool_request = pool_request
        self._pool = pool
        # Guards `close()` so that its work is performed at most once.
        self._closed = False

    def __iter__(self) -> typing.Iterator[bytes]:
        try:
            for chunk in self._stream:
                yield chunk
        except BaseException as exc:
            # Any failure while streaming releases the pool request before
            # the error is passed on to the caller.
            self.close()
            raise exc from None

    def close(self) -> None:
        # A second call is a no-op.
        if self._closed:
            return

        self._closed = True
        with ShieldCancellation():
            if hasattr(self._stream, "close"):
                self._stream.close()

        pool = self._pool
        with pool._optional_thread_lock:
            # Drop this request from the pool, then let the pool hand the
            # connections to whatever requests remain.
            pool._requests.remove(self._pool_request)
            closing = pool._assign_requests_to_connections()

        # Connections are closed after the lock has been released.
        pool._close_connections(closing)