Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/pool/impl.py: 44%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

243 statements  

1# pool/impl.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8 

9"""Pool implementation classes.""" 

10from __future__ import annotations 

11 

12import threading 

13import traceback 

14import typing 

15from typing import Any 

16from typing import cast 

17from typing import List 

18from typing import Literal 

19from typing import Optional 

20from typing import Set 

21from typing import Type 

22from typing import TYPE_CHECKING 

23from typing import Union 

24import weakref 

25 

26from .base import _AsyncConnDialect 

27from .base import _ConnectionFairy 

28from .base import _ConnectionRecord 

29from .base import _CreatorFnType 

30from .base import _CreatorWRecFnType 

31from .base import ConnectionPoolEntry 

32from .base import Pool 

33from .base import PoolProxiedConnection 

34from .. import exc 

35from .. import util 

36from ..util import chop_traceback 

37from ..util import queue as sqla_queue 

38 

39if typing.TYPE_CHECKING: 

40 from ..engine.interfaces import DBAPIConnection 

41 

42 

class QueuePool(Pool):
    """A :class:`_pool.Pool`
    that imposes a limit on the number of open connections.

    :class:`.QueuePool` is the default pooling implementation used for
    all :class:`_engine.Engine` objects other than SQLite with a ``:memory:``
    database.

    The :class:`.QueuePool` class **is not compatible** with asyncio and
    :func:`_asyncio.create_async_engine`. The
    :class:`.AsyncAdaptedQueuePool` class is used automatically when
    using :func:`_asyncio.create_async_engine`, if no other kind of pool
    is specified.

    .. seealso::

        :class:`.AsyncAdaptedQueuePool`

    """

    _is_asyncio = False

    # queue implementation holding idle ConnectionPoolEntry objects;
    # AsyncAdaptedQueuePool overrides this with an asyncio-compatible queue
    _queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = (
        sqla_queue.Queue
    )

    _pool: sqla_queue.QueueCommon[ConnectionPoolEntry]

    def __init__(
        self,
        creator: Union[_CreatorFnType, _CreatorWRecFnType],
        pool_size: int = 5,
        max_overflow: int = 10,
        timeout: float = 30.0,
        use_lifo: bool = False,
        **kw: Any,
    ):
        r"""
        Construct a QueuePool.

        :param creator: a callable function that returns a DB-API
          connection object, same as that of :paramref:`_pool.Pool.creator`.

        :param pool_size: The size of the pool to be maintained,
          defaults to 5. This is the largest number of connections that
          will be kept persistently in the pool. Note that the pool
          begins with no connections; once this number of connections
          is requested, that number of connections will remain.
          ``pool_size`` can be set to 0 to indicate no size limit; to
          disable pooling, use a :class:`~sqlalchemy.pool.NullPool`
          instead.

        :param max_overflow: The maximum overflow size of the
          pool. When the number of checked-out connections reaches the
          size set in pool_size, additional connections will be
          returned up to this limit. When those additional connections
          are returned to the pool, they are disconnected and
          discarded. It follows then that the total number of
          simultaneous connections the pool will allow is pool_size +
          `max_overflow`, and the total number of "sleeping"
          connections the pool will allow is pool_size. `max_overflow`
          can be set to -1 to indicate no overflow limit; no limit
          will be placed on the total number of concurrent
          connections. Defaults to 10.

        :param timeout: The number of seconds to wait before giving up
          on returning a connection. Defaults to 30.0. This can be a float
          but is subject to the limitations of Python time functions which
          may not be reliable in the tens of milliseconds.

        :param use_lifo: use LIFO (last-in-first-out) when retrieving
          connections instead of FIFO (first-in-first-out). Using LIFO, a
          server-side timeout scheme can reduce the number of connections used
          during non-peak periods of use. When planning for server-side
          timeouts, ensure that a recycle or pre-ping strategy is in use to
          gracefully handle stale connections.

          .. seealso::

            :ref:`pool_use_lifo`

            :ref:`pool_disconnects`

        :param \**kw: Other keyword arguments including
          :paramref:`_pool.Pool.recycle`, :paramref:`_pool.Pool.echo`,
          :paramref:`_pool.Pool.reset_on_return` and others are passed to the
          :class:`_pool.Pool` constructor.

        """

        Pool.__init__(self, creator, **kw)
        self._pool = self._queue_class(pool_size, use_lifo=use_lifo)

        # the overflow counter starts negative so that it crosses zero
        # only once pool_size connections have been created; see
        # overflow() / checkedout() which rely on this convention
        self._overflow = 0 - pool_size

        # pool_size == 0 means "no size limit", which is modeled as
        # unlimited overflow
        self._max_overflow = -1 if pool_size == 0 else max_overflow
        self._timeout = timeout
        self._overflow_lock = threading.Lock()

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        # return the record to the idle queue; if the queue is already
        # full this was an overflow connection, so close it for real and
        # release its overflow slot
        try:
            self._pool.put(record, False)
        except sqla_queue.Full:
            try:
                record.close()
            finally:
                self._dec_overflow()

    def _do_get(self) -> ConnectionPoolEntry:
        use_overflow = self._max_overflow > -1

        # block on the queue only when the overflow budget is exhausted;
        # otherwise do a non-blocking get and fall through to creating a
        # new connection
        wait = use_overflow and self._overflow >= self._max_overflow
        try:
            return self._pool.get(wait, self._timeout)
        except sqla_queue.Empty:
            # don't do things inside of "except Empty", because when we say
            # we timed out or can't connect and raise, Python 3 tells
            # people the real error is queue.Empty which it isn't.
            pass
        if use_overflow and self._overflow >= self._max_overflow:
            if not wait:
                # another thread consumed our overflow slot between the
                # check and now; retry, which will block this time
                return self._do_get()
            else:
                raise exc.TimeoutError(
                    "QueuePool limit of size %d overflow %d reached, "
                    "connection timed out, timeout %0.2f"
                    % (self.size(), self.overflow(), self._timeout),
                    code="3o7r",
                )

        if self._inc_overflow():
            try:
                return self._create_connection()
            except BaseException:
                # catch BaseException (not a bare "except:") so that the
                # overflow slot is also released on interrupts such as
                # KeyboardInterrupt; safe_reraise re-raises the original
                # exception when the block exits
                with util.safe_reraise():
                    self._dec_overflow()
                raise  # unreachable; satisfies static analysis
        else:
            # lost the race for the overflow slot; start over
            return self._do_get()

    def _inc_overflow(self) -> bool:
        """Attempt to claim an overflow slot; return True on success."""
        if self._max_overflow == -1:
            # unlimited overflow: no lock needed, always succeeds
            self._overflow += 1
            return True
        with self._overflow_lock:
            if self._overflow < self._max_overflow:
                self._overflow += 1
                return True
            else:
                return False

    def _dec_overflow(self) -> Literal[True]:
        """Release an overflow slot; always returns True."""
        if self._max_overflow == -1:
            self._overflow -= 1
            return True
        with self._overflow_lock:
            self._overflow -= 1
            return True

    def recreate(self) -> QueuePool:
        """Return a new :class:`.QueuePool` with this pool's settings."""
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            pool_size=self._pool.maxsize,
            max_overflow=self._max_overflow,
            pre_ping=self._pre_ping,
            use_lifo=self._pool.use_lifo,
            timeout=self._timeout,
            recycle=self._recycle,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            reset_on_return=self._reset_on_return,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def dispose(self) -> None:
        """Close all idle connections held in the queue."""
        while True:
            try:
                conn = self._pool.get(False)
                conn.close()
            except sqla_queue.Empty:
                break

        # reset the counter to the "empty pool" baseline; checked-out
        # connections are not touched
        self._overflow = 0 - self.size()
        self.logger.info("Pool disposed. %s", self.status())

    def status(self) -> str:
        return (
            "Pool size: %d  Connections in pool: %d "
            "Current Overflow: %d Current Checked out "
            "connections: %d"
            % (
                self.size(),
                self.checkedin(),
                self.overflow(),
                self.checkedout(),
            )
        )

    def size(self) -> int:
        return self._pool.maxsize

    def timeout(self) -> float:
        return self._timeout

    def checkedin(self) -> int:
        return self._pool.qsize()

    def overflow(self) -> int:
        return self._overflow if self._pool.maxsize else 0

    def checkedout(self) -> int:
        return self._pool.maxsize - self._pool.qsize() + self._overflow

