Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/impl.py: 33% of 215 statements (coverage.py v7.0.1, created at 2022-12-25 06:11 +0000)

# sqlalchemy/pool.py
# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php


"""Pool implementation classes.

"""

import traceback
import weakref

from .base import _AsyncConnDialect
from .base import _ConnectionFairy
from .base import _ConnectionRecord
from .base import Pool
from .. import exc
from .. import util
from ..util import chop_traceback
from ..util import queue as sqla_queue
from ..util import threading


class QueuePool(Pool):

    """A :class:`_pool.Pool`
    that imposes a limit on the number of open connections.

    :class:`.QueuePool` is the default pooling implementation used for
    all :class:`_engine.Engine` objects, unless the SQLite dialect is in use.

    """

    _is_asyncio = False
    _queue_class = sqla_queue.Queue

    def __init__(
        self,
        creator,
        pool_size=5,
        max_overflow=10,
        timeout=30.0,
        use_lifo=False,
        **kw
    ):
        r"""
        Construct a QueuePool.

        :param creator: a callable function that returns a DB-API
          connection object, same as that of :paramref:`_pool.Pool.creator`.

        :param pool_size: The size of the pool to be maintained,
          defaults to 5. This is the largest number of connections that
          will be kept persistently in the pool. Note that the pool
          begins with no connections; once this number of connections
          is requested, that number of connections will remain.
          ``pool_size`` can be set to 0 to indicate no size limit; to
          disable pooling, use a :class:`~sqlalchemy.pool.NullPool`
          instead.

        :param max_overflow: The maximum overflow size of the
          pool. When the number of checked-out connections reaches the
          size set in ``pool_size``, additional connections will be
          returned up to this limit. When those additional connections
          are returned to the pool, they are disconnected and
          discarded. It follows then that the total number of
          simultaneous connections the pool will allow is
          ``pool_size`` + ``max_overflow``, and the total number of
          "sleeping" connections the pool will allow is ``pool_size``.
          ``max_overflow`` can be set to -1 to indicate no overflow limit;
          no limit will be placed on the total number of concurrent
          connections. Defaults to 10.

        :param timeout: The number of seconds to wait before giving up
          on returning a connection. Defaults to 30.0. This can be a float
          but is subject to the limitations of Python time functions which
          may not be reliable in the tens of milliseconds.

        :param use_lifo: use LIFO (last-in-first-out) when retrieving
          connections instead of FIFO (first-in-first-out). Using LIFO, a
          server-side timeout scheme can reduce the number of connections used
          during non-peak periods of use. When planning for server-side
          timeouts, ensure that a recycle or pre-ping strategy is in use to
          gracefully handle stale connections.

          .. versionadded:: 1.3

          .. seealso::

            :ref:`pool_use_lifo`

            :ref:`pool_disconnects`

        :param \**kw: Other keyword arguments including
          :paramref:`_pool.Pool.recycle`, :paramref:`_pool.Pool.echo`,
          :paramref:`_pool.Pool.reset_on_return` and others are passed to the
          :class:`_pool.Pool` constructor.

        """
        Pool.__init__(self, creator, **kw)
        self._pool = self._queue_class(pool_size, use_lifo=use_lifo)
        self._overflow = 0 - pool_size
        self._max_overflow = max_overflow
        self._timeout = timeout
        self._overflow_lock = threading.Lock()

    def _do_return_conn(self, conn):
        try:
            self._pool.put(conn, False)
        except sqla_queue.Full:
            try:
                conn.close()
            finally:
                self._dec_overflow()

    def _do_get(self):
        use_overflow = self._max_overflow > -1

        try:
            wait = use_overflow and self._overflow >= self._max_overflow
            return self._pool.get(wait, self._timeout)
        except sqla_queue.Empty:
            # don't do things inside of "except Empty", because when we say
            # we timed out or can't connect and raise, Python 3 tells
            # people the real error is queue.Empty which it isn't.
            pass
        if use_overflow and self._overflow >= self._max_overflow:
            if not wait:
                return self._do_get()
            else:
                raise exc.TimeoutError(
                    "QueuePool limit of size %d overflow %d reached, "
                    "connection timed out, timeout %0.2f"
                    % (self.size(), self.overflow(), self._timeout),
                    code="3o7r",
                )

        if self._inc_overflow():
            try:
                return self._create_connection()
            except:
                with util.safe_reraise():
                    self._dec_overflow()
        else:
            return self._do_get()

    def _inc_overflow(self):
        if self._max_overflow == -1:
            self._overflow += 1
            return True
        with self._overflow_lock:
            if self._overflow < self._max_overflow:
                self._overflow += 1
                return True
            else:
                return False

    def _dec_overflow(self):
        if self._max_overflow == -1:
            self._overflow -= 1
            return True
        with self._overflow_lock:
            self._overflow -= 1
            return True

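    # Note on the overflow counter: ``_overflow`` begins at ``0 - pool_size``
    # and is incremented once for every connection the pool creates, so
    # ``checkedout()`` (below) works out to the number of connections
    # currently handed out. E.g. with pool_size=5 and two fresh checkouts:
    # maxsize - qsize + overflow = 5 - 0 + (2 - 5) = 2.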

    def recreate(self):
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            pool_size=self._pool.maxsize,
            max_overflow=self._max_overflow,
            pre_ping=self._pre_ping,
            use_lifo=self._pool.use_lifo,
            timeout=self._timeout,
            recycle=self._recycle,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            reset_on_return=self._reset_on_return,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def dispose(self):
        while True:
            try:
                conn = self._pool.get(False)
                conn.close()
            except sqla_queue.Empty:
                break

        self._overflow = 0 - self.size()
        self.logger.info("Pool disposed. %s", self.status())

    def status(self):
        return (
            "Pool size: %d Connections in pool: %d "
            "Current Overflow: %d Current Checked out "
            "connections: %d"
            % (
                self.size(),
                self.checkedin(),
                self.overflow(),
                self.checkedout(),
            )
        )

    def size(self):
        return self._pool.maxsize

    def timeout(self):
        return self._timeout

    def checkedin(self):
        return self._pool.qsize()

    def overflow(self):
        return self._overflow

    def checkedout(self):
        return self._pool.maxsize - self._pool.qsize() + self._overflow

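# Usage sketch for QueuePool: a minimal, self-contained example using a
# sqlite3 creator (the creator callable and pool sizes here are
# illustrative, not fixed by this module):
#
#     import sqlite3
#     from sqlalchemy.pool import QueuePool
#
#     pool = QueuePool(
#         lambda: sqlite3.connect(":memory:"),
#         pool_size=2,
#         max_overflow=1,
#         timeout=5.0,
#     )
#     c1 = pool.connect()   # creates and checks out a connection
#     c2 = pool.connect()   # second checkout, still within pool_size
#     print(pool.status())  # reports size, checked-in, overflow, checked-out
#     c1.close()            # returned to the pool, not actually closed
#     c2.close()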


class AsyncAdaptedQueuePool(QueuePool):
    _is_asyncio = True
    _queue_class = sqla_queue.AsyncAdaptedQueue
    _dialect = _AsyncConnDialect()


class FallbackAsyncAdaptedQueuePool(AsyncAdaptedQueuePool):
    _queue_class = sqla_queue.FallbackAsyncAdaptedQueue

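# AsyncAdaptedQueuePool wraps the same QueuePool logic around an
# asyncio-compatible queue; in SQLAlchemy 1.4 it is the pool the asyncio
# extension selects by default. A minimal sketch (the asyncpg URL is
# illustrative):
#
#     from sqlalchemy.ext.asyncio import create_async_engine
#
#     engine = create_async_engine("postgresql+asyncpg://scott:tiger@host/db")
#     # engine.pool is an AsyncAdaptedQueuePool unless poolclass= overrides it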


class NullPool(Pool):

    """A Pool which does not pool connections.

    Instead it literally opens and closes the underlying DB-API connection
    for each connection open and close.

    Reconnect-related functions such as ``recycle`` and connection
    invalidation are not supported by this Pool implementation, since
    no connections are held persistently.

    """

    def status(self):
        return "NullPool"

    def _do_return_conn(self, conn):
        conn.close()

    def _do_get(self):
        return self._create_connection()

    def recreate(self):
        self.logger.info("Pool recreating")

        return self.__class__(
            self._creator,
            recycle=self._recycle,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            reset_on_return=self._reset_on_return,
            pre_ping=self._pre_ping,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def dispose(self):
        pass

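# Usage sketch for NullPool: each checkout opens a new DB-API connection and
# each checkin closes it outright (the sqlite3 creator is illustrative):
#
#     import sqlite3
#     from sqlalchemy.pool import NullPool
#
#     pool = NullPool(lambda: sqlite3.connect(":memory:"))
#     conn = pool.connect()  # opens a brand-new DB-API connection
#     conn.close()           # closes it for real; nothing is retained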


class SingletonThreadPool(Pool):

    """A Pool that maintains one connection per thread.

    Maintains one connection per thread, never moving a connection to a
    thread other than the one which it was created in.

    .. warning:: the :class:`.SingletonThreadPool` will call ``.close()``
       on arbitrary connections that exist beyond the size setting of
       ``pool_size``, e.g. if more unique **thread identities** than
       ``pool_size`` are in use. This cleanup is non-deterministic and
       not sensitive to whether or not the connections linked to those
       thread identities are currently in use.

       :class:`.SingletonThreadPool` may be improved in a future release,
       however in its current state it is generally used only for test
       scenarios using a SQLite ``:memory:`` database and is not recommended
       for production use.

    Options are the same as those of :class:`_pool.Pool`, as well as:

    :param pool_size: The number of threads in which to maintain connections
        at once. Defaults to five.

    :class:`.SingletonThreadPool` is used by the SQLite dialect
    automatically when a memory-based database is used.
    See :ref:`sqlite_toplevel`.

    """

    _is_asyncio = False

    def __init__(self, creator, pool_size=5, **kw):
        Pool.__init__(self, creator, **kw)
        self._conn = threading.local()
        self._fairy = threading.local()
        self._all_conns = set()
        self.size = pool_size

    def recreate(self):
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            pool_size=self.size,
            recycle=self._recycle,
            echo=self.echo,
            pre_ping=self._pre_ping,
            logging_name=self._orig_logging_name,
            reset_on_return=self._reset_on_return,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def dispose(self):
        """Dispose of this pool."""

        for conn in self._all_conns:
            try:
                conn.close()
            except Exception:
                # pysqlite won't even let you close a conn from a thread
                # that didn't create it
                pass

        self._all_conns.clear()

    def _cleanup(self):
        while len(self._all_conns) >= self.size:
            c = self._all_conns.pop()
            c.close()

    def status(self):
        return "SingletonThreadPool id:%d size: %d" % (
            id(self),
            len(self._all_conns),
        )

    def _do_return_conn(self, conn):
        pass

    def _do_get(self):
        try:
            c = self._conn.current()
            if c:
                return c
        except AttributeError:
            pass
        c = self._create_connection()
        self._conn.current = weakref.ref(c)
        if len(self._all_conns) >= self.size:
            self._cleanup()
        self._all_conns.add(c)
        return c

    def connect(self):
        # vendored from Pool to include the now-removed use_threadlocal
        # behavior
        try:
            rec = self._fairy.current()
        except AttributeError:
            pass
        else:
            if rec is not None:
                return rec._checkout_existing()

        return _ConnectionFairy._checkout(self, self._fairy)

    def _return_conn(self, record):
        try:
            del self._fairy.current
        except AttributeError:
            pass
        self._do_return_conn(record)

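# Usage sketch for SingletonThreadPool: the SQLite dialect applies it
# automatically for ``:memory:`` databases, so passing it explicitly as
# shown here is purely illustrative:
#
#     from sqlalchemy import create_engine
#     from sqlalchemy.pool import SingletonThreadPool
#
#     engine = create_engine(
#         "sqlite://", poolclass=SingletonThreadPool, pool_size=5
#     )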


class StaticPool(Pool):

    """A Pool of exactly one connection, used for all requests.

    Reconnect-related functions such as ``recycle`` and connection
    invalidation (which is also used to support auto-reconnect) are only
    partially supported right now and may not yield good results.

    """

    @util.memoized_property
    def connection(self):
        return _ConnectionRecord(self)

    def status(self):
        return "StaticPool"

    def dispose(self):
        if (
            "connection" in self.__dict__
            and self.connection.dbapi_connection is not None
        ):
            self.connection.close()
            del self.__dict__["connection"]

    def recreate(self):
        self.logger.info("Pool recreating")
        return self.__class__(
            creator=self._creator,
            recycle=self._recycle,
            reset_on_return=self._reset_on_return,
            pre_ping=self._pre_ping,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def _transfer_from(self, other_static_pool):
        # used by the test suite to make a new engine / pool without
        # losing the state of an existing SQLite :memory: connection
        self._invoke_creator = (
            lambda crec: other_static_pool.connection.dbapi_connection
        )

    def _create_connection(self):
        raise NotImplementedError()

    def _do_return_conn(self, conn):
        pass

    def _do_get(self):
        rec = self.connection
        if rec._is_hard_or_soft_invalidated():
            del self.__dict__["connection"]
            rec = self.connection

        return rec

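# Usage sketch for StaticPool: one connection shared by all checkouts. A
# common recipe is sharing an in-memory SQLite database across threads
# (the connect_args value is SQLite-specific):
#
#     from sqlalchemy import create_engine
#     from sqlalchemy.pool import StaticPool
#
#     engine = create_engine(
#         "sqlite://",
#         poolclass=StaticPool,
#         connect_args={"check_same_thread": False},
#     )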


class AssertionPool(Pool):

    """A :class:`_pool.Pool` that allows at most one checked out connection
    at any given time.

    This will raise an exception if more than one connection is checked out
    at a time. Useful for debugging code that is using more connections
    than desired.

    """

    def __init__(self, *args, **kw):
        self._conn = None
        self._checked_out = False
        self._store_traceback = kw.pop("store_traceback", True)
        self._checkout_traceback = None
        Pool.__init__(self, *args, **kw)

    def status(self):
        return "AssertionPool"

    def _do_return_conn(self, conn):
        if not self._checked_out:
            raise AssertionError("connection is not checked out")
        self._checked_out = False
        assert conn is self._conn

    def dispose(self):
        self._checked_out = False
        if self._conn:
            self._conn.close()

    def recreate(self):
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            echo=self.echo,
            pre_ping=self._pre_ping,
            recycle=self._recycle,
            reset_on_return=self._reset_on_return,
            logging_name=self._orig_logging_name,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def _do_get(self):
        if self._checked_out:
            if self._checkout_traceback:
                suffix = " at:\n%s" % "".join(
                    chop_traceback(self._checkout_traceback)
                )
            else:
                suffix = ""
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
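
# Usage sketch for AssertionPool: a second concurrent checkout raises an
# AssertionError, with the first checkout's traceback appended when
# store_traceback is left at its default of True (the sqlite3 creator is
# illustrative):
#
#     import sqlite3
#     from sqlalchemy.pool import AssertionPool
#
#     pool = AssertionPool(lambda: sqlite3.connect(":memory:"))
#     c1 = pool.connect()
#     c2 = pool.connect()  # AssertionError: connection is already checked out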