Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpcore/_sync/connection_pool.py: 21%

151 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:12 +0000

1import ssl 

2import sys 

3from types import TracebackType 

4from typing import Iterable, Iterator, List, Optional, Type 

5 

6from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol 

7from .._models import Origin, Request, Response 

8from .._synchronization import Event, Lock 

9from ..backends.sync import SyncBackend 

10from ..backends.base import NetworkBackend 

11from .connection import HTTPConnection 

12from .interfaces import ConnectionInterface, RequestInterface 

13 

14 

class RequestStatus:
    """
    Book-keeping for one request that is queued on, or being served by, a pool.

    Holds the request itself, the connection currently assigned to it (or
    `None` while it is still waiting), and an event that is set once a
    connection has been assigned.
    """

    def __init__(self, request: Request):
        # A fresh, unset event: nothing has been assigned yet.
        self._connection_acquired = Event()
        self.connection: Optional[ConnectionInterface] = None
        self.request = request

    def set_connection(self, connection: ConnectionInterface) -> None:
        """
        Assign `connection` to this request and signal the acquired event.

        Must only be called while no connection is assigned.
        """
        assert self.connection is None
        self.connection = connection
        self._connection_acquired.set()

    def unset_connection(self) -> None:
        """
        Drop the assigned connection so the request counts as waiting again.

        Must only be called while a connection is assigned.
        """
        assert self.connection is not None
        self.connection = None
        # Replace the event with a new, unset one so that the next
        # `wait_for_connection()` call waits for a fresh assignment.
        self._connection_acquired = Event()

    def wait_for_connection(
        self, timeout: Optional[float] = None
    ) -> ConnectionInterface:
        """
        Wait on the acquired event, then return the assigned connection.

        `timeout` is passed straight through to the event's `wait()`.
        """
        acquired_event = self._connection_acquired
        acquired_event.wait(timeout=timeout)
        assigned = self.connection
        assert assigned is not None
        return assigned

37 

38 

class ConnectionPool(RequestInterface):
    """
    A connection pool for making HTTP requests.

    The pool keeps two lists, both guarded by `self._pool_lock`:

    * `self._pool` - the connections currently owned by the pool. New and
      most recently used connections are kept at the front of the list.
    * `self._requests` - a `RequestStatus` for every request that is either
      waiting for a connection or currently being served, in arrival order.
    """

    def __init__(
        self,
        ssl_context: Optional[ssl.SSLContext] = None,
        max_connections: Optional[int] = 10,
        max_keepalive_connections: Optional[int] = None,
        keepalive_expiry: Optional[float] = None,
        http1: bool = True,
        http2: bool = False,
        retries: int = 0,
        local_address: Optional[str] = None,
        uds: Optional[str] = None,
        network_backend: Optional[NetworkBackend] = None,
    ) -> None:
        """
        A connection pool for making HTTP requests.

        Parameters:
            ssl_context: An SSL context to use for verifying connections.
                If not specified, the default `httpcore.default_ssl_context()`
                will be used.
            max_connections: The maximum number of concurrent HTTP connections that
                the pool should allow. Any attempt to send a request on a pool that
                would exceed this amount will block until a connection is available.
                `None` means no limit.
            max_keepalive_connections: The maximum number of idle HTTP connections
                that will be maintained in the pool. `None` means no separate
                limit; the value is always capped at `max_connections`.
            keepalive_expiry: The duration in seconds that an idle HTTP connection
                may be maintained for before being expired from the pool.
            http1: A boolean indicating if HTTP/1.1 requests should be supported
                by the connection pool. Defaults to True.
            http2: A boolean indicating if HTTP/2 requests should be supported by
                the connection pool. Defaults to False.
            retries: The maximum number of retries when trying to establish a
                connection.
            local_address: Local address to connect from. Can also be used to connect
                using a particular address family. Using `local_address="0.0.0.0"`
                will connect using an `AF_INET` address (IPv4), while using
                `local_address="::"` will connect using an `AF_INET6` address (IPv6).
            uds: Path to a Unix Domain Socket to use instead of TCP sockets.
            network_backend: A backend instance to use for handling network I/O.
                If not specified, a `SyncBackend()` is created.
        """
        self._ssl_context = ssl_context

        # `None` is treated as "unlimited" by substituting `sys.maxsize`.
        self._max_connections = (
            sys.maxsize if max_connections is None else max_connections
        )
        self._max_keepalive_connections = (
            sys.maxsize
            if max_keepalive_connections is None
            else max_keepalive_connections
        )
        # The keep-alive limit can never exceed the overall connection limit.
        self._max_keepalive_connections = min(
            self._max_connections, self._max_keepalive_connections
        )

        self._keepalive_expiry = keepalive_expiry
        self._http1 = http1
        self._http2 = http2
        self._retries = retries
        self._local_address = local_address
        self._uds = uds

        self._pool: List[ConnectionInterface] = []
        self._requests: List[RequestStatus] = []
        self._pool_lock = Lock()
        self._network_backend = (
            SyncBackend() if network_backend is None else network_backend
        )

    def create_connection(self, origin: Origin) -> ConnectionInterface:
        """
        Build a new `HTTPConnection` for `origin` using the pool's settings.

        The connection is only constructed here; it is not added to the pool.
        """
        return HTTPConnection(
            origin=origin,
            ssl_context=self._ssl_context,
            keepalive_expiry=self._keepalive_expiry,
            http1=self._http1,
            http2=self._http2,
            retries=self._retries,
            local_address=self._local_address,
            uds=self._uds,
            network_backend=self._network_backend,
        )

    @property
    def connections(self) -> List[ConnectionInterface]:
        """
        Return a list of the connections currently in the pool.

        The returned list is a copy, so mutating it does not affect the pool.

        For example:

        ```python
        >>> pool.connections
        [
            <HTTPConnection ['https://example.com:443', HTTP/1.1, ACTIVE, Request Count: 6]>,
            <HTTPConnection ['https://example.com:443', HTTP/1.1, IDLE, Request Count: 9]>,
            <HTTPConnection ['http://example.com:80', HTTP/1.1, IDLE, Request Count: 1]>,
        ]
        ```
        """
        return list(self._pool)

    def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool:
        """
        Attempt to provide a connection that can handle the given origin.

        Returns True if a connection was assigned to `status`, False if the
        request has to keep waiting. Callers in this class invoke it while
        holding `self._pool_lock`.
        """
        origin = status.request.url.origin

        # If there are queued requests in front of us, then don't acquire a
        # connection. We handle requests strictly in order. Only the first
        # waiting request matters, so stop scanning as soon as it is found.
        first_waiting = next(
            (s for s in self._requests if s.connection is None), None
        )
        if first_waiting is not None and first_waiting is not status:
            return False

        # Reuse an existing connection if one is currently available.
        for idx, connection in enumerate(self._pool):
            if connection.can_handle_request(origin) and connection.is_available():
                # Move the reused connection to the front of the pool.
                self._pool.pop(idx)
                self._pool.insert(0, connection)
                status.set_connection(connection)
                return True

        # If the pool is currently full, attempt to close one idle connection.
        # Scanning from the back picks the connection furthest from the front.
        if len(self._pool) >= self._max_connections:
            for idx, connection in reversed(list(enumerate(self._pool))):
                if connection.is_idle():
                    connection.close()
                    self._pool.pop(idx)
                    break

        # If the pool is still full, then we cannot acquire a connection.
        if len(self._pool) >= self._max_connections:
            return False

        # Otherwise create a new connection.
        connection = self.create_connection(origin)
        self._pool.insert(0, connection)
        status.set_connection(connection)
        return True

    def _close_expired_connections(self) -> None:
        """
        Clean up the connection pool by closing off any connections that have expired.

        Also closes idle connections while the pool holds more connections
        than `max_keepalive_connections` allows.
        """
        # Close any connections that have expired their keep-alive time.
        # Iterating a reversed snapshot keeps the remaining indexes valid
        # while entries are popped.
        for idx, connection in reversed(list(enumerate(self._pool))):
            if connection.has_expired():
                connection.close()
                self._pool.pop(idx)

        # If the pool size exceeds the maximum number of allowed keep-alive connections,
        # then close off idle connections as required.
        pool_size = len(self._pool)
        for idx, connection in reversed(list(enumerate(self._pool))):
            if connection.is_idle() and pool_size > self._max_keepalive_connections:
                connection.close()
                self._pool.pop(idx)
                pool_size -= 1

    def handle_request(self, request: Request) -> Response:
        """
        Send an HTTP request, and return an HTTP response.

        This is the core implementation that is called into by `.request()` or `.stream()`.

        Raises:
            UnsupportedProtocol: If the URL scheme is empty or is not one of
                "http", "https", "ws" or "wss".

        Any exception raised while waiting for a connection, or by the
        connection while handling the request, is propagated after the
        request has been removed from the pool's queue.
        """
        scheme = request.url.scheme.decode()
        if scheme == "":
            raise UnsupportedProtocol(
                "Request URL is missing an 'http://' or 'https://' protocol."
            )
        if scheme not in ("http", "https", "ws", "wss"):
            raise UnsupportedProtocol(
                f"Request URL has an unsupported protocol '{scheme}://'."
            )

        status = RequestStatus(request)

        with self._pool_lock:
            self._requests.append(status)
            self._close_expired_connections()
            self._attempt_to_acquire_connection(status)

        while True:
            timeouts = request.extensions.get("timeout", {})
            timeout = timeouts.get("pool", None)
            try:
                connection = status.wait_for_connection(timeout=timeout)
            except BaseException:
                # If we timeout here, or if the task is cancelled, then make
                # sure to remove the request from the queue before bubbling
                # up the exception.
                with self._pool_lock:
                    # `close()` resets `self._requests`, so the status may
                    # already be gone. An unconditional `remove()` would
                    # raise ValueError and mask the original exception.
                    if status in self._requests:
                        self._requests.remove(status)
                raise

            try:
                response = connection.handle_request(request)
            except ConnectionNotAvailable:
                # The ConnectionNotAvailable exception is a special case, that
                # indicates we need to retry the request on a new connection.
                #
                # The most common case where this can occur is when multiple
                # requests are queued waiting for a single connection, which
                # might end up as an HTTP/2 connection, but which actually ends
                # up as HTTP/1.1.
                with self._pool_lock:
                    # Maintain our position in the request queue, but reset the
                    # status so that the request becomes queued again.
                    status.unset_connection()
                    self._attempt_to_acquire_connection(status)
            except BaseException:
                # Release our slot in the pool before propagating the error.
                self.response_closed(status)
                raise
            else:
                break

        # When we return the response, we wrap the stream in a special class
        # that handles notifying the connection pool once the response
        # has been released.
        assert isinstance(response.stream, Iterable)
        return Response(
            status=response.status,
            headers=response.headers,
            content=ConnectionPoolByteStream(response.stream, self, status),
            extensions=response.extensions,
        )

    def response_closed(self, status: RequestStatus) -> None:
        """
        This method acts as a callback once the request/response cycle is complete.

        It is called into from the `ConnectionPoolByteStream.close()` method,
        and from `handle_request()` when the connection raises.
        """
        assert status.connection is not None
        connection = status.connection

        with self._pool_lock:
            # Update the state of the connection pool.
            if status in self._requests:
                self._requests.remove(status)

            if connection.is_closed() and connection in self._pool:
                self._pool.remove(connection)

            # Since we've had a response closed, it's possible we'll now be able
            # to service one or more requests that are currently pending.
            for queued in self._requests:
                if queued.connection is None:
                    acquired = self._attempt_to_acquire_connection(queued)
                    # If we could not acquire a connection for a queued request
                    # then we don't need to check anymore requests that are
                    # queued later behind it.
                    if not acquired:
                        break

            # Housekeeping.
            self._close_expired_connections()

    def close(self) -> None:
        """
        Close any connections in the pool.

        Also empties the pool and the request queue.
        """
        with self._pool_lock:
            for connection in self._pool:
                connection.close()
            self._pool = []
            self._requests = []

    def __enter__(self) -> "ConnectionPool":
        """Return the pool itself so it can be used in a `with` block."""
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_value: Optional[BaseException] = None,
        traceback: Optional[TracebackType] = None,
    ) -> None:
        """Close the pool on leaving the `with` block."""
        self.close()

319 

320 

class ConnectionPoolByteStream:
    """
    A wrapper around the response byte stream, that additionally handles
    notifying the connection pool when the response has been closed.
    """

    def __init__(
        self,
        stream: Iterable[bytes],
        pool: ConnectionPool,
        status: RequestStatus,
    ) -> None:
        self._status = status
        self._pool = pool
        self._stream = stream

    def __iter__(self) -> Iterator[bytes]:
        """Yield each chunk of the wrapped stream unchanged."""
        for chunk in self._stream:
            yield chunk

    def close(self) -> None:
        """
        Close the wrapped stream if it has a `close` attribute, then notify
        the pool via `response_closed()`.

        The pool is notified even if closing the wrapped stream raises.
        """
        wrapped = self._stream
        try:
            if hasattr(wrapped, "close"):
                wrapped.close()
        finally:
            self._pool.response_closed(self._status)