Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpcore/_sync/connection_pool.py: 21%

154 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 07:19 +0000

1import ssl 

2import sys 

3from types import TracebackType 

4from typing import Iterable, Iterator, Iterable, List, Optional, Type 

5 

6from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol 

7from .._models import Origin, Request, Response 

8from .._synchronization import Event, Lock 

9from ..backends.sync import SyncBackend 

10from ..backends.base import SOCKET_OPTION, NetworkBackend 

11from .connection import HTTPConnection 

12from .interfaces import ConnectionInterface, RequestInterface 

13 

14 

class RequestStatus:
    """
    Book-keeping for one request that is queued on, or being served by, a pool.

    Holds the request, the connection currently assigned to it (if any), and
    an event that is set when a connection is assigned.
    """

    def __init__(self, request: Request):
        self.request = request
        # `None` while the request is still queued without a connection.
        self.connection: Optional[ConnectionInterface] = None
        # Set by `set_connection()`; waited on by `wait_for_connection()`.
        self._connection_acquired = Event()

    def set_connection(self, connection: ConnectionInterface) -> None:
        """
        Assign `connection` to this request and set the acquisition event.

        Must only be called while no connection is assigned.
        """
        assert self.connection is None
        self.connection = connection
        self._connection_acquired.set()

    def unset_connection(self) -> None:
        """
        Drop the assigned connection so the request counts as queued again.

        Must only be called while a connection is assigned.
        """
        assert self.connection is not None
        self.connection = None
        # The previous event has already been set, so start over with a
        # fresh one for the next acquisition.
        self._connection_acquired = Event()

    def wait_for_connection(
        self, timeout: Optional[float] = None
    ) -> ConnectionInterface:
        """
        Return the assigned connection, waiting for one if necessary.

        Parameters:
            timeout: Passed through to the event's `wait()` call. The event
                is presumably expected to raise if the timeout elapses;
                confirm against the `Event` implementation.
        """
        # Skip the wait entirely when a connection was already handed over.
        if self.connection is None:
            self._connection_acquired.wait(timeout=timeout)
        assert self.connection is not None
        return self.connection

38 

39 

class ConnectionPool(RequestInterface):
    """
    A connection pool for making HTTP requests.
    """

    def __init__(
        self,
        ssl_context: Optional[ssl.SSLContext] = None,
        max_connections: Optional[int] = 10,
        max_keepalive_connections: Optional[int] = None,
        keepalive_expiry: Optional[float] = None,
        http1: bool = True,
        http2: bool = False,
        retries: int = 0,
        local_address: Optional[str] = None,
        uds: Optional[str] = None,
        network_backend: Optional[NetworkBackend] = None,
        socket_options: Optional[Iterable[SOCKET_OPTION]] = None,
    ) -> None:
        """
        A connection pool for making HTTP requests.

        Parameters:
            ssl_context: An SSL context to use for verifying connections.
                If not specified, the default `httpcore.default_ssl_context()`
                will be used.
            max_connections: The maximum number of concurrent HTTP connections that
                the pool should allow. Any attempt to send a request on a pool that
                would exceed this amount will block until a connection is available.
            max_keepalive_connections: The maximum number of idle HTTP connections
                that will be maintained in the pool.
            keepalive_expiry: The duration in seconds that an idle HTTP connection
                may be maintained for before being expired from the pool.
            http1: A boolean indicating if HTTP/1.1 requests should be supported
                by the connection pool. Defaults to True.
            http2: A boolean indicating if HTTP/2 requests should be supported by
                the connection pool. Defaults to False.
            retries: The maximum number of retries when trying to establish a
                connection.
            local_address: Local address to connect from. Can also be used to connect
                using a particular address family. Using `local_address="0.0.0.0"`
                will connect using an `AF_INET` address (IPv4), while using
                `local_address="::"` will connect using an `AF_INET6` address (IPv6).
            uds: Path to a Unix Domain Socket to use instead of TCP sockets.
            network_backend: A backend instance to use for handling network I/O.
            socket_options: Socket options that have to be included
                in the TCP socket when the connection was established.
        """
        self._ssl_context = ssl_context

        # `None` means "no limit", represented internally as `sys.maxsize`.
        self._max_connections = (
            sys.maxsize if max_connections is None else max_connections
        )
        self._max_keepalive_connections = (
            sys.maxsize
            if max_keepalive_connections is None
            else max_keepalive_connections
        )
        # The keep-alive limit can never exceed the overall connection limit.
        self._max_keepalive_connections = min(
            self._max_connections, self._max_keepalive_connections
        )

        self._keepalive_expiry = keepalive_expiry
        self._http1 = http1
        self._http2 = http2
        self._retries = retries
        self._local_address = local_address
        self._uds = uds

        # Connections are kept most-recently-used first (see the
        # `insert(0, ...)` calls in `_attempt_to_acquire_connection`).
        self._pool: List[ConnectionInterface] = []
        # Requests in arrival order, both queued and in flight.
        self._requests: List[RequestStatus] = []
        # Guards `_pool` and `_requests`.
        self._pool_lock = Lock()
        self._network_backend = (
            SyncBackend() if network_backend is None else network_backend
        )
        self._socket_options = socket_options

    def create_connection(self, origin: Origin) -> ConnectionInterface:
        """
        Build a new `HTTPConnection` to `origin` using the pool's settings.
        """
        return HTTPConnection(
            origin=origin,
            ssl_context=self._ssl_context,
            keepalive_expiry=self._keepalive_expiry,
            http1=self._http1,
            http2=self._http2,
            retries=self._retries,
            local_address=self._local_address,
            uds=self._uds,
            network_backend=self._network_backend,
            socket_options=self._socket_options,
        )

    @property
    def connections(self) -> List[ConnectionInterface]:
        """
        Return a list of the connections currently in the pool.

        For example:

        ```python
        >>> pool.connections
        [
            <HTTPConnection ['https://example.com:443', HTTP/1.1, ACTIVE, Request Count: 6]>,
            <HTTPConnection ['https://example.com:443', HTTP/1.1, IDLE, Request Count: 9]>,
            <HTTPConnection ['http://example.com:80', HTTP/1.1, IDLE, Request Count: 1]>,
        ]
        ```
        """
        # Return a copy so callers cannot mutate the pool's internal list.
        return list(self._pool)

    def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool:
        """
        Attempt to provide a connection that can handle the given origin.

        Returns True if a connection was assigned to `status`, False otherwise.
        Callers in this class invoke it while holding `self._pool_lock`.
        """
        origin = status.request.url.origin

        # If there are queued requests in front of us, then don't acquire a
        # connection. We handle requests strictly in order.
        first_waiting = next(
            (queued for queued in self._requests if queued.connection is None),
            None,
        )
        if first_waiting is not None and first_waiting is not status:
            return False

        # Reuse an existing connection if one is currently available.
        for idx, connection in enumerate(self._pool):
            if connection.can_handle_request(origin) and connection.is_available():
                # Move the reused connection to the front of the pool.
                self._pool.pop(idx)
                self._pool.insert(0, connection)
                status.set_connection(connection)
                return True

        # If the pool is currently full, attempt to close one idle connection.
        # Searching from the back picks the connection furthest from the
        # front of the pool first.
        if len(self._pool) >= self._max_connections:
            for idx, connection in reversed(list(enumerate(self._pool))):
                if connection.is_idle():
                    connection.close()
                    self._pool.pop(idx)
                    break

        # If the pool is still full, then we cannot acquire a connection.
        if len(self._pool) >= self._max_connections:
            return False

        # Otherwise create a new connection.
        connection = self.create_connection(origin)
        self._pool.insert(0, connection)
        status.set_connection(connection)
        return True

    def _close_expired_connections(self) -> None:
        """
        Clean up the connection pool by closing off any connections that have expired.
        """
        # Close any connections that have expired their keep-alive time.
        # Iterating in reverse keeps the remaining indexes valid while popping.
        for idx, connection in reversed(list(enumerate(self._pool))):
            if connection.has_expired():
                connection.close()
                self._pool.pop(idx)

        # If the pool size exceeds the maximum number of allowed keep-alive connections,
        # then close off idle connections as required.
        pool_size = len(self._pool)
        for idx, connection in reversed(list(enumerate(self._pool))):
            if connection.is_idle() and pool_size > self._max_keepalive_connections:
                connection.close()
                self._pool.pop(idx)
                pool_size -= 1

    def handle_request(self, request: Request) -> Response:
        """
        Send an HTTP request, and return an HTTP response.

        This is the core implementation that is called into by `.request()` or `.stream()`.

        Raises:
            UnsupportedProtocol: If the URL scheme is empty or is not one of
                "http", "https", "ws" or "wss".
        """
        scheme = request.url.scheme.decode()
        if scheme == "":
            raise UnsupportedProtocol(
                "Request URL is missing an 'http://' or 'https://' protocol."
            )
        if scheme not in ("http", "https", "ws", "wss"):
            raise UnsupportedProtocol(
                f"Request URL has an unsupported protocol '{scheme}://'."
            )

        # The pool timeout does not change between retries, so read it once,
        # before the request is queued. A malformed `extensions` value then
        # fails here rather than leaving an orphaned entry in the queue.
        timeouts = request.extensions.get("timeout", {})
        timeout = timeouts.get("pool", None)

        status = RequestStatus(request)

        with self._pool_lock:
            self._requests.append(status)
            self._close_expired_connections()
            self._attempt_to_acquire_connection(status)

        while True:
            try:
                connection = status.wait_for_connection(timeout=timeout)
            except BaseException:
                # If we timeout here, or if the task is cancelled, then make
                # sure to remove the request from the queue before bubbling
                # up the exception.
                with self._pool_lock:
                    # Ensure only remove when task exists.
                    if status in self._requests:
                        self._requests.remove(status)
                raise

            try:
                response = connection.handle_request(request)
            except ConnectionNotAvailable:
                # The ConnectionNotAvailable exception is a special case, that
                # indicates we need to retry the request on a new connection.
                #
                # The most common case where this can occur is when multiple
                # requests are queued waiting for a single connection, which
                # might end up as an HTTP/2 connection, but which actually ends
                # up as HTTP/1.1.
                with self._pool_lock:
                    # Maintain our position in the request queue, but reset the
                    # status so that the request becomes queued again.
                    status.unset_connection()
                    self._attempt_to_acquire_connection(status)
            except BaseException:
                # Release the request's slot in the pool before propagating.
                self.response_closed(status)
                raise
            else:
                break

        # When we return the response, we wrap the stream in a special class
        # that handles notifying the connection pool once the response
        # has been released.
        assert isinstance(response.stream, Iterable)
        return Response(
            status=response.status,
            headers=response.headers,
            content=ConnectionPoolByteStream(response.stream, self, status),
            extensions=response.extensions,
        )

    def response_closed(self, status: RequestStatus) -> None:
        """
        This method acts as a callback once the request/response cycle is complete.

        It is called into from the `ConnectionPoolByteStream.close()` method.
        """
        assert status.connection is not None
        connection = status.connection

        with self._pool_lock:
            # Update the state of the connection pool.
            if status in self._requests:
                self._requests.remove(status)

            if connection.is_closed() and connection in self._pool:
                self._pool.remove(connection)

            # Since we've had a response closed, it's possible we'll now be able
            # to service one or more requests that are currently pending.
            for pending in self._requests:
                if pending.connection is None:
                    acquired = self._attempt_to_acquire_connection(pending)
                    # If we could not acquire a connection for a queued request
                    # then we don't need to check anymore requests that are
                    # queued later behind it.
                    if not acquired:
                        break

            # Housekeeping.
            self._close_expired_connections()

    def close(self) -> None:
        """
        Close any connections in the pool.
        """
        with self._pool_lock:
            for connection in self._pool:
                connection.close()
            # Forget every connection and every tracked request.
            self._pool = []
            self._requests = []

    def __enter__(self) -> "ConnectionPool":
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_value: Optional[BaseException] = None,
        traceback: Optional[TracebackType] = None,
    ) -> None:
        # Leaving the `with` block closes the pool.
        self.close()

327 

328 

class ConnectionPoolByteStream:
    """
    A wrapper around the response byte stream, that additionally handles
    notifying the connection pool when the response has been closed.
    """

    def __init__(
        self,
        stream: Iterable[bytes],
        pool: ConnectionPool,
        status: RequestStatus,
    ) -> None:
        """
        Parameters:
            stream: The underlying response byte stream to wrap.
            pool: The pool to notify once this stream is closed.
            status: The request status handed back to the pool on close.
        """
        self._stream = stream
        self._pool = pool
        self._status = status

    def __iter__(self) -> Iterator[bytes]:
        """Yield each chunk of the wrapped stream unchanged."""
        for part in self._stream:
            yield part

    def close(self) -> None:
        """
        Close the wrapped stream, if it supports closing, then notify the pool.
        """
        try:
            # Plain iterables have no `close()`; only call it when present.
            if hasattr(self._stream, "close"):
                self._stream.close()
        finally:
            # Always notify the pool, even if closing the stream raised.
            self._pool.response_closed(self._status)

352 self._stream.close() 

353 finally: 

354 self._pool.response_closed(self._status)