Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/pool/impl.py: 44%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

243 statements  

1# pool/impl.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8 

9"""Pool implementation classes.""" 

10 

11from __future__ import annotations 

12 

13import threading 

14import traceback 

15import typing 

16from typing import Any 

17from typing import cast 

18from typing import List 

19from typing import Literal 

20from typing import Optional 

21from typing import Set 

22from typing import Type 

23from typing import TYPE_CHECKING 

24from typing import Union 

25import weakref 

26 

27from .base import _AsyncConnDialect 

28from .base import _ConnectionFairy 

29from .base import _ConnectionRecord 

30from .base import _CreatorFnType 

31from .base import _CreatorWRecFnType 

32from .base import ConnectionPoolEntry 

33from .base import Pool 

34from .base import PoolProxiedConnection 

35from .. import exc 

36from .. import util 

37from ..util import chop_traceback 

38from ..util import queue as sqla_queue 

39 

40if typing.TYPE_CHECKING: 

41 from ..engine.interfaces import DBAPIConnection 

42 

43 

class QueuePool(Pool):
    """A :class:`_pool.Pool` that caps the number of open connections.

    :class:`.QueuePool` is the default pooling implementation for every
    :class:`_engine.Engine` except SQLite engines that use a ``:memory:``
    database.

    The :class:`.QueuePool` class **is not compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.  When
    :func:`_asyncio.create_async_engine` is used and no other kind of pool
    is specified, :class:`.AsyncAdaptedQueuePool` is selected
    automatically.

    .. seealso::

        :class:`.AsyncAdaptedQueuePool`

    """

    _is_asyncio = False

    # queue implementation used to hold checked-in connection records;
    # subclasses swap this for an alternate implementation.
    _queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = (
        sqla_queue.Queue
    )

    _pool: sqla_queue.QueueCommon[ConnectionPoolEntry]

    def __init__(
        self,
        creator: Union[_CreatorFnType, _CreatorWRecFnType],
        pool_size: int = 5,
        max_overflow: int = 10,
        timeout: float = 30.0,
        use_lifo: bool = False,
        **kw: Any,
    ):
        r"""
        Construct a QueuePool.

        :param creator: a callable function that returns a DB-API
          connection object, same as that of :paramref:`_pool.Pool.creator`.

        :param pool_size: The largest number of connections kept
          persistently in the pool; defaults to 5.  The pool starts out
          empty; once this many connections have been requested, that
          many remain.  ``pool_size`` may be 0 to indicate no size limit;
          to disable pooling entirely use a
          :class:`~sqlalchemy.pool.NullPool` instead.

        :param max_overflow: The maximum overflow size of the pool;
          defaults to 10.  Once the number of checked-out connections
          reaches ``pool_size``, additional connections are handed out up
          to this limit, and are disconnected and discarded when they are
          returned.  The pool therefore allows at most ``pool_size`` +
          `max_overflow` simultaneous connections and at most
          ``pool_size`` "sleeping" connections.  `max_overflow` may be -1
          to indicate no overflow limit, in which case the total number
          of concurrent connections is unbounded.

        :param timeout: The number of seconds to wait before giving up
          on returning a connection.  Defaults to 30.0.  A float is
          accepted, but it is subject to the limitations of Python time
          functions, which may not be reliable in the tens of
          milliseconds.

        :param use_lifo: retrieve connections LIFO (last-in-first-out)
          rather than FIFO (first-in-first-out).  With LIFO, a server-side
          timeout scheme can reduce the number of connections used during
          non-peak periods of use.  When planning for server-side
          timeouts, make sure a recycle or pre-ping strategy is in use so
          that stale connections are handled gracefully.

          .. seealso::

            :ref:`pool_use_lifo`

            :ref:`pool_disconnects`

        :param \**kw: Other keyword arguments including
          :paramref:`_pool.Pool.recycle`, :paramref:`_pool.Pool.echo`,
          :paramref:`_pool.Pool.reset_on_return` and others are passed to
          the :class:`_pool.Pool` constructor.

        """

        Pool.__init__(self, creator, **kw)
        self._pool = self._queue_class(pool_size, use_lifo=use_lifo)

        # the counter starts at minus pool_size and is incremented for
        # each connection created, so it reaches zero when pool_size
        # connections exist and is positive only for overflow connections.
        self._overflow = -pool_size

        # a pool_size of zero means "no size limit", which forces the
        # overflow to be unlimited regardless of the argument given.
        if pool_size == 0:
            self._max_overflow = -1
        else:
            self._max_overflow = max_overflow

        self._timeout = timeout
        self._overflow_lock = threading.Lock()

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        """Put ``record`` back in the queue, closing it if the queue is
        full."""
        try:
            # non-blocking put
            self._pool.put(record, False)
        except sqla_queue.Full:
            # no room left in the queue; close the record and give its
            # slot back to the overflow counter even if close() raises.
            try:
                record.close()
            finally:
                self._dec_overflow()

    def _do_get(self) -> ConnectionPoolEntry:
        """Return a pooled record, or create one if the limits allow.

        :raises sqlalchemy.exc.TimeoutError: if the overflow limit has
          been reached and no record became available within the
          configured timeout.
        """
        limited = self._max_overflow > -1

        # block on the queue only when no further connection may be
        # created.
        block = limited and self._overflow >= self._max_overflow
        try:
            return self._pool.get(block, self._timeout)
        except sqla_queue.Empty:
            # don't do things inside of "except Empty", because when we say
            # we timed out or can't connect and raise, Python 3 tells
            # people the real error is queue.Empty which it isn't.
            pass

        # the counter is read again here, as it may have changed while
        # the queue was being consulted.
        if limited and self._overflow >= self._max_overflow:
            if block:
                raise exc.TimeoutError(
                    "QueuePool limit of size %d overflow %d reached, "
                    "connection timed out, timeout %0.2f"
                    % (self.size(), self.overflow(), self._timeout),
                    code="3o7r",
                )
            # the get above did not block; start over.
            return self._do_get()

        if not self._inc_overflow():
            # the slot could not be reserved; start over.
            return self._do_get()

        try:
            return self._create_connection()
        except:
            # release the reserved slot, then re-raise the original error
            with util.safe_reraise():
                self._dec_overflow()
            raise

    def _inc_overflow(self) -> bool:
        """Reserve a slot for a new connection.

        Returns True if the counter was incremented, False if the
        overflow limit has already been reached.
        """
        if self._max_overflow == -1:
            # unlimited overflow: increment without taking the lock
            self._overflow += 1
            return True

        with self._overflow_lock:
            has_room = self._overflow < self._max_overflow
            if has_room:
                self._overflow += 1
            return has_room

    def _dec_overflow(self) -> Literal[True]:
        """Release a slot previously reserved by ``_inc_overflow()``."""
        if self._max_overflow == -1:
            # unlimited overflow: decrement without taking the lock
            self._overflow -= 1
        else:
            with self._overflow_lock:
                self._overflow -= 1
        return True

    def recreate(self) -> QueuePool:
        """Return a new pool of the same class built from this pool's
        settings."""
        self.logger.info("Pool recreating")
        pool_cls = self.__class__
        return pool_cls(
            self._creator,
            pool_size=self._pool.maxsize,
            max_overflow=self._max_overflow,
            timeout=self._timeout,
            use_lifo=self._pool.use_lifo,
            recycle=self._recycle,
            pre_ping=self._pre_ping,
            reset_on_return=self._reset_on_return,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            dialect=self._dialect,
            _dispatch=self.dispatch,
        )

    def dispose(self) -> None:
        """Close every record currently in the queue and reset the
        overflow counter."""
        while True:
            try:
                entry = self._pool.get(False)
                entry.close()
            except sqla_queue.Empty:
                # the queue has been drained
                break

        self._overflow = -self.size()
        self.logger.info("Pool disposed. %s", self.status())

    def status(self) -> str:
        """Return a one-line summary of the pool's counters."""
        template = (
            "Pool size: %d Connections in pool: %d "
            "Current Overflow: %d Current Checked out "
            "connections: %d"
        )
        return template % (
            self.size(),
            self.checkedin(),
            self.overflow(),
            self.checkedout(),
        )

    def size(self) -> int:
        """Return the maximum size of the underlying queue."""
        return self._pool.maxsize

    def timeout(self) -> float:
        """Return the configured checkout timeout in seconds."""
        return self._timeout

    def checkedin(self) -> int:
        """Return the number of records currently in the queue."""
        return self._pool.qsize()

    def overflow(self) -> int:
        """Return the overflow counter, or 0 when the queue has no
        maximum size."""
        if self._pool.maxsize:
            return self._overflow
        return 0

    def checkedout(self) -> int:
        """Return the number of connections currently checked out."""
        capacity = self._pool.maxsize
        idle = self._pool.qsize()
        return capacity - idle + self._overflow

256 

257 

class AsyncAdaptedQueuePool(QueuePool):
    """A version of :class:`.QueuePool` that is compatible with asyncio.

    :class:`.AsyncEngine` engines generated from
    :func:`_asyncio.create_async_engine` use this pool by default.  Its
    queue implementation is asyncio-compatible and does not use
    ``threading.Lock``.

    In all other respects, the arguments and operation of
    :class:`.AsyncAdaptedQueuePool` are identical to those of
    :class:`.QueuePool`.

    """

    # dialect stand-in shared by all instances of this pool class
    _dialect = _AsyncConnDialect()

    # queue implementation substituted for the one QueuePool uses
    _queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = (
        sqla_queue.AsyncAdaptedQueue
    )

    _is_asyncio = True

277 

278 

class NullPool(Pool):
    """A Pool that performs no pooling of connections.

    The underlying DB-API connection is literally opened and closed for
    each connection open/close.

    Because no connections are held persistently, reconnect-related
    functions such as ``recycle`` and connection invalidation are not
    supported by this Pool implementation.

    The :class:`.NullPool` class **is compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.

    """

    def _do_get(self) -> ConnectionPoolEntry:
        """Create a brand new connection record for every checkout."""
        return self._create_connection()

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        """Close the record as soon as it is returned."""
        record.close()

    def dispose(self) -> None:
        """Do nothing; this pool holds no connections."""
        pass

    def status(self) -> str:
        """Return the fixed status string for this pool."""
        return "NullPool"

    def recreate(self) -> NullPool:
        """Return a new pool of the same class built from this pool's
        settings."""
        self.logger.info("Pool recreating")

        pool_cls = self.__class__
        return pool_cls(
            self._creator,
            pre_ping=self._pre_ping,
            recycle=self._recycle,
            reset_on_return=self._reset_on_return,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            dialect=self._dialect,
            _dispatch=self.dispatch,
        )

319 

320 

class SingletonThreadPool(Pool):
    """A Pool that keeps a single connection for each thread.

    One connection is maintained per thread, and a connection is never
    moved to a thread other than the one in which it was created.

    .. warning::  the :class:`.SingletonThreadPool` will call ``.close()``
       on arbitrary connections that exist beyond the size setting of
       ``pool_size``, e.g. if more unique **thread identities**
       than what ``pool_size`` states are used.   This cleanup is
       non-deterministic and not sensitive to whether or not the connections
       linked to those thread identities are currently in use.

       :class:`.SingletonThreadPool` may be improved in a future release,
       however in its current status it is generally used only for test
       scenarios using a SQLite ``:memory:`` database and is not recommended
       for production use.

    The :class:`.SingletonThreadPool` class **is not compatible** with
    asyncio and :func:`_asyncio.create_async_engine`.


    The options are those of :class:`_pool.Pool`, plus:

    :param pool_size: The number of threads in which to maintain connections
        at once.  Defaults to five.

    The SQLite dialect uses :class:`.SingletonThreadPool` automatically
    when a memory-based database is used.
    See :ref:`sqlite_toplevel`.

    """

    _is_asyncio = False

    def __init__(
        self,
        creator: Union[_CreatorFnType, _CreatorWRecFnType],
        pool_size: int = 5,
        **kw: Any,
    ):
        Pool.__init__(self, creator, **kw)
        self.size = pool_size
        # every connection record created by this pool, across all threads
        self._all_conns: Set[ConnectionPoolEntry] = set()
        # per-thread weak reference to the thread's connection record
        self._conn = threading.local()
        # per-thread slot for the currently checked out fairy
        self._fairy = threading.local()

    def recreate(self) -> SingletonThreadPool:
        """Return a new pool of the same class built from this pool's
        settings."""
        self.logger.info("Pool recreating")
        pool_cls = self.__class__
        return pool_cls(
            self._creator,
            pool_size=self.size,
            pre_ping=self._pre_ping,
            recycle=self._recycle,
            reset_on_return=self._reset_on_return,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            dialect=self._dialect,
            _dispatch=self.dispatch,
        )

    def _transfer_from(
        self, other_singleton_pool: SingletonThreadPool
    ) -> None:
        # used by the test suite to make a new engine / pool without
        # losing the state of an existing SQLite :memory: connection
        assert not hasattr(other_singleton_pool._fairy, "current")
        self._all_conns = other_singleton_pool._all_conns
        self._conn = other_singleton_pool._conn

    def dispose(self) -> None:
        """Dispose of this pool."""

        for entry in self._all_conns:
            try:
                entry.close()
            except Exception:
                # pysqlite won't even let you close a conn from a thread
                # that didn't create it
                pass

        self._all_conns.clear()

    def _cleanup(self) -> None:
        """Close arbitrary tracked records until fewer than ``size``
        remain."""
        while len(self._all_conns) >= self.size:
            self._all_conns.pop().close()

    def status(self) -> str:
        """Return a summary giving this pool's id and the number of
        tracked records."""
        details = (id(self), len(self._all_conns))
        return "SingletonThreadPool id:%d size: %d" % details

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        """Forget the calling thread's current fairy, if there is one."""
        try:
            del self._fairy.current
        except AttributeError:
            # nothing was stored for this thread
            pass

    def _do_get(self) -> ConnectionPoolEntry:
        """Return the calling thread's record, creating one if needed."""
        try:
            if TYPE_CHECKING:
                existing = cast(ConnectionPoolEntry, self._conn.current())
            else:
                existing = self._conn.current()
            if existing:
                return existing
        except AttributeError:
            # this thread has not stored a record yet
            pass

        created = self._create_connection()
        self._conn.current = weakref.ref(created)

        # make room before tracking the new record
        if len(self._all_conns) >= self.size:
            self._cleanup()
        self._all_conns.add(created)
        return created

    def connect(self) -> PoolProxiedConnection:
        # vendored from Pool to include the now removed use_threadlocal
        # behavior
        try:
            fairy = cast(_ConnectionFairy, self._fairy.current())
        except AttributeError:
            # this thread has no stored fairy reference
            fairy = None

        if fairy is not None:
            return fairy._checkout_existing()

        return _ConnectionFairy._checkout(self, self._fairy)

450 

451 

class StaticPool(Pool):
    """A Pool consisting of exactly one connection, shared by all requests.

    Reconnect-related functions such as ``recycle`` and connection
    invalidation (which is also used to support auto-reconnect) are only
    partially supported right now and may not yield good results.

    The :class:`.StaticPool` class **is compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.

    """

    @util.memoized_property
    def connection(self) -> _ConnectionRecord:
        """The single connection record served by this pool."""
        return _ConnectionRecord(self)

    def status(self) -> str:
        """Return the fixed status string for this pool."""
        return "StaticPool"

    def dispose(self) -> None:
        """Close the memoized record, if one exists and is connected, and
        forget it."""
        # check __dict__ first so that the memoized property is not
        # triggered just to dispose of it.
        if "connection" not in self.__dict__:
            return
        if self.connection.dbapi_connection is None:
            return

        self.connection.close()
        del self.__dict__["connection"]

    def recreate(self) -> StaticPool:
        """Return a new pool of the same class built from this pool's
        settings."""
        self.logger.info("Pool recreating")
        pool_cls = self.__class__
        return pool_cls(
            creator=self._creator,
            pre_ping=self._pre_ping,
            recycle=self._recycle,
            reset_on_return=self._reset_on_return,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            dialect=self._dialect,
            _dispatch=self.dispatch,
        )

    def _transfer_from(self, other_static_pool: StaticPool) -> None:
        # used by the test suite to make a new engine / pool without
        # losing the state of an existing SQLite :memory: connection
        def creator(rec: ConnectionPoolEntry) -> DBAPIConnection:
            shared = other_static_pool.connection.dbapi_connection
            assert shared is not None
            return shared

        self._invoke_creator = creator

    def _create_connection(self) -> ConnectionPoolEntry:
        raise NotImplementedError()

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        """Do nothing; the single record is kept by the pool."""
        pass

    def _do_get(self) -> ConnectionPoolEntry:
        """Return the single record, replacing it first if it has been
        invalidated."""
        current = self.connection
        if not current._is_hard_or_soft_invalidated():
            return current

        # drop the memoized record so that the property builds a new one
        del self.__dict__["connection"]
        return self.connection

515 

516 

517class AssertionPool(Pool): 

518 """A :class:`_pool.Pool` that allows at most one checked out connection at 

519 any given time. 

520 

521 This will raise an exception if more than one connection is checked out 

522 at a time. Useful for debugging code that is using more connections 

523 than desired. 

524 

525 The :class:`.AssertionPool` class **is compatible** with asyncio and 

526 :func:`_asyncio.create_async_engine`. 

527 

528 """ 

529 

530 _conn: Optional[ConnectionPoolEntry] 

531 _checkout_traceback: Optional[List[str]] 

532 

533 def __init__(self, *args: Any, **kw: Any): 

534 self._conn = None 

535 self._checked_out = False 

536 self._store_traceback = kw.pop("store_traceback", True) 

537 self._checkout_traceback = None 

538 Pool.__init__(self, *args, **kw) 

539 

540 def status(self) -> str: 

541 return "AssertionPool" 

542 

543 def _do_return_conn(self, record: ConnectionPoolEntry) -> None: 

544 if not self._checked_out: 

545 raise AssertionError("connection is not checked out") 

546 self._checked_out = False 

547 assert record is self._conn 

548 

549 def dispose(self) -> None: 

550 self._checked_out = False 

551 if self._conn: 

552 self._conn.close() 

553 

554 def recreate(self) -> AssertionPool: 

555 self.logger.info("Pool recreating") 

556 return self.__class__( 

557 self._creator, 

558 echo=self.echo, 

559 pre_ping=self._pre_ping, 

560 recycle=self._recycle, 

561 reset_on_return=self._reset_on_return, 

562 logging_name=self._orig_logging_name, 

563 _dispatch=self.dispatch, 

564 dialect=self._dialect, 

565 ) 

566 

567 def _do_get(self) -> ConnectionPoolEntry: 

568 if self._checked_out: 

569 if self._checkout_traceback: 

570 suffix = " at:\n%s" % "".join( 

571 chop_traceback(self._checkout_traceback) 

572 ) 

573 else: 

574 suffix = "" 

575 raise AssertionError("connection is already checked out" + suffix) 

576 

577 if not self._conn: 

578 self._conn = self._create_connection() 

579 

580 self._checked_out = True 

581 if self._store_traceback: 

582 self._checkout_traceback = traceback.format_stack() 

583 return self._conn