Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/pool/impl.py: 44%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

245 statements  

1# pool/impl.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8 

9"""Pool implementation classes.""" 

10from __future__ import annotations 

11 

12import threading 

13import traceback 

14import typing 

15from typing import Any 

16from typing import cast 

17from typing import List 

18from typing import Optional 

19from typing import Set 

20from typing import Type 

21from typing import TYPE_CHECKING 

22from typing import Union 

23import weakref 

24 

25from .base import _AsyncConnDialect 

26from .base import _ConnectionFairy 

27from .base import _ConnectionRecord 

28from .base import _CreatorFnType 

29from .base import _CreatorWRecFnType 

30from .base import ConnectionPoolEntry 

31from .base import Pool 

32from .base import PoolProxiedConnection 

33from .. import exc 

34from .. import util 

35from ..util import chop_traceback 

36from ..util import queue as sqla_queue 

37from ..util.typing import Literal 

38 

39if typing.TYPE_CHECKING: 

40 from ..engine.interfaces import DBAPIConnection 

41 

42 

class QueuePool(Pool):
    """A :class:`_pool.Pool` that caps the number of open connections.

    :class:`.QueuePool` is the pooling implementation used by default for
    every :class:`_engine.Engine` object, except for SQLite engines that use
    a ``:memory:`` database.

    The :class:`.QueuePool` class **is not compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.  When
    :func:`_asyncio.create_async_engine` is used and no other kind of pool
    is specified, the :class:`.AsyncAdaptedQueuePool` class is selected
    automatically.

    .. seealso::

        :class:`.AsyncAdaptedQueuePool`

    """

    _is_asyncio = False

    # Queue implementation used to hold checked-in connections; subclasses
    # swap this out for a different queue type.
    _queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = (
        sqla_queue.Queue
    )

    _pool: sqla_queue.QueueCommon[ConnectionPoolEntry]

    def __init__(
        self,
        creator: Union[_CreatorFnType, _CreatorWRecFnType],
        pool_size: int = 5,
        max_overflow: int = 10,
        timeout: float = 30.0,
        use_lifo: bool = False,
        **kw: Any,
    ):
        r"""
        Construct a QueuePool.

        :param creator: a callable that returns a DB-API connection object,
          same as that of :paramref:`_pool.Pool.creator`.

        :param pool_size: The size of the pool to be maintained; defaults
          to 5.  This is the largest number of connections that will be
          kept persistently in the pool.  The pool starts out with no
          connections; once this many connections have been requested,
          that many connections will remain.  ``pool_size`` can be set to 0
          to indicate no size limit; to disable pooling altogether, use a
          :class:`~sqlalchemy.pool.NullPool` instead.

        :param max_overflow: The maximum overflow size of the pool.  When
          the number of checked-out connections reaches the size set in
          pool_size, additional connections will be returned up to this
          limit.  When those additional connections are returned to the
          pool, they are disconnected and discarded.  It follows that the
          total number of simultaneous connections the pool will allow is
          pool_size + `max_overflow`, and the total number of "sleeping"
          connections the pool will allow is pool_size.  `max_overflow`
          can be set to -1 to indicate no overflow limit; no limit will be
          placed on the total number of concurrent connections.  Defaults
          to 10.

        :param timeout: The number of seconds to wait before giving up on
          returning a connection.  Defaults to 30.0.  This can be a float
          but is subject to the limitations of Python time functions,
          which may not be reliable in the tens of milliseconds.

        :param use_lifo: use LIFO (last-in-first-out) rather than FIFO
          (first-in-first-out) when retrieving connections.  Using LIFO, a
          server-side timeout scheme can reduce the number of connections
          used during non-peak periods of use.  When planning for
          server-side timeouts, ensure that a recycle or pre-ping strategy
          is in use to gracefully handle stale connections.

          .. versionadded:: 1.3

          .. seealso::

            :ref:`pool_use_lifo`

            :ref:`pool_disconnects`

        :param \**kw: Other keyword arguments including
          :paramref:`_pool.Pool.recycle`, :paramref:`_pool.Pool.echo`,
          :paramref:`_pool.Pool.reset_on_return` and others are passed to
          the :class:`_pool.Pool` constructor.

        """

        Pool.__init__(self, creator, **kw)
        self._pool = self._queue_class(pool_size, use_lifo=use_lifo)
        self._timeout = timeout
        self._overflow_lock = threading.Lock()

        # The counter starts at -pool_size, so it only becomes positive
        # once more than pool_size connections have been created.
        self._overflow = -pool_size

        # A pool_size of 0 means "no size limit", which is expressed as an
        # unlimited overflow (-1) regardless of the max_overflow argument.
        if pool_size == 0:
            self._max_overflow = -1
        else:
            self._max_overflow = max_overflow

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        """Put ``record`` back in the queue, or close it if the queue is
        full."""
        try:
            # non-blocking put; a full queue raises right away
            self._pool.put(record, False)
        except sqla_queue.Full:
            try:
                record.close()
            finally:
                # the counter is decremented even if close() raised
                self._dec_overflow()

    def _do_get(self) -> ConnectionPoolEntry:
        """Return a queued connection, or create one if overflow permits.

        :raises sqlalchemy.exc.TimeoutError: if the overflow limit has been
          reached and no connection became available within the timeout.
        """
        limited = self._max_overflow > -1

        # Block on the queue only when the overflow allowance is already
        # used up; otherwise fall through and try to create a connection.
        block = limited and self._overflow >= self._max_overflow
        try:
            return self._pool.get(block, self._timeout)
        except sqla_queue.Empty:
            # Nothing further happens inside this handler on purpose: an
            # exception raised from here would be chained to queue.Empty,
            # which Python 3 would then report as the real error.
            pass

        if limited and self._overflow >= self._max_overflow:
            if block:
                raise exc.TimeoutError(
                    "QueuePool limit of size %d overflow %d reached, "
                    "connection timed out, timeout %0.2f"
                    % (self.size(), self.overflow(), self._timeout),
                    code="3o7r",
                )
            # The limit was reached after the non-blocking get was chosen;
            # start over so that the blocking decision is recomputed.
            return self._do_get()

        if not self._inc_overflow():
            # no overflow slot could be reserved; start over
            return self._do_get()

        try:
            return self._create_connection()
        except BaseException:
            # release the slot reserved above, then re-raise
            with util.safe_reraise():
                self._dec_overflow()
            raise

    def _inc_overflow(self) -> bool:
        """Reserve one overflow slot; return False if the limit is
        reached."""
        if self._max_overflow == -1:
            # unlimited overflow: counted without taking the lock
            self._overflow += 1
            return True
        with self._overflow_lock:
            if self._overflow >= self._max_overflow:
                return False
            self._overflow += 1
            return True

    def _dec_overflow(self) -> Literal[True]:
        """Release one overflow slot; always returns True."""
        if self._max_overflow == -1:
            # unlimited overflow: counted without taking the lock
            self._overflow -= 1
        else:
            with self._overflow_lock:
                self._overflow -= 1
        return True

    def recreate(self) -> QueuePool:
        """Return a new pool of the same class with the same settings."""
        self.logger.info("Pool recreating")
        queue = self._pool
        return self.__class__(
            self._creator,
            pool_size=queue.maxsize,
            max_overflow=self._max_overflow,
            timeout=self._timeout,
            use_lifo=queue.use_lifo,
            pre_ping=self._pre_ping,
            recycle=self._recycle,
            reset_on_return=self._reset_on_return,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def dispose(self) -> None:
        """Close every queued connection and reset the overflow counter."""
        while True:
            try:
                entry = self._pool.get(False)
                entry.close()
            except sqla_queue.Empty:
                # queue drained
                break

        self._overflow = -self.size()
        self.logger.info("Pool disposed. %s", self.status())

    def status(self) -> str:
        """Return a one-line summary of the pool's counters."""
        # NOTE(review): the spacing inside this format string is reproduced
        # as it appears in the reviewed copy, which may have collapsed runs
        # of whitespace -- confirm against the canonical source.
        counters = (
            self.size(),
            self.checkedin(),
            self.overflow(),
            self.checkedout(),
        )
        return (
            "Pool size: %d Connections in pool: %d "
            "Current Overflow: %d Current Checked out "
            "connections: %d" % counters
        )

    def size(self) -> int:
        """Return the ``maxsize`` of the underlying queue."""
        return self._pool.maxsize

    def timeout(self) -> float:
        """Return the configured checkout timeout."""
        return self._timeout

    def checkedin(self) -> int:
        """Return the number of connections currently in the queue."""
        return self._pool.qsize()

    def overflow(self) -> int:
        """Return the overflow counter, or 0 if the queue's maxsize is
        falsy."""
        if self._pool.maxsize:
            return self._overflow
        return 0

    def checkedout(self) -> int:
        """Return ``maxsize - qsize + overflow`` for the underlying
        queue."""
        queue = self._pool
        return queue.maxsize - queue.qsize() + self._overflow

257 

258 

class AsyncAdaptedQueuePool(QueuePool):
    """The asyncio-compatible counterpart of :class:`.QueuePool`.

    :class:`.AsyncEngine` engines generated from
    :func:`_asyncio.create_async_engine` use this pool by default.  Its
    queue implementation is asyncio-compatible and does not use
    ``threading.Lock``.

    Apart from that, :class:`.AsyncAdaptedQueuePool` accepts the same
    arguments and operates the same way as :class:`.QueuePool`.

    """

    _is_asyncio = True

    # asyncio-compatible replacement for the queue class of the parent pool
    _queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = (
        sqla_queue.AsyncAdaptedQueue
    )

    _dialect = _AsyncConnDialect()

278 

279 

class FallbackAsyncAdaptedQueuePool(AsyncAdaptedQueuePool):
    # Same pool as the parent class, except that connections are held in
    # sqla_queue.FallbackAsyncAdaptedQueue.
    _queue_class = sqla_queue.FallbackAsyncAdaptedQueue  # type: ignore[assignment]  # noqa: E501

282 

283 

class NullPool(Pool):
    """A Pool that performs no pooling at all.

    Every connection open/close literally opens and closes the underlying
    DB-API connection.

    Because no connections are held persistently, reconnect-related
    functions such as ``recycle`` and connection invalidation are not
    supported by this Pool implementation.

    The :class:`.NullPool` class **is compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.

    """

    def _do_get(self) -> ConnectionPoolEntry:
        """Create a brand new connection for every checkout."""
        return self._create_connection()

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        """Close the returned connection instead of keeping it."""
        record.close()

    def status(self) -> str:
        """Return the fixed status string for this pool."""
        return "NullPool"

    def dispose(self) -> None:
        """Do nothing; this pool holds no connections."""
        pass

    def recreate(self) -> NullPool:
        """Return a new pool of the same class with the same settings."""
        self.logger.info("Pool recreating")

        return self.__class__(
            self._creator,
            pre_ping=self._pre_ping,
            recycle=self._recycle,
            reset_on_return=self._reset_on_return,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

324 

325 

class SingletonThreadPool(Pool):
    """A Pool that maintains one connection per thread.

    Maintains one connection per each thread, never moving a connection to a
    thread other than the one which it was created in.

    .. warning::  the :class:`.SingletonThreadPool` will call ``.close()``
       on arbitrary connections that exist beyond the size setting of
       ``pool_size``, e.g. if more unique **thread identities**
       than what ``pool_size`` states are used.   This cleanup is
       non-deterministic and not sensitive to whether or not the connections
       linked to those thread identities are currently in use.

       :class:`.SingletonThreadPool` may be improved in a future release,
       however in its current status it is generally used only for test
       scenarios using a SQLite ``:memory:`` database and is not recommended
       for production use.

    The :class:`.SingletonThreadPool` class **is not compatible** with asyncio
    and :func:`_asyncio.create_async_engine`.


    Options are the same as those of :class:`_pool.Pool`, as well as:

    :param pool_size: The number of threads in which to maintain connections
        at once.  Defaults to five.

    :class:`.SingletonThreadPool` is used by the SQLite dialect
    automatically when a memory-based database is used.
    See :ref:`sqlite_toplevel`.

    """

    _is_asyncio = False

    def __init__(
        self,
        creator: Union[_CreatorFnType, _CreatorWRecFnType],
        pool_size: int = 5,
        **kw: Any,
    ):
        Pool.__init__(self, creator, **kw)
        # per-thread weak reference to that thread's connection record
        self._conn = threading.local()
        # per-thread slot for the currently checked-out fairy
        self._fairy = threading.local()
        # every record created by this pool, across all threads
        self._all_conns: Set[ConnectionPoolEntry] = set()
        self.size = pool_size

    def recreate(self) -> SingletonThreadPool:
        """Return a new pool of the same class with the same settings."""
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            pool_size=self.size,
            recycle=self._recycle,
            echo=self.echo,
            pre_ping=self._pre_ping,
            logging_name=self._orig_logging_name,
            reset_on_return=self._reset_on_return,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def _transfer_from(
        self, other_singleton_pool: SingletonThreadPool
    ) -> None:
        # used by the test suite to make a new engine / pool without
        # losing the state of an existing SQLite :memory: connection
        assert not hasattr(other_singleton_pool._fairy, "current")
        self._conn = other_singleton_pool._conn
        self._all_conns = other_singleton_pool._all_conns

    def dispose(self) -> None:
        """Dispose of this pool."""

        for conn in self._all_conns:
            try:
                conn.close()
            except Exception:
                # pysqlite won't even let you close a conn from a thread
                # that didn't create it
                pass

        self._all_conns.clear()

    def _cleanup(self) -> None:
        """Close arbitrary connections until fewer than ``size`` remain.

        The loop also stops once no connections are left.  Without that
        check a ``pool_size`` of zero or less keeps the size comparison
        true on an empty set, and ``set.pop()`` then raises ``KeyError``.
        """
        while self._all_conns and len(self._all_conns) >= self.size:
            c = self._all_conns.pop()
            c.close()

    def status(self) -> str:
        """Return this pool's id and its number of tracked connections."""
        return "SingletonThreadPool id:%d size: %d" % (
            id(self),
            len(self._all_conns),
        )

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        """Forget the calling thread's checked-out fairy, if there is one.

        ``record`` itself is not touched here.
        """
        try:
            del self._fairy.current
        except AttributeError:
            pass

    def _do_get(self) -> ConnectionPoolEntry:
        """Return the calling thread's connection, creating it if needed."""
        try:
            if TYPE_CHECKING:
                c = cast(ConnectionPoolEntry, self._conn.current())
            else:
                c = self._conn.current()
            if c:
                return c
        except AttributeError:
            # this thread has no stored reference yet
            pass
        c = self._create_connection()
        # only a weak reference is kept per thread; the strong reference
        # lives in self._all_conns
        self._conn.current = weakref.ref(c)
        if len(self._all_conns) >= self.size:
            self._cleanup()
        self._all_conns.add(c)
        return c

    def connect(self) -> PoolProxiedConnection:
        """Return the calling thread's checked-out connection, or check
        one out."""
        # vendored from Pool to include the now removed use_threadlocal
        # behavior
        try:
            rec = cast(_ConnectionFairy, self._fairy.current())
        except AttributeError:
            pass
        else:
            if rec is not None:
                return rec._checkout_existing()

        return _ConnectionFairy._checkout(self, self._fairy)

455 

456 

class StaticPool(Pool):
    """A Pool holding exactly one connection that serves every request.

    Reconnect-related functions such as ``recycle`` and connection
    invalidation (which is also used to support auto-reconnect) are only
    partially supported right now and may not yield good results.

    The :class:`.StaticPool` class **is compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.

    """

    @util.memoized_property
    def connection(self) -> _ConnectionRecord:
        """The single connection record shared by all checkouts."""
        return _ConnectionRecord(self)

    def status(self) -> str:
        """Return the fixed status string for this pool."""
        return "StaticPool"

    def dispose(self) -> None:
        """Close and forget the shared record if it holds a connection."""
        # Checking __dict__ first avoids creating the memoized record just
        # to dispose of it.
        if "connection" not in self.__dict__:
            return

        record = self.connection
        if record.dbapi_connection is not None:
            record.close()
            del self.__dict__["connection"]

    def recreate(self) -> StaticPool:
        """Return a new pool of the same class with the same settings."""
        self.logger.info("Pool recreating")
        return self.__class__(
            creator=self._creator,
            pre_ping=self._pre_ping,
            recycle=self._recycle,
            reset_on_return=self._reset_on_return,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def _transfer_from(self, other_static_pool: StaticPool) -> None:
        # used by the test suite to make a new engine / pool without
        # losing the state of an existing SQLite :memory: connection
        def creator(rec: ConnectionPoolEntry) -> DBAPIConnection:
            dbapi_conn = other_static_pool.connection.dbapi_connection
            assert dbapi_conn is not None
            return dbapi_conn

        self._invoke_creator = creator

    def _create_connection(self) -> ConnectionPoolEntry:
        """Not supported by this pool; always raises."""
        raise NotImplementedError()

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        """Do nothing; the shared record stays in place."""
        pass

    def _do_get(self) -> ConnectionPoolEntry:
        """Return the shared record, replacing it first if invalidated."""
        current = self.connection
        if not current._is_hard_or_soft_invalidated():
            return current

        # Dropping the memoized value makes the next attribute access
        # build a fresh record.
        del self.__dict__["connection"]
        return self.connection

520 

521 

class AssertionPool(Pool):
    """A :class:`_pool.Pool` that allows at most one checked out connection at
    any given time.

    This will raise an exception if more than one connection is checked out
    at a time.  Useful for debugging code that is using more connections
    than desired.

    The :class:`.AssertionPool` class **is compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.

    """

    _conn: Optional[ConnectionPoolEntry]
    _checkout_traceback: Optional[List[str]]

    def __init__(self, *args: Any, **kw: Any):
        """Construct an AssertionPool.

        Accepts the keyword argument ``store_traceback`` (default ``True``),
        which controls whether the call stack of each checkout is recorded
        for use in the "already checked out" error message.  All other
        arguments are passed to the :class:`_pool.Pool` constructor.
        """
        self._conn = None
        self._checked_out = False
        # popped so that Pool.__init__ does not receive this keyword
        self._store_traceback = kw.pop("store_traceback", True)
        self._checkout_traceback = None
        Pool.__init__(self, *args, **kw)

    def status(self) -> str:
        """Return the fixed status string for this pool."""
        return "AssertionPool"

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        """Mark the single connection as returned.

        :raises AssertionError: if no connection is currently checked out.
        """
        if not self._checked_out:
            raise AssertionError("connection is not checked out")
        self._checked_out = False
        assert record is self._conn

    def dispose(self) -> None:
        """Clear the checked-out flag and close the connection, if any."""
        self._checked_out = False
        if self._conn:
            self._conn.close()

    def recreate(self) -> AssertionPool:
        """Return a new pool of the same class with the same settings.

        ``store_traceback`` is forwarded as well, so a pool constructed
        with ``store_traceback=False`` does not revert to the default of
        ``True`` when it is recreated.
        """
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            echo=self.echo,
            pre_ping=self._pre_ping,
            recycle=self._recycle,
            reset_on_return=self._reset_on_return,
            logging_name=self._orig_logging_name,
            store_traceback=self._store_traceback,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def _do_get(self) -> ConnectionPoolEntry:
        """Check out the single connection, creating it on first use.

        :raises AssertionError: if the connection is already checked out.
          When a checkout traceback was stored, it is appended to the
          message.
        """
        if self._checked_out:
            if self._checkout_traceback:
                suffix = " at:\n%s" % "".join(
                    chop_traceback(self._checkout_traceback)
                )
            else:
                suffix = ""
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            # record where this checkout came from, for the error above
            self._checkout_traceback = traceback.format_stack()
        return self._conn

582 if not self._conn: 

583 self._conn = self._create_connection() 

584 

585 self._checked_out = True 

586 if self._store_traceback: 

587 self._checkout_traceback = traceback.format_stack() 

588 return self._conn