Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/pool/impl.py: 55%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

241 statements  

1# pool/impl.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8 

9"""Pool implementation classes. 

10 

11""" 

12from __future__ import annotations 

13 

14import threading 

15import traceback 

16import typing 

17from typing import Any 

18from typing import cast 

19from typing import List 

20from typing import Optional 

21from typing import Set 

22from typing import Type 

23from typing import TYPE_CHECKING 

24from typing import Union 

25import weakref 

26 

27from .base import _AsyncConnDialect 

28from .base import _ConnectionFairy 

29from .base import _ConnectionRecord 

30from .base import _CreatorFnType 

31from .base import _CreatorWRecFnType 

32from .base import ConnectionPoolEntry 

33from .base import Pool 

34from .base import PoolProxiedConnection 

35from .. import exc 

36from .. import util 

37from ..util import chop_traceback 

38from ..util import queue as sqla_queue 

39from ..util.typing import Literal 

40 

41if typing.TYPE_CHECKING: 

42 from ..engine.interfaces import DBAPIConnection 

43 

44 

class QueuePool(Pool):
    """A :class:`_pool.Pool` that caps how many connections may be open.

    :class:`.QueuePool` is the default pool implementation for every
    :class:`_engine.Engine` except SQLite engines that use a ``:memory:``
    database.

    :class:`.QueuePool` **is not compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.  When
    :func:`_asyncio.create_async_engine` is used and no other pool is
    specified, :class:`.AsyncAdaptedQueuePool` is chosen automatically.

    .. seealso::

        :class:`.AsyncAdaptedQueuePool`

    """

    _is_asyncio = False  # type: ignore[assignment]

    # Queue implementation used to hold idle connection records; subclasses
    # swap this out for a different queue class.
    _queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = (
        sqla_queue.Queue
    )

    _pool: sqla_queue.QueueCommon[ConnectionPoolEntry]

    def __init__(
        self,
        creator: Union[_CreatorFnType, _CreatorWRecFnType],
        pool_size: int = 5,
        max_overflow: int = 10,
        timeout: float = 30.0,
        use_lifo: bool = False,
        **kw: Any,
    ):
        r"""
        Construct a QueuePool.

        :param creator: callable returning a DB-API connection object; same
          meaning as :paramref:`_pool.Pool.creator`.

        :param pool_size: Number of connections kept persistently in the
          pool; defaults to 5.  The pool starts out empty; once this many
          connections have been requested, that many remain pooled.
          A value of 0 means "no size limit".  To turn pooling off entirely
          use :class:`~sqlalchemy.pool.NullPool` instead.

        :param max_overflow: How many connections beyond ``pool_size`` may
          be handed out; defaults to 10.  Overflow connections are
          disconnected and discarded when they are returned.  The pool
          therefore permits at most ``pool_size + max_overflow``
          simultaneous connections and at most ``pool_size`` idle
          ("sleeping") ones.  A value of -1 removes the overflow limit, so
          the number of concurrent connections is unbounded.

        :param timeout: Seconds to wait for a connection before giving up;
          defaults to 30.0.  May be a float, subject to the precision of
          Python's time functions, which may be unreliable in the tens of
          milliseconds.

        :param use_lifo: retrieve connections LIFO (last-in-first-out)
          rather than FIFO (first-in-first-out).  With LIFO, a server-side
          timeout scheme can shrink the number of connections used during
          non-peak periods.  When relying on server-side timeouts, make sure
          a recycle or pre-ping strategy is in place so stale connections
          are handled gracefully.

          .. versionadded:: 1.3

          .. seealso::

            :ref:`pool_use_lifo`

            :ref:`pool_disconnects`

        :param \**kw: Remaining keyword arguments, such as
          :paramref:`_pool.Pool.recycle`, :paramref:`_pool.Pool.echo`,
          :paramref:`_pool.Pool.reset_on_return` and others, are forwarded
          to the :class:`_pool.Pool` constructor.

        """

        Pool.__init__(self, creator, **kw)
        self._pool = self._queue_class(pool_size, use_lifo=use_lifo)

        # The counter starts at -pool_size and is bumped once for every
        # connection _do_get() creates, so it only becomes positive once
        # more than pool_size connections exist.
        self._overflow = 0 - pool_size

        # An unbounded queue (pool_size == 0) implies an unbounded overflow.
        if pool_size == 0:
            self._max_overflow = -1
        else:
            self._max_overflow = max_overflow

        self._timeout = timeout
        self._overflow_lock = threading.Lock()

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        """Put ``record`` back in the queue, or close it if the queue is full.

        A record that does not fit is closed and the overflow counter is
        decremented, even when ``close()`` raises.
        """
        try:
            self._pool.put(record, False)
        except sqla_queue.Full:
            try:
                record.close()
            finally:
                self._dec_overflow()

    def _do_get(self) -> ConnectionPoolEntry:
        """Return an idle record from the queue or create a new one.

        :raises sqlalchemy.exc.TimeoutError: when the overflow limit has
          been reached and no record became available in the queue.
        """
        overflow_limited = self._max_overflow > -1

        # Only block on the queue when no further connection may be created.
        should_block = (
            overflow_limited and self._overflow >= self._max_overflow
        )
        try:
            return self._pool.get(should_block, self._timeout)
        except sqla_queue.Empty:
            # Deliberately do nothing inside "except Empty": raising from
            # here would make Python 3 report queue.Empty as the underlying
            # cause of the timeout / connect error, which it is not.
            pass

        if overflow_limited and self._overflow >= self._max_overflow:
            if should_block:
                raise exc.TimeoutError(
                    "QueuePool limit of size %d overflow %d reached, "
                    "connection timed out, timeout %0.2f"
                    % (self.size(), self.overflow(), self._timeout),
                    code="3o7r",
                )
            # The limit was reached only after the non-blocking attempt
            # above; start over so that the retry waits on the queue.
            return self._do_get()

        if not self._inc_overflow():
            # Another caller claimed the last overflow slot; start over.
            return self._do_get()

        try:
            return self._create_connection()
        except:
            # Give the reserved slot back, then let the error propagate.
            with util.safe_reraise():
                self._dec_overflow()
            raise

    def _inc_overflow(self) -> bool:
        """Try to reserve a slot for one more connection.

        :return: ``True`` if the counter was incremented, ``False`` if the
          overflow limit had already been reached.
        """
        if self._max_overflow == -1:
            # No limit configured: this path does not take the lock.
            self._overflow += 1
            return True

        with self._overflow_lock:
            if self._overflow >= self._max_overflow:
                return False
            self._overflow += 1
            return True

    def _dec_overflow(self) -> Literal[True]:
        """Release one connection slot; always returns ``True``."""
        if self._max_overflow == -1:
            # No limit configured: this path does not take the lock.
            self._overflow -= 1
            return True

        with self._overflow_lock:
            self._overflow -= 1
            return True

    def recreate(self) -> QueuePool:
        """Return a new pool of the same class, built from this pool's
        current settings."""
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            pool_size=self._pool.maxsize,
            max_overflow=self._max_overflow,
            pre_ping=self._pre_ping,
            use_lifo=self._pool.use_lifo,
            timeout=self._timeout,
            recycle=self._recycle,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            reset_on_return=self._reset_on_return,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def dispose(self) -> None:
        """Close every record currently idle in the queue and reset the
        overflow counter."""
        # Drain without blocking until the queue reports that it is empty.
        while True:
            try:
                idle_record = self._pool.get(False)
                idle_record.close()
            except sqla_queue.Empty:
                break

        self._overflow = 0 - self.size()
        self.logger.info("Pool disposed. %s", self.status())

    def status(self) -> str:
        """Return a one-line summary of the pool's counters."""
        # NOTE(review): this listing has runs of whitespace collapsed;
        # confirm the spacing inside this message against the canonical
        # source before relying on it.
        counters = (
            self.size(),
            self.checkedin(),
            self.overflow(),
            self.checkedout(),
        )
        return (
            "Pool size: %d Connections in pool: %d "
            "Current Overflow: %d Current Checked out "
            "connections: %d" % counters
        )

    def size(self) -> int:
        """Return the queue's ``maxsize``."""
        return self._pool.maxsize

    def timeout(self) -> float:
        """Return the configured checkout timeout."""
        return self._timeout

    def checkedin(self) -> int:
        """Return the number of records currently sitting in the queue."""
        return self._pool.qsize()

    def overflow(self) -> int:
        """Return the overflow counter, or 0 when the queue's ``maxsize``
        is falsy."""
        if self._pool.maxsize:
            return self._overflow
        return 0

    def checkedout(self) -> int:
        """Return ``maxsize - qsize + overflow``, i.e. the connections
        created but not currently in the queue."""
        return self._pool.maxsize - self._pool.qsize() + self._overflow

259 

260 

class AsyncAdaptedQueuePool(QueuePool):
    """:class:`.QueuePool` variant that is compatible with asyncio.

    This is the default pool for :class:`.AsyncEngine` engines produced by
    :func:`_asyncio.create_async_engine`.  Its queue implementation is
    asyncio-compatible and does not use ``threading.Lock``.

    Apart from that, the arguments and operation of
    :class:`.AsyncAdaptedQueuePool` are identical to those of
    :class:`.QueuePool`.

    """

    _is_asyncio = True  # type: ignore[assignment]

    # Replace the queue used by QueuePool with the asyncio-adapted one.
    _queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = (
        sqla_queue.AsyncAdaptedQueue
    )

    # Class-level default dialect object for this pool type.
    _dialect = _AsyncConnDialect()

280 

281 

class FallbackAsyncAdaptedQueuePool(AsyncAdaptedQueuePool):
    # Identical to AsyncAdaptedQueuePool except for the queue class it
    # instantiates.
    _queue_class = sqla_queue.FallbackAsyncAdaptedQueue

284 

285 

class NullPool(Pool):
    """A Pool that performs no pooling at all.

    The underlying DB-API connection is literally opened on every connection
    open and closed on every connection close.

    Because no connection is held persistently, reconnect-related features
    such as ``recycle`` and connection invalidation are not supported by
    this implementation.

    The :class:`.NullPool` class **is compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.

    """

    def status(self) -> str:
        """Return the fixed status string for this pool type."""
        return "NullPool"

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        """Close the returned record instead of keeping it."""
        record.close()

    def _do_get(self) -> ConnectionPoolEntry:
        """Create a new connection record for every checkout."""
        return self._create_connection()

    def recreate(self) -> NullPool:
        """Return a new pool of the same class, built from this pool's
        current settings."""
        self.logger.info("Pool recreating")

        return self.__class__(
            self._creator,
            recycle=self._recycle,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            reset_on_return=self._reset_on_return,
            pre_ping=self._pre_ping,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def dispose(self) -> None:
        """Do nothing; this pool holds no connections."""
        pass

326 

327 

class SingletonThreadPool(Pool):
    """A Pool that keeps one connection per thread.

    Each thread gets its own connection, and a connection is never handed
    to a thread other than the one that created it.

    .. warning::  the :class:`.SingletonThreadPool` will call ``.close()``
       on arbitrary connections that exist beyond the size setting of
       ``pool_size``, e.g. if more unique **thread identities**
       than what ``pool_size`` states are used.   This cleanup is
       non-deterministic and not sensitive to whether or not the connections
       linked to those thread identities are currently in use.

       :class:`.SingletonThreadPool` may be improved in a future release,
       however in its current status it is generally used only for test
       scenarios using a SQLite ``:memory:`` database and is not recommended
       for production use.

    The :class:`.SingletonThreadPool` class **is not compatible** with
    asyncio and :func:`_asyncio.create_async_engine`.


    Accepts the same options as :class:`_pool.Pool`, plus:

    :param pool_size: The number of threads in which to maintain connections
        at once.  Defaults to five.

    The SQLite dialect selects :class:`.SingletonThreadPool` automatically
    when a memory-based database is used.
    See :ref:`sqlite_toplevel`.

    """

    _is_asyncio = False  # type: ignore[assignment]

    def __init__(
        self,
        creator: Union[_CreatorFnType, _CreatorWRecFnType],
        pool_size: int = 5,
        **kw: Any,
    ):
        Pool.__init__(self, creator, **kw)
        # Per-thread slots: ``_conn.current`` is set by _do_get() to a
        # weakref to the thread's record; ``_fairy.current`` is read by
        # connect() and deleted by _do_return_conn().
        self._conn = threading.local()
        self._fairy = threading.local()
        # Every record created by this pool, across all threads.
        self._all_conns: Set[ConnectionPoolEntry] = set()
        self.size = pool_size

    def recreate(self) -> SingletonThreadPool:
        """Return a new pool of the same class, built from this pool's
        current settings."""
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            pool_size=self.size,
            recycle=self._recycle,
            echo=self.echo,
            pre_ping=self._pre_ping,
            logging_name=self._orig_logging_name,
            reset_on_return=self._reset_on_return,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def dispose(self) -> None:
        """Dispose of this pool."""

        for record in self._all_conns:
            try:
                record.close()
            except Exception:
                # Best effort only: pysqlite refuses to close a connection
                # from a thread other than the one that created it.
                pass

        self._all_conns.clear()

    def _cleanup(self) -> None:
        """Close arbitrary tracked records until fewer than ``size``
        remain."""
        # NOTE(review): with pool_size <= 0 this loop reaches set.pop() on
        # an empty set, which raises KeyError -- confirm whether such sizes
        # are meant to be supported.
        while len(self._all_conns) >= self.size:
            victim = self._all_conns.pop()
            victim.close()

    def status(self) -> str:
        """Return the pool's ``id()`` and the number of tracked records."""
        details = (id(self), len(self._all_conns))
        return "SingletonThreadPool id:%d size: %d" % details

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        """Forget the current thread's fairy reference, if there is one.

        The record itself is left untouched.
        """
        try:
            del self._fairy.current
        except AttributeError:
            # Nothing was stored for this thread.
            pass

    def _do_get(self) -> ConnectionPoolEntry:
        """Return this thread's live record, creating one if needed."""
        try:
            if TYPE_CHECKING:
                existing = cast(ConnectionPoolEntry, self._conn.current())
            else:
                existing = self._conn.current()
            if existing:
                return existing
        except AttributeError:
            # This thread has not stored a record reference yet.
            pass

        created = self._create_connection()
        # Only a weak reference is kept per thread; the strong reference
        # lives in ``_all_conns``.
        self._conn.current = weakref.ref(created)
        if len(self._all_conns) >= self.size:
            self._cleanup()
        self._all_conns.add(created)
        return created

    def connect(self) -> PoolProxiedConnection:
        """Return the fairy already checked out on this thread, or check
        out a new one."""
        # vendored from Pool to include the now removed use_threadlocal
        # behavior
        try:
            current_fairy = cast(_ConnectionFairy, self._fairy.current())
        except AttributeError:
            # No fairy has been stored for this thread.
            pass
        else:
            if current_fairy is not None:
                return current_fairy._checkout_existing()

        return _ConnectionFairy._checkout(self, self._fairy)

448 

449 

class StaticPool(Pool):
    """A Pool holding exactly one connection that serves every request.

    Reconnect-related features such as ``recycle`` and connection
    invalidation (which also underpins auto-reconnect) are only partially
    supported at present and may not give good results.

    The :class:`.StaticPool` class **is compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.

    """

    @util.memoized_property
    def connection(self) -> _ConnectionRecord:
        """The single connection record of this pool.

        dispose() and _do_get() drop the stored value by deleting the
        ``"connection"`` entry from ``self.__dict__``; the next access then
        builds a fresh record.
        """
        return _ConnectionRecord(self)

    def status(self) -> str:
        """Return the fixed status string for this pool type."""
        return "StaticPool"

    def dispose(self) -> None:
        """Close and forget the record, if one exists and has a DB-API
        connection."""
        # Check __dict__ first so that disposing never creates a record
        # merely to close it.
        if "connection" not in self.__dict__:
            return
        if self.connection.dbapi_connection is None:
            return

        self.connection.close()
        del self.__dict__["connection"]

    def recreate(self) -> StaticPool:
        """Return a new pool of the same class, built from this pool's
        current settings."""
        self.logger.info("Pool recreating")
        return self.__class__(
            creator=self._creator,
            recycle=self._recycle,
            reset_on_return=self._reset_on_return,
            pre_ping=self._pre_ping,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def _transfer_from(self, other_static_pool: StaticPool) -> None:
        """Make this pool reuse ``other_static_pool``'s DB-API connection."""

        # used by the test suite to make a new engine / pool without
        # losing the state of an existing SQLite :memory: connection
        def creator(rec: ConnectionPoolEntry) -> DBAPIConnection:
            shared = other_static_pool.connection.dbapi_connection
            assert shared is not None
            return shared

        self._invoke_creator = creator

    def _create_connection(self) -> ConnectionPoolEntry:
        """Not supported; the record comes from :attr:`connection`."""
        raise NotImplementedError()

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        """Do nothing; the single record is kept on the pool."""
        pass

    def _do_get(self) -> ConnectionPoolEntry:
        """Return the record, replacing it first if it was invalidated."""
        record = self.connection
        if record._is_hard_or_soft_invalidated():
            # Drop the memoized record so the property creates a new one.
            del self.__dict__["connection"]
            record = self.connection

        return record

513 

514 

class AssertionPool(Pool):
    """A :class:`_pool.Pool` that allows at most one checked out connection at
    any given time.

    An exception is raised if a second connection is checked out while one
    is still out.  Useful for debugging code that uses more connections
    than desired.

    The :class:`.AssertionPool` class **is compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.

    """

    _conn: Optional[ConnectionPoolEntry]
    _checkout_traceback: Optional[List[str]]

    def __init__(self, *args: Any, **kw: Any):
        """Construct an AssertionPool.

        :param store_traceback: keyword-only, defaults to ``True``.  When
          true, the stack at each checkout is recorded and appended to the
          error raised by a conflicting second checkout.

        All other arguments are forwarded to the :class:`_pool.Pool`
        constructor.
        """
        self._conn = None
        self._checked_out = False
        # Must be popped before Pool.__init__ sees the keyword arguments.
        self._store_traceback = kw.pop("store_traceback", True)
        self._checkout_traceback = None
        Pool.__init__(self, *args, **kw)

    def status(self) -> str:
        """Return the fixed status string for this pool type."""
        return "AssertionPool"

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        """Mark the single connection as returned.

        :raises AssertionError: if no connection is currently checked out.
        """
        if not self._checked_out:
            raise AssertionError("connection is not checked out")
        self._checked_out = False
        assert record is self._conn

    def dispose(self) -> None:
        """Clear the checked-out flag and close the record, if any."""
        self._checked_out = False
        if self._conn:
            self._conn.close()

    def recreate(self) -> AssertionPool:
        """Return a new pool of the same class, built from this pool's
        current settings."""
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            echo=self.echo,
            pre_ping=self._pre_ping,
            recycle=self._recycle,
            reset_on_return=self._reset_on_return,
            logging_name=self._orig_logging_name,
            # Carry the traceback setting over; without this a pool created
            # with store_traceback=False silently reverted to the default
            # of True after being recreated.
            store_traceback=self._store_traceback,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def _do_get(self) -> ConnectionPoolEntry:
        """Check out the single connection, creating it on first use.

        :raises AssertionError: if the connection is already checked out;
          the message includes the stored checkout stack when available.
        """
        if self._checked_out:
            suffix = ""
            if self._checkout_traceback:
                suffix = " at:\n%s" % "".join(
                    chop_traceback(self._checkout_traceback)
                )
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            # Remember where this checkout happened so a later conflicting
            # checkout can report it.
            self._checkout_traceback = traceback.format_stack()
        return self._conn