255 

256 

class AsyncAdaptedQueuePool(QueuePool):
    """An asyncio-compatible version of :class:`.QueuePool`.

    This pool is used by default when using :class:`.AsyncEngine` engines that
    were generated from :func:`_asyncio.create_async_engine`. It uses an
    asyncio-compatible queue implementation that does not use
    ``threading.Lock``.

    The arguments and operation of :class:`.AsyncAdaptedQueuePool` are
    otherwise identical to that of :class:`.QueuePool`.

    """

    _is_asyncio = True

    # substitute the asyncio-aware queue for the thread-based one used
    # by QueuePool; everything else is inherited unchanged
    _queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = (
        sqla_queue.AsyncAdaptedQueue
    )

    _dialect = _AsyncConnDialect()

276 

277 

class NullPool(Pool):
    """A Pool which does not pool connections.

    Instead it literally opens and closes the underlying DB-API connection
    per each connection open/close.

    Reconnect-related functions such as ``recycle`` and connection
    invalidation are not supported by this Pool implementation, since
    no connections are held persistently.

    The :class:`.NullPool` class **is compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.

    """

    def status(self) -> str:
        return "NullPool"

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        # nothing is retained: a returned connection is simply closed
        record.close()

    def _do_get(self) -> ConnectionPoolEntry:
        # nothing is retained: every checkout opens a fresh connection
        return self._create_connection()

    def recreate(self) -> NullPool:
        """Return a new :class:`.NullPool` carrying over this pool's settings."""
        self.logger.info("Pool recreating")

        return self.__class__(
            self._creator,
            recycle=self._recycle,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            reset_on_return=self._reset_on_return,
            pre_ping=self._pre_ping,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def dispose(self) -> None:
        # no state to dispose of
        pass

318 

319 

class SingletonThreadPool(Pool):
    """A Pool that maintains one connection per thread.

    Maintains one connection per each thread, never moving a connection to a
    thread other than the one which it was created in.

    .. warning::  the :class:`.SingletonThreadPool` will call ``.close()``
       on arbitrary connections that exist beyond the size setting of
       ``pool_size``, e.g. if more unique **thread identities**
       than what ``pool_size`` states are used. This cleanup is
       non-deterministic and not sensitive to whether or not the connections
       linked to those thread identities are currently in use.

    :class:`.SingletonThreadPool` may be improved in a future release,
    however in its current status it is generally used only for test
    scenarios using a SQLite ``:memory:`` database and is not recommended
    for production use.

    The :class:`.SingletonThreadPool` class **is not compatible** with asyncio
    and :func:`_asyncio.create_async_engine`.


    Options are the same as those of :class:`_pool.Pool`, as well as:

    :param pool_size: The number of threads in which to maintain connections
        at once.  Defaults to five.

    :class:`.SingletonThreadPool` is used by the SQLite dialect
    automatically when a memory-based database is used.
    See :ref:`sqlite_toplevel`.

    """

    _is_asyncio = False

    def __init__(
        self,
        creator: Union[_CreatorFnType, _CreatorWRecFnType],
        pool_size: int = 5,
        **kw: Any,
    ):
        Pool.__init__(self, creator, **kw)
        # per-thread storage: _conn holds a weakref to the thread's
        # connection record, _fairy a weakref to its checked-out fairy
        self._conn = threading.local()
        self._fairy = threading.local()
        # strong references to every record created, across all threads
        self._all_conns: Set[ConnectionPoolEntry] = set()
        self.size = pool_size

    def recreate(self) -> SingletonThreadPool:
        """Return a new pool of this class with identical configuration."""
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            pool_size=self.size,
            recycle=self._recycle,
            echo=self.echo,
            pre_ping=self._pre_ping,
            logging_name=self._orig_logging_name,
            reset_on_return=self._reset_on_return,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def _transfer_from(
        self, other_singleton_pool: SingletonThreadPool
    ) -> None:
        # used by the test suite to make a new engine / pool without
        # losing the state of an existing SQLite :memory: connection
        assert not hasattr(other_singleton_pool._fairy, "current")
        self._conn = other_singleton_pool._conn
        self._all_conns = other_singleton_pool._all_conns

    def dispose(self) -> None:
        """Dispose of this pool."""

        for conn in self._all_conns:
            try:
                conn.close()
            except Exception:
                # pysqlite won't even let you close a conn from a thread
                # that didn't create it
                pass

        self._all_conns.clear()

    def _cleanup(self) -> None:
        # evict arbitrary records until we are back under the size cap;
        # note this may close connections that are still in use
        while len(self._all_conns) >= self.size:
            victim = self._all_conns.pop()
            victim.close()

    def status(self) -> str:
        return "SingletonThreadPool id:%d size: %d" % (
            id(self),
            len(self._all_conns),
        )

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        # drop this thread's fairy reference, if it has one
        try:
            del self._fairy.current
        except AttributeError:
            pass

    def _do_get(self) -> ConnectionPoolEntry:
        # reuse the current thread's record when its weakref is still live
        try:
            ref = self._conn.current
        except AttributeError:
            pass
        else:
            existing = cast(Optional[ConnectionPoolEntry], ref())
            if existing:
                return existing
        record = self._create_connection()
        self._conn.current = weakref.ref(record)
        if len(self._all_conns) >= self.size:
            self._cleanup()
        self._all_conns.add(record)
        return record

    def connect(self) -> PoolProxiedConnection:
        # vendored from Pool to include the now removed use_threadlocal
        # behavior
        try:
            fairy = cast(_ConnectionFairy, self._fairy.current())
        except AttributeError:
            pass
        else:
            if fairy is not None:
                return fairy._checkout_existing()

        return _ConnectionFairy._checkout(self, self._fairy)

449 

450 

class StaticPool(Pool):
    """A Pool of exactly one connection, used for all requests.

    Reconnect-related functions such as ``recycle`` and connection
    invalidation (which is also used to support auto-reconnect) are only
    partially supported right now and may not yield good results.

    The :class:`.StaticPool` class **is compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.

    """

    @util.memoized_property
    def connection(self) -> _ConnectionRecord:
        # the single record, created lazily on first access and cached
        # in self.__dict__ by memoized_property
        return _ConnectionRecord(self)

    def status(self) -> str:
        return "StaticPool"

    def dispose(self) -> None:
        """Close the single connection, if it was ever created."""
        rec = self.__dict__.get("connection")
        if rec is not None and rec.dbapi_connection is not None:
            rec.close()
            # drop the memoized record so the next access recreates it
            del self.__dict__["connection"]

    def recreate(self) -> StaticPool:
        """Return a new :class:`.StaticPool` with identical configuration."""
        self.logger.info("Pool recreating")
        return self.__class__(
            creator=self._creator,
            recycle=self._recycle,
            reset_on_return=self._reset_on_return,
            pre_ping=self._pre_ping,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def _transfer_from(self, other_static_pool: StaticPool) -> None:
        # used by the test suite to make a new engine / pool without
        # losing the state of an existing SQLite :memory: connection
        def creator(rec: ConnectionPoolEntry) -> DBAPIConnection:
            conn = other_static_pool.connection.dbapi_connection
            assert conn is not None
            return conn

        self._invoke_creator = creator

    def _create_connection(self) -> ConnectionPoolEntry:
        # never called; the single record comes from the memoized
        # `connection` property instead
        raise NotImplementedError()

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        # the one record stays cached; nothing to return
        pass

    def _do_get(self) -> ConnectionPoolEntry:
        rec = self.connection
        if rec._is_hard_or_soft_invalidated():
            # discard the memoized record and build a fresh one
            del self.__dict__["connection"]
            rec = self.connection

        return rec

514 

515 

class AssertionPool(Pool):
    """A :class:`_pool.Pool` that allows at most one checked out connection at
    any given time.

    This will raise an exception if more than one connection is checked out
    at a time.  Useful for debugging code that is using more connections
    than desired.

    The :class:`.AssertionPool` class **is compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.

    """

    # the single lazily-created record
    _conn: Optional[ConnectionPoolEntry]
    # formatted stack captured at checkout, for the double-checkout error
    _checkout_traceback: Optional[List[str]]

    def __init__(self, *args: Any, **kw: Any):
        self._conn = None
        self._checked_out = False
        self._store_traceback = kw.pop("store_traceback", True)
        self._checkout_traceback = None
        Pool.__init__(self, *args, **kw)

    def status(self) -> str:
        return "AssertionPool"

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        if not self._checked_out:
            raise AssertionError("connection is not checked out")
        self._checked_out = False
        assert record is self._conn

    def dispose(self) -> None:
        self._checked_out = False
        if self._conn:
            self._conn.close()

    def recreate(self) -> AssertionPool:
        """Return a new :class:`.AssertionPool` with identical configuration."""
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            echo=self.echo,
            pre_ping=self._pre_ping,
            recycle=self._recycle,
            reset_on_return=self._reset_on_return,
            logging_name=self._orig_logging_name,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def _do_get(self) -> ConnectionPoolEntry:
        if self._checked_out:
            # a second simultaneous checkout is exactly what this pool
            # exists to catch; include the first checkout's stack if we
            # recorded one
            if self._checkout_traceback:
                suffix = " at:\n%s" % "".join(
                    chop_traceback(self._checkout_traceback)
                )
            else:
                suffix = ""
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn