Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/pool/impl.py: 54%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

245 statements  

1# pool/impl.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8 

9"""Pool implementation classes.""" 

10 

11from __future__ import annotations 

12 

13import threading 

14import traceback 

15import typing 

16from typing import Any 

17from typing import cast 

18from typing import List 

19from typing import Optional 

20from typing import Set 

21from typing import Type 

22from typing import TYPE_CHECKING 

23from typing import Union 

24import weakref 

25 

26from .base import _AsyncConnDialect 

27from .base import _ConnectionFairy 

28from .base import _ConnectionRecord 

29from .base import _CreatorFnType 

30from .base import _CreatorWRecFnType 

31from .base import ConnectionPoolEntry 

32from .base import Pool 

33from .base import PoolProxiedConnection 

34from .. import exc 

35from .. import util 

36from ..util import chop_traceback 

37from ..util import queue as sqla_queue 

38from ..util.typing import Literal 

39 

40if typing.TYPE_CHECKING: 

41 from ..engine.interfaces import DBAPIConnection 

42 

43 

class QueuePool(Pool):
    """A :class:`_pool.Pool` that caps how many connections may be open.

    Every :class:`_engine.Engine` uses :class:`.QueuePool` as its default
    pooling implementation, except for SQLite engines that target a
    ``:memory:`` database.

    :class:`.QueuePool` **is not compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.  When no other kind of pool is
    specified, :func:`_asyncio.create_async_engine` automatically uses
    :class:`.AsyncAdaptedQueuePool` instead.

    .. seealso::

        :class:`.AsyncAdaptedQueuePool`

    """

    _is_asyncio = False

    # Queue implementation used to hold checked-in connections; subclasses
    # swap this out for an asyncio-adapted variant.
    _queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = (
        sqla_queue.Queue
    )

    _pool: sqla_queue.QueueCommon[ConnectionPoolEntry]

    def __init__(
        self,
        creator: Union[_CreatorFnType, _CreatorWRecFnType],
        pool_size: int = 5,
        max_overflow: int = 10,
        timeout: float = 30.0,
        use_lifo: bool = False,
        **kw: Any,
    ):
        r"""
        Construct a QueuePool.

        :param creator: callable that returns a DB-API connection object;
          same meaning as :paramref:`_pool.Pool.creator`.

        :param pool_size: The number of connections to keep persistently
          in the pool; defaults to 5.  The pool starts out with no
          connections; once this many connections have been requested,
          that many will remain.  ``pool_size`` may be 0 to indicate no
          size limit; to disable pooling altogether, use a
          :class:`~sqlalchemy.pool.NullPool` instead.

        :param max_overflow: The maximum overflow size of the pool;
          defaults to 10.  Once the number of checked-out connections
          reaches ``pool_size``, additional connections are handed out up
          to this limit, and those additional connections are
          disconnected and discarded when they come back to the pool.
          The pool therefore allows at most ``pool_size + max_overflow``
          simultaneous connections and at most ``pool_size`` "sleeping"
          connections.  ``max_overflow`` may be -1 to indicate no
          overflow limit, in which case no limit is placed on the total
          number of concurrent connections.

        :param timeout: The number of seconds to wait before giving up on
          returning a connection; defaults to 30.0.  A float is accepted
          but is subject to the limitations of Python time functions,
          which may not be reliable in the tens of milliseconds.

        :param use_lifo: retrieve connections in LIFO (last-in-first-out)
          order rather than FIFO (first-in-first-out).  With LIFO, a
          server-side timeout scheme can reduce the number of connections
          used during non-peak periods of use.  When planning for
          server-side timeouts, make sure a recycle or pre-ping strategy
          is in use so that stale connections are handled gracefully.

          .. versionadded:: 1.3

          .. seealso::

            :ref:`pool_use_lifo`

            :ref:`pool_disconnects`

        :param \**kw: Other keyword arguments, including
          :paramref:`_pool.Pool.recycle`, :paramref:`_pool.Pool.echo`,
          :paramref:`_pool.Pool.reset_on_return` and others, are passed
          through to the :class:`_pool.Pool` constructor.

        """

        Pool.__init__(self, creator, **kw)
        self._pool = self._queue_class(pool_size, use_lifo=use_lifo)

        # The counter starts at -pool_size and is bumped by _inc_overflow()
        # for every connection _do_get() creates, so it reaches zero once
        # pool_size connections exist.
        self._overflow = -pool_size

        # pool_size == 0 means "no size limit", which forces the
        # "no overflow limit" value of -1 regardless of what was passed.
        if pool_size == 0:
            self._max_overflow = -1
        else:
            self._max_overflow = max_overflow

        self._timeout = timeout
        self._overflow_lock = threading.Lock()

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        """Put ``record`` back in the queue, or close it if the queue is full.

        A record that does not fit is an overflow connection: it is closed
        and the overflow counter is decremented, even if closing raises.
        """
        try:
            self._pool.put(record, False)
        except sqla_queue.Full:
            try:
                record.close()
            finally:
                self._dec_overflow()

    def _do_get(self) -> ConnectionPoolEntry:
        """Return a pooled record, creating a new connection if permitted.

        :raises sqlalchemy.exc.TimeoutError: when the overflow limit is
          reached and the queue yields nothing within the timeout.
        """
        bounded = self._max_overflow > -1

        # Only wait on the queue when no further connection may be created;
        # otherwise fall straight through to creating one.
        block = bounded and self._overflow >= self._max_overflow
        try:
            return self._pool.get(block, self._timeout)
        except sqla_queue.Empty:
            # Nothing is done inside this handler on purpose: an exception
            # raised from here would be reported by Python 3 with
            # queue.Empty as the "real" error, which it is not.
            pass

        if bounded and self._overflow >= self._max_overflow:
            if block:
                raise exc.TimeoutError(
                    "QueuePool limit of size %d overflow %d reached, "
                    "connection timed out, timeout %0.2f"
                    % (self.size(), self.overflow(), self._timeout),
                    code="3o7r",
                )
            # The limit was reached after the non-waiting get above;
            # start over so that this time the queue is waited on.
            return self._do_get()

        if not self._inc_overflow():
            # Another caller took the last overflow slot; start over.
            return self._do_get()

        try:
            return self._create_connection()
        except BaseException:
            # Give the reserved slot back before re-raising.
            with util.safe_reraise():
                self._dec_overflow()
            raise

    def _inc_overflow(self) -> bool:
        """Reserve one connection slot; return False if the limit is hit.

        With no overflow limit (``_max_overflow == -1``) the counter is
        bumped without taking the lock and the call always succeeds.
        """
        if self._max_overflow == -1:
            self._overflow += 1
            return True

        with self._overflow_lock:
            has_room = self._overflow < self._max_overflow
            if has_room:
                self._overflow += 1
            return has_room

    def _dec_overflow(self) -> Literal[True]:
        """Release one connection slot; always returns True.

        The lock is only taken when an overflow limit is in effect.
        """
        if self._max_overflow == -1:
            self._overflow -= 1
        else:
            with self._overflow_lock:
                self._overflow -= 1
        return True

    def recreate(self) -> QueuePool:
        """Return a new pool of the same class built from this pool's
        current settings."""
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            pool_size=self._pool.maxsize,
            max_overflow=self._max_overflow,
            timeout=self._timeout,
            use_lifo=self._pool.use_lifo,
            recycle=self._recycle,
            pre_ping=self._pre_ping,
            reset_on_return=self._reset_on_return,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def dispose(self) -> None:
        """Close every record currently in the queue and reset the
        overflow counter to ``-size()``."""
        while True:
            try:
                entry = self._pool.get(False)
                entry.close()
            except sqla_queue.Empty:
                break

        self._overflow = -self.size()
        self.logger.info("Pool disposed. %s", self.status())

    def status(self) -> str:
        """Return a one-line summary of the pool's counters."""
        counters = (
            self.size(),
            self.checkedin(),
            self.overflow(),
            self.checkedout(),
        )
        return (
            "Pool size: %d Connections in pool: %d "
            "Current Overflow: %d Current Checked out "
            "connections: %d" % counters
        )

    def size(self) -> int:
        """Return the maximum size of the underlying queue."""
        return self._pool.maxsize

    def timeout(self) -> float:
        """Return the configured checkout timeout."""
        return self._timeout

    def checkedin(self) -> int:
        """Return the number of records currently sitting in the queue."""
        return self._pool.qsize()

    def overflow(self) -> int:
        """Return the overflow counter, or 0 when the queue has no
        maximum size."""
        if self._pool.maxsize:
            return self._overflow
        return 0

    def checkedout(self) -> int:
        """Return the number of connections currently checked out."""
        return self._pool.maxsize - self._pool.qsize() + self._overflow

258 

259 

class AsyncAdaptedQueuePool(QueuePool):
    """An asyncio-compatible version of :class:`.QueuePool`.

    :class:`.AsyncEngine` engines generated from
    :func:`_asyncio.create_async_engine` use this pool by default.  Its
    queue implementation is asyncio-compatible and does not use
    ``threading.Lock``.

    In all other respects the arguments and operation of
    :class:`.AsyncAdaptedQueuePool` are identical to those of
    :class:`.QueuePool`.

    """

    _is_asyncio = True

    # Swap the queue class inherited from QueuePool for the
    # asyncio-adapted one.
    _queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = (
        sqla_queue.AsyncAdaptedQueue
    )

    # Class-level default dialect, shared by all instances.
    _dialect = _AsyncConnDialect()

279 

280 

class FallbackAsyncAdaptedQueuePool(AsyncAdaptedQueuePool):
    """An :class:`.AsyncAdaptedQueuePool` whose queue class is
    ``FallbackAsyncAdaptedQueue``; nothing else is overridden."""

    _queue_class = sqla_queue.FallbackAsyncAdaptedQueue  # type: ignore[assignment] # noqa: E501

283 

284 

class NullPool(Pool):
    """A Pool which does not pool connections.

    The underlying DB-API connection is literally opened on every
    connection open and closed on every connection close.

    Because no connections are held persistently, reconnect-related
    functions such as ``recycle`` and connection invalidation are not
    supported by this Pool implementation.

    The :class:`.NullPool` class **is compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.

    """

    def status(self) -> str:
        """Return the fixed status string for this pool type."""
        return "NullPool"

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        """Close ``record`` rather than keeping it for reuse."""
        record.close()

    def _do_get(self) -> ConnectionPoolEntry:
        """Create a new connection for every checkout."""
        return self._create_connection()

    def recreate(self) -> NullPool:
        """Return a new pool of the same class built from this pool's
        current settings."""
        self.logger.info("Pool recreating")

        return self.__class__(
            self._creator,
            recycle=self._recycle,
            pre_ping=self._pre_ping,
            reset_on_return=self._reset_on_return,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def dispose(self) -> None:
        """Do nothing; this pool holds no connections to dispose of."""
        pass

325 

326 

class SingletonThreadPool(Pool):
    """A Pool that maintains one connection per thread.

    Each thread gets its own connection, and a connection is never moved
    to a thread other than the one in which it was created.

    .. warning::  the :class:`.SingletonThreadPool` will call ``.close()``
       on arbitrary connections that exist beyond the size setting of
       ``pool_size``, e.g. if more unique **thread identities**
       than what ``pool_size`` states are used.   This cleanup is
       non-deterministic and not sensitive to whether or not the
       connections linked to those thread identities are currently in use.

       :class:`.SingletonThreadPool` may be improved in a future release,
       however in its current status it is generally used only for test
       scenarios using a SQLite ``:memory:`` database and is not
       recommended for production use.

    The :class:`.SingletonThreadPool` class **is not compatible** with
    asyncio and :func:`_asyncio.create_async_engine`.


    Options are the same as those of :class:`_pool.Pool`, as well as:

    :param pool_size: The number of threads in which to maintain
      connections at once.  Defaults to five.

    :class:`.SingletonThreadPool` is used by the SQLite dialect
    automatically when a memory-based database is used.
    See :ref:`sqlite_toplevel`.

    """

    _is_asyncio = False

    def __init__(
        self,
        creator: Union[_CreatorFnType, _CreatorWRecFnType],
        pool_size: int = 5,
        **kw: Any,
    ):
        Pool.__init__(self, creator, **kw)
        # Thread-local slots: ``current`` on _conn holds a weak reference
        # to the thread's record (set in _do_get); ``current`` on _fairy
        # is read by connect() and removed by _do_return_conn().
        self._conn = threading.local()
        self._fairy = threading.local()
        # Strong references to every record created, across all threads.
        self._all_conns: Set[ConnectionPoolEntry] = set()
        self.size = pool_size

    def recreate(self) -> SingletonThreadPool:
        """Return a new pool of the same class built from this pool's
        current settings."""
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            pool_size=self.size,
            recycle=self._recycle,
            pre_ping=self._pre_ping,
            reset_on_return=self._reset_on_return,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def _transfer_from(
        self, other_singleton_pool: SingletonThreadPool
    ) -> None:
        """Adopt the connection state of ``other_singleton_pool``."""
        # used by the test suite to make a new engine / pool without
        # losing the state of an existing SQLite :memory: connection
        assert not hasattr(other_singleton_pool._fairy, "current")
        self._conn = other_singleton_pool._conn
        self._all_conns = other_singleton_pool._all_conns

    def dispose(self) -> None:
        """Dispose of this pool."""

        for entry in self._all_conns:
            try:
                entry.close()
            except Exception:
                # deliberate best-effort: pysqlite won't even let you
                # close a conn from a thread that didn't create it
                pass

        self._all_conns.clear()

    def _cleanup(self) -> None:
        """Close arbitrary records until fewer than ``size`` remain."""
        while len(self._all_conns) >= self.size:
            victim = self._all_conns.pop()
            victim.close()

    def status(self) -> str:
        """Return the pool's identity and the number of records it holds."""
        return "SingletonThreadPool id:%d size: %d" % (
            id(self),
            len(self._all_conns),
        )

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        """Forget the calling thread's current fairy, if it has one.

        The record itself is left alone.
        """
        try:
            del self._fairy.current
        except AttributeError:
            pass

    def _do_get(self) -> ConnectionPoolEntry:
        """Return the calling thread's record, creating it when needed."""
        try:
            if TYPE_CHECKING:
                existing = cast(ConnectionPoolEntry, self._conn.current())
            else:
                existing = self._conn.current()
            if existing:
                return existing
        except AttributeError:
            # this thread has no ``current`` slot yet
            pass

        created = self._create_connection()
        self._conn.current = weakref.ref(created)
        # Make room before registering the new record.
        if len(self._all_conns) >= self.size:
            self._cleanup()
        self._all_conns.add(created)
        return created

    def connect(self) -> PoolProxiedConnection:
        """Return the calling thread's existing checkout if it has one,
        otherwise perform a new checkout."""
        # vendored from Pool to include the now removed use_threadlocal
        # behavior
        try:
            fairy = cast(_ConnectionFairy, self._fairy.current())
        except AttributeError:
            fairy = None

        if fairy is not None:
            return fairy._checkout_existing()

        return _ConnectionFairy._checkout(self, self._fairy)

456 

457 

class StaticPool(Pool):
    """A Pool of exactly one connection, used for all requests.

    Reconnect-related functions such as ``recycle`` and connection
    invalidation (which also underlies auto-reconnect) are only partially
    supported right now and may not yield good results.

    The :class:`.StaticPool` class **is compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.

    """

    @util.memoized_property
    def connection(self) -> _ConnectionRecord:
        """The single connection record served by this pool."""
        return _ConnectionRecord(self)

    def status(self) -> str:
        """Return the fixed status string for this pool type."""
        return "StaticPool"

    def dispose(self) -> None:
        """Close and forget the memoized record, if it holds a live
        DB-API connection."""
        if "connection" not in self.__dict__:
            # the record was never created; nothing to close
            return
        if self.connection.dbapi_connection is None:
            return

        self.connection.close()
        del self.__dict__["connection"]

    def recreate(self) -> StaticPool:
        """Return a new pool of the same class built from this pool's
        current settings."""
        self.logger.info("Pool recreating")
        return self.__class__(
            creator=self._creator,
            recycle=self._recycle,
            pre_ping=self._pre_ping,
            reset_on_return=self._reset_on_return,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def _transfer_from(self, other_static_pool: StaticPool) -> None:
        """Make this pool hand out ``other_static_pool``'s DB-API
        connection."""

        # used by the test suite to make a new engine / pool without
        # losing the state of an existing SQLite :memory: connection
        def creator(rec: ConnectionPoolEntry) -> DBAPIConnection:
            conn = other_static_pool.connection.dbapi_connection
            assert conn is not None
            return conn

        self._invoke_creator = creator

    def _create_connection(self) -> ConnectionPoolEntry:
        """Always raise ``NotImplementedError``."""
        raise NotImplementedError()

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        """Do nothing; the single record stays in place."""
        pass

    def _do_get(self) -> ConnectionPoolEntry:
        """Return the pool's record, replacing it first if it has been
        hard or soft invalidated."""
        record = self.connection
        if record._is_hard_or_soft_invalidated():
            # drop the memoized record so the property builds a new one
            del self.__dict__["connection"]
            record = self.connection

        return record

521 

522 

class AssertionPool(Pool):
    """A :class:`_pool.Pool` that allows at most one checked out connection
    at any given time.

    This will raise an exception if more than one connection is checked out
    at a time.  Useful for debugging code that is using more connections
    than desired.

    The :class:`.AssertionPool` class **is compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.

    """

    # The single record handed out by this pool; created on first checkout.
    _conn: Optional[ConnectionPoolEntry]
    # Stack captured at the most recent checkout, when tracebacks are kept.
    _checkout_traceback: Optional[List[str]]

    def __init__(self, *args: Any, **kw: Any):
        """Construct an AssertionPool.

        :param store_traceback: keyword-only, defaults to True.  When
          true, the call stack of each checkout is recorded so that it can
          be included in the error raised by a second, overlapping
          checkout.

        All other arguments are passed to the :class:`_pool.Pool`
        constructor.
        """
        self._conn = None
        self._checked_out = False
        # popped here so that it is not passed on to Pool.__init__
        self._store_traceback = kw.pop("store_traceback", True)
        self._checkout_traceback = None
        Pool.__init__(self, *args, **kw)

    def status(self) -> str:
        """Return the fixed status string for this pool type."""
        return "AssertionPool"

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        """Mark the connection as returned.

        :raises AssertionError: if no connection is currently checked out.
        """
        if not self._checked_out:
            raise AssertionError("connection is not checked out")
        self._checked_out = False
        assert record is self._conn

    def dispose(self) -> None:
        """Clear the checked-out flag and close the record, if any."""
        self._checked_out = False
        if self._conn:
            self._conn.close()

    def recreate(self) -> AssertionPool:
        """Return a new pool of the same class built from this pool's
        current settings, including ``store_traceback``."""
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            echo=self.echo,
            pre_ping=self._pre_ping,
            recycle=self._recycle,
            reset_on_return=self._reset_on_return,
            logging_name=self._orig_logging_name,
            # forwarded so that a pool built with store_traceback=False
            # does not revert to the default when it is recreated
            store_traceback=self._store_traceback,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def _do_get(self) -> ConnectionPoolEntry:
        """Check out the pool's single connection.

        :raises AssertionError: if the connection is already checked out;
          the message includes the stored checkout traceback when one is
          available.
        """
        if self._checked_out:
            if self._checkout_traceback:
                suffix = " at:\n%s" % "".join(
                    chop_traceback(self._checkout_traceback)
                )
            else:
                suffix = ""
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn