Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpcore/_sync/connection_pool.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

158 statements  

1import ssl 

2import sys 

3from types import TracebackType 

4from typing import Iterable, Iterator, Iterable, List, Optional, Type 

5 

6from .._backends.sync import SyncBackend 

7from .._backends.base import SOCKET_OPTION, NetworkBackend 

8from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol 

9from .._models import Origin, Request, Response 

10from .._synchronization import Event, ShieldCancellation, ThreadLock 

11from .connection import HTTPConnection 

12from .interfaces import ConnectionInterface, RequestInterface 

13 

14 

class PoolRequest:
    """
    Pairs a single request with the connection the pool assigns to it.

    While `connection` is `None` the request counts as queued. An event
    object is used to signal that `assign_to_connection()` has been called.
    """

    def __init__(self, request: Request) -> None:
        """Wrap `request`; it starts out with no connection assigned."""
        self._connection_acquired = Event()
        self.connection: Optional[ConnectionInterface] = None
        self.request = request

    def assign_to_connection(self, connection: Optional[ConnectionInterface]) -> None:
        """Store `connection` on the request, then set the acquisition event."""
        # The attribute is written before the event is set, so that a waiter
        # which wakes up on the event reads the new value.
        self.connection = connection
        self._connection_acquired.set()

    def clear_connection(self) -> None:
        """Drop the assigned connection and start over with a fresh event."""
        self.connection = None
        self._connection_acquired = Event()

    def wait_for_connection(
        self, timeout: Optional[float] = None
    ) -> ConnectionInterface:
        """
        Return the assigned connection, waiting on the event if none is set yet.

        `timeout` is handed straight to the event's `wait()` call.
        """
        if self.connection is not None:
            return self.connection

        self._connection_acquired.wait(timeout=timeout)
        assert self.connection is not None
        return self.connection

    def is_queued(self) -> bool:
        """Return `True` while no connection has been assigned."""
        still_waiting = self.connection is None
        return still_waiting

41 

42 

class ConnectionPool(RequestInterface):
    """
    A connection pool for making HTTP requests.
    """

    def __init__(
        self,
        ssl_context: Optional[ssl.SSLContext] = None,
        max_connections: Optional[int] = 10,
        max_keepalive_connections: Optional[int] = None,
        keepalive_expiry: Optional[float] = None,
        http1: bool = True,
        http2: bool = False,
        retries: int = 0,
        local_address: Optional[str] = None,
        uds: Optional[str] = None,
        network_backend: Optional[NetworkBackend] = None,
        socket_options: Optional[Iterable[SOCKET_OPTION]] = None,
    ) -> None:
        """
        A connection pool for making HTTP requests.

        Parameters:
            ssl_context: An SSL context to use for verifying connections.
                If not specified, the default `httpcore.default_ssl_context()`
                will be used.
            max_connections: The maximum number of concurrent HTTP connections that
                the pool should allow. Any attempt to send a request on a pool that
                would exceed this amount will block until a connection is available.
                `None` means no limit.
            max_keepalive_connections: The maximum number of idle HTTP connections
                that will be maintained in the pool. `None` means no limit other
                than `max_connections`.
            keepalive_expiry: The duration in seconds that an idle HTTP connection
                may be maintained for before being expired from the pool.
            http1: A boolean indicating if HTTP/1.1 requests should be supported
                by the connection pool. Defaults to True.
            http2: A boolean indicating if HTTP/2 requests should be supported by
                the connection pool. Defaults to False.
            retries: The maximum number of retries when trying to establish a
                connection.
            local_address: Local address to connect from. Can also be used to connect
                using a particular address family. Using `local_address="0.0.0.0"`
                will connect using an `AF_INET` address (IPv4), while using
                `local_address="::"` will connect using an `AF_INET6` address (IPv6).
            uds: Path to a Unix Domain Socket to use instead of TCP sockets.
            network_backend: A backend instance to use for handling network I/O.
            socket_options: Socket options that have to be included
             in the TCP socket when the connection was established.
        """
        self._ssl_context = ssl_context

        # `None` for either limit is represented internally as `sys.maxsize`,
        # so the comparisons below never need a special case.
        self._max_connections = (
            sys.maxsize if max_connections is None else max_connections
        )
        self._max_keepalive_connections = (
            sys.maxsize
            if max_keepalive_connections is None
            else max_keepalive_connections
        )
        # The keep-alive limit can never exceed the overall connection limit.
        self._max_keepalive_connections = min(
            self._max_connections, self._max_keepalive_connections
        )

        self._keepalive_expiry = keepalive_expiry
        self._http1 = http1
        self._http2 = http2
        self._retries = retries
        self._local_address = local_address
        self._uds = uds

        self._network_backend = (
            SyncBackend() if network_backend is None else network_backend
        )
        self._socket_options = socket_options

        # The mutable state on a connection pool is the queue of incoming requests,
        # and the set of connections that are servicing those requests.
        self._connections: List[ConnectionInterface] = []
        self._requests: List[PoolRequest] = []

        # We only mutate the state of the connection pool within an 'optional_thread_lock'
        # context. This holds a threading lock unless we're running in async mode,
        # in which case it is a no-op.
        self._optional_thread_lock = ThreadLock()

    def create_connection(self, origin: Origin) -> ConnectionInterface:
        """
        Build a new `HTTPConnection` for `origin` using the pool's settings.

        The connection is only constructed here; adding it to the pool is
        left to the caller.
        """
        return HTTPConnection(
            origin=origin,
            ssl_context=self._ssl_context,
            keepalive_expiry=self._keepalive_expiry,
            http1=self._http1,
            http2=self._http2,
            retries=self._retries,
            local_address=self._local_address,
            uds=self._uds,
            network_backend=self._network_backend,
            socket_options=self._socket_options,
        )

    @property
    def connections(self) -> List[ConnectionInterface]:
        """
        Return a list of the connections currently in the pool.

        The returned list is a copy, so mutating it does not affect the pool.

        For example:

        ```python
        >>> pool.connections
        [
            <HTTPConnection ['https://example.com:443', HTTP/1.1, ACTIVE, Request Count: 6]>,
            <HTTPConnection ['https://example.com:443', HTTP/1.1, IDLE, Request Count: 9]>,
            <HTTPConnection ['http://example.com:80', HTTP/1.1, IDLE, Request Count: 1]>,
        ]
        ```
        """
        return list(self._connections)

    def handle_request(self, request: Request) -> Response:
        """
        Send an HTTP request, and return an HTTP response.

        This is the core implementation that is called into by `.request()` or `.stream()`.

        The optional `"pool"` entry of the request's `"timeout"` extension is
        passed as the timeout when waiting for a connection to be assigned.

        Raises:
            UnsupportedProtocol: If the URL scheme is empty, or is not one of
                `http`, `https`, `ws` or `wss`.
        """
        scheme = request.url.scheme.decode()
        if scheme == "":
            raise UnsupportedProtocol(
                "Request URL is missing an 'http://' or 'https://' protocol."
            )
        if scheme not in ("http", "https", "ws", "wss"):
            raise UnsupportedProtocol(
                f"Request URL has an unsupported protocol '{scheme}://'."
            )

        timeouts = request.extensions.get("timeout", {})
        timeout = timeouts.get("pool", None)

        with self._optional_thread_lock:
            # Add the incoming request to our request queue.
            pool_request = PoolRequest(request)
            self._requests.append(pool_request)

        try:
            while True:
                with self._optional_thread_lock:
                    # Assign incoming requests to available connections,
                    # closing or creating new connections as required.
                    closing = self._assign_requests_to_connections()
                # The close I/O happens outside of the lock.
                self._close_connections(closing)

                # Wait until this request has an assigned connection.
                connection = pool_request.wait_for_connection(timeout=timeout)

                try:
                    # Send the request on the assigned connection.
                    response = connection.handle_request(
                        pool_request.request
                    )
                except ConnectionNotAvailable:
                    # In some cases a connection may initially be available to
                    # handle a request, but then become unavailable.
                    #
                    # In this case we clear the connection and try again.
                    pool_request.clear_connection()
                else:
                    break  # pragma: nocover

        except BaseException as exc:
            with self._optional_thread_lock:
                # For any exception or cancellation we remove the request from
                # the queue, and then re-assign requests to connections.
                self._requests.remove(pool_request)
                closing = self._assign_requests_to_connections()

            self._close_connections(closing)
            raise exc from None

        # Return the response. Note that in this case we still have to manage
        # the point at which the response is closed.
        assert isinstance(response.stream, Iterable)
        return Response(
            status=response.status,
            headers=response.headers,
            content=PoolByteStream(
                stream=response.stream, pool_request=pool_request, pool=self
            ),
            extensions=response.extensions,
        )

    def _assign_requests_to_connections(self) -> List[ConnectionInterface]:
        """
        Manage the state of the connection pool, assigning incoming
        requests to connections as available.

        Called whenever a new request is added or removed from the pool.

        Any closing connections are returned, allowing the I/O for closing
        those connections to be handled separately.
        """
        closing_connections: List[ConnectionInterface] = []

        # First we handle cleaning up any connections that are closed,
        # have expired their keep-alive, or surplus idle connections.
        # Iterate over a copy, since the pool list is mutated inside the loop.
        for connection in list(self._connections):
            if connection.is_closed():
                # log: "removing closed connection"
                self._connections.remove(connection)
            elif connection.has_expired():
                # log: "closing expired connection"
                self._connections.remove(connection)
                closing_connections.append(connection)
            elif (
                connection.is_idle()
                # Count only the connections that are actually idle. Taking the
                # length of a list of `is_idle()` results would count *every*
                # connection, and so close idle connections whenever the total
                # pool size exceeded the keep-alive limit.
                and sum(conn.is_idle() for conn in self._connections)
                > self._max_keepalive_connections
            ):
                # log: "closing idle connection"
                self._connections.remove(connection)
                closing_connections.append(connection)

        # Assign queued requests to connections.
        queued_requests = [request for request in self._requests if request.is_queued()]
        for pool_request in queued_requests:
            origin = pool_request.request.url.origin
            available_connections = [
                connection
                for connection in self._connections
                if connection.can_handle_request(origin) and connection.is_available()
            ]
            idle_connections = [
                connection for connection in self._connections if connection.is_idle()
            ]

            # There are three cases for how we may be able to handle the request:
            #
            # 1. There is an existing connection that can handle the request.
            # 2. We can create a new connection to handle the request.
            # 3. We can close an idle connection and then create a new connection
            #    to handle the request.
            #
            # If none of them applies the request simply stays queued.
            if available_connections:
                # log: "reusing existing connection"
                connection = available_connections[0]
                pool_request.assign_to_connection(connection)
            elif len(self._connections) < self._max_connections:
                # log: "creating new connection"
                connection = self.create_connection(origin)
                self._connections.append(connection)
                pool_request.assign_to_connection(connection)
            elif idle_connections:
                # log: "closing idle connection"
                connection = idle_connections[0]
                self._connections.remove(connection)
                closing_connections.append(connection)
                # log: "creating new connection"
                connection = self.create_connection(origin)
                self._connections.append(connection)
                pool_request.assign_to_connection(connection)

        return closing_connections

    def _close_connections(self, closing: List[ConnectionInterface]) -> None:
        """Close each connection in `closing` inside a `ShieldCancellation` context."""
        # Close connections which have been removed from the pool.
        with ShieldCancellation():
            for connection in closing:
                connection.close()

    def close(self) -> None:
        """
        Explicitly close the connection pool.

        Every connection currently held is removed from the pool and closed.
        The request queue is not modified here.
        """
        with self._optional_thread_lock:
            closing_connections = list(self._connections)
            self._connections = []
        # The close I/O happens outside of the lock.
        self._close_connections(closing_connections)

    def __enter__(self) -> "ConnectionPool":
        """Return the pool itself for use in a `with` block."""
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_value: Optional[BaseException] = None,
        traceback: Optional[TracebackType] = None,
    ) -> None:
        """Close the pool when leaving a `with` block."""
        self.close()

    def __repr__(self) -> str:
        """Return a summary of active/queued requests and active/idle connections."""
        class_name = self.__class__.__name__
        with self._optional_thread_lock:
            request_is_queued = [request.is_queued() for request in self._requests]
            connection_is_idle = [
                connection.is_idle() for connection in self._connections
            ]

            num_active_requests = request_is_queued.count(False)
            num_queued_requests = request_is_queued.count(True)
            num_active_connections = connection_is_idle.count(False)
            num_idle_connections = connection_is_idle.count(True)

            requests_info = (
                f"Requests: {num_active_requests} active, {num_queued_requests} queued"
            )
            connection_info = (
                f"Connections: {num_active_connections} active, {num_idle_connections} idle"
            )

        return f"<{class_name} [{requests_info} | {connection_info}]>"

347 

348 

349class PoolByteStream: 

350 def __init__( 

351 self, 

352 stream: Iterable[bytes], 

353 pool_request: PoolRequest, 

354 pool: ConnectionPool, 

355 ) -> None: 

356 self._stream = stream 

357 self._pool_request = pool_request 

358 self._pool = pool 

359 self._closed = False 

360 

361 def __iter__(self) -> Iterator[bytes]: 

362 try: 

363 for part in self._stream: 

364 yield part 

365 except BaseException as exc: 

366 self.close() 

367 raise exc from None 

368 

369 def close(self) -> None: 

370 if not self._closed: 

371 self._closed = True 

372 with ShieldCancellation(): 

373 if hasattr(self._stream, "close"): 

374 self._stream.close() 

375 

376 with self._pool._optional_thread_lock: 

377 self._pool._requests.remove(self._pool_request) 

378 closing = self._pool._assign_requests_to_connections() 

379 

380 self._pool._close_connections(closing)