Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/pool/impl.py: 45%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

239 statements  

1# pool/impl.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8 

9"""Pool implementation classes.""" 

10from __future__ import annotations 

11 

12import threading 

13import traceback 

14import typing 

15from typing import Any 

16from typing import cast 

17from typing import List 

18from typing import Optional 

19from typing import Set 

20from typing import Type 

21from typing import TYPE_CHECKING 

22from typing import Union 

23import weakref 

24 

25from .base import _AsyncConnDialect 

26from .base import _ConnectionFairy 

27from .base import _ConnectionRecord 

28from .base import _CreatorFnType 

29from .base import _CreatorWRecFnType 

30from .base import ConnectionPoolEntry 

31from .base import Pool 

32from .base import PoolProxiedConnection 

33from .. import exc 

34from .. import util 

35from ..util import chop_traceback 

36from ..util import queue as sqla_queue 

37from ..util.typing import Literal 

38 

39if typing.TYPE_CHECKING: 

40 from ..engine.interfaces import DBAPIConnection 

41 

42 

class QueuePool(Pool):
    """A :class:`_pool.Pool`
    that imposes a limit on the number of open connections.

    :class:`.QueuePool` is the default pooling implementation used for
    all :class:`_engine.Engine` objects other than SQLite with a ``:memory:``
    database.

    The :class:`.QueuePool` class **is not compatible** with asyncio and
    :func:`_asyncio.create_async_engine`. The
    :class:`.AsyncAdaptedQueuePool` class is used automatically when
    using :func:`_asyncio.create_async_engine`, if no other kind of pool
    is specified.

    .. seealso::

        :class:`.AsyncAdaptedQueuePool`

    """

    _is_asyncio = False

    # queue implementation used to hold idle connections; overridden by
    # AsyncAdaptedQueuePool with an asyncio-compatible queue
    _queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = (
        sqla_queue.Queue
    )

    _pool: sqla_queue.QueueCommon[ConnectionPoolEntry]

    def __init__(
        self,
        creator: Union[_CreatorFnType, _CreatorWRecFnType],
        pool_size: int = 5,
        max_overflow: int = 10,
        timeout: float = 30.0,
        use_lifo: bool = False,
        **kw: Any,
    ):
        r"""
        Construct a QueuePool.

        :param creator: a callable function that returns a DB-API
          connection object, same as that of :paramref:`_pool.Pool.creator`.

        :param pool_size: The size of the pool to be maintained,
          defaults to 5. This is the largest number of connections that
          will be kept persistently in the pool. Note that the pool
          begins with no connections; once this number of connections
          is requested, that number of connections will remain.
          ``pool_size`` can be set to 0 to indicate no size limit; to
          disable pooling, use a :class:`~sqlalchemy.pool.NullPool`
          instead.

        :param max_overflow: The maximum overflow size of the
          pool. When the number of checked-out connections reaches the
          size set in pool_size, additional connections will be
          returned up to this limit. When those additional connections
          are returned to the pool, they are disconnected and
          discarded. It follows then that the total number of
          simultaneous connections the pool will allow is pool_size +
          `max_overflow`, and the total number of "sleeping"
          connections the pool will allow is pool_size. `max_overflow`
          can be set to -1 to indicate no overflow limit; no limit
          will be placed on the total number of concurrent
          connections. Defaults to 10.

        :param timeout: The number of seconds to wait before giving up
          on returning a connection. Defaults to 30.0. This can be a float
          but is subject to the limitations of Python time functions which
          may not be reliable in the tens of milliseconds.

        :param use_lifo: use LIFO (last-in-first-out) when retrieving
          connections instead of FIFO (first-in-first-out). Using LIFO, a
          server-side timeout scheme can reduce the number of connections used
          during non-peak periods of use. When planning for server-side
          timeouts, ensure that a recycle or pre-ping strategy is in use to
          gracefully handle stale connections.

          .. seealso::

            :ref:`pool_use_lifo`

            :ref:`pool_disconnects`

        :param \**kw: Other keyword arguments including
          :paramref:`_pool.Pool.recycle`, :paramref:`_pool.Pool.echo`,
          :paramref:`_pool.Pool.reset_on_return` and others are passed to the
          :class:`_pool.Pool` constructor.

        """

        Pool.__init__(self, creator, **kw)
        self._pool = self._queue_class(pool_size, use_lifo=use_lifo)
        # _overflow starts at -pool_size so that a value >= 0 means the
        # fixed-size portion of the pool is fully checked out
        self._overflow = 0 - pool_size
        # pool_size == 0 means "no size limit"; model that as unlimited
        # overflow regardless of the max_overflow argument
        self._max_overflow = -1 if pool_size == 0 else max_overflow
        self._timeout = timeout
        self._overflow_lock = threading.Lock()

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        """Return a connection record to the queue, or discard it as
        overflow when the queue is already full."""
        try:
            self._pool.put(record, False)
        except sqla_queue.Full:
            try:
                record.close()
            finally:
                # decrement the overflow count even if close() raised
                self._dec_overflow()

    def _do_get(self) -> ConnectionPoolEntry:
        """Check out a connection record, creating an overflow connection
        or blocking on the queue as allowed by the pool's limits.

        :raises exc.TimeoutError: if the pool is exhausted and no
          connection became available within ``self._timeout`` seconds.
        """
        use_overflow = self._max_overflow > -1

        # only block on the queue when overflow is capped and exhausted;
        # otherwise fall through immediately to create a new connection
        wait = use_overflow and self._overflow >= self._max_overflow
        try:
            return self._pool.get(wait, self._timeout)
        except sqla_queue.Empty:
            # don't do things inside of "except Empty", because when we say
            # we timed out or can't connect and raise, Python 3 tells
            # people the real error is queue.Empty which it isn't.
            pass
        if use_overflow and self._overflow >= self._max_overflow:
            if not wait:
                # overflow limit was reached by another thread after our
                # non-blocking get; retry, this time blocking on the queue
                return self._do_get()
            else:
                raise exc.TimeoutError(
                    "QueuePool limit of size %d overflow %d reached, "
                    "connection timed out, timeout %0.2f"
                    % (self.size(), self.overflow(), self._timeout),
                    code="3o7r",
                )

        if self._inc_overflow():
            try:
                return self._create_connection()
            except BaseException:
                # was a bare "except:"; except BaseException is the explicit
                # equivalent (PEP 8) and deliberately includes
                # KeyboardInterrupt / SystemExit so the overflow counter is
                # always rolled back when connection creation fails
                with util.safe_reraise():
                    self._dec_overflow()
                raise
        else:
            # lost the race to claim an overflow slot; try again
            return self._do_get()

    def _inc_overflow(self) -> bool:
        """Attempt to claim an overflow slot; return True on success."""
        if self._max_overflow == -1:
            # unlimited overflow; no lock needed for an advisory counter
            self._overflow += 1
            return True
        with self._overflow_lock:
            if self._overflow < self._max_overflow:
                self._overflow += 1
                return True
            else:
                return False

    def _dec_overflow(self) -> Literal[True]:
        """Release an overflow slot; always returns True."""
        if self._max_overflow == -1:
            self._overflow -= 1
            return True
        with self._overflow_lock:
            self._overflow -= 1
            return True

    def recreate(self) -> QueuePool:
        """Return a new :class:`.QueuePool` with the same configuration."""
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            pool_size=self._pool.maxsize,
            max_overflow=self._max_overflow,
            pre_ping=self._pre_ping,
            use_lifo=self._pool.use_lifo,
            timeout=self._timeout,
            recycle=self._recycle,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            reset_on_return=self._reset_on_return,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def dispose(self) -> None:
        """Close all pooled connections and reset the overflow counter."""
        while True:
            try:
                conn = self._pool.get(False)
                conn.close()
            except sqla_queue.Empty:
                break

        self._overflow = 0 - self.size()
        self.logger.info("Pool disposed. %s", self.status())

    def status(self) -> str:
        """Return a human-readable summary of the pool's current state."""
        return (
            "Pool size: %d  Connections in pool: %d "
            "Current Overflow: %d Current Checked out "
            "connections: %d"
            % (
                self.size(),
                self.checkedin(),
                self.overflow(),
                self.checkedout(),
            )
        )

    def size(self) -> int:
        """Return the configured fixed size of the pool."""
        return self._pool.maxsize

    def timeout(self) -> float:
        """Return the checkout timeout in seconds."""
        return self._timeout

    def checkedin(self) -> int:
        """Return the number of connections currently idle in the pool."""
        return self._pool.qsize()

    def overflow(self) -> int:
        """Return the current overflow count (0 for an unlimited pool)."""
        return self._overflow if self._pool.maxsize else 0

    def checkedout(self) -> int:
        """Return the number of connections currently checked out."""
        return self._pool.maxsize - self._pool.qsize() + self._overflow

255 

256 

class AsyncAdaptedQueuePool(QueuePool):
    """An asyncio-compatible version of :class:`.QueuePool`.

    This pool is used by default when using :class:`.AsyncEngine` engines that
    were generated from :func:`_asyncio.create_async_engine`. It uses an
    asyncio-compatible queue implementation that does not use
    ``threading.Lock``.

    The arguments and operation of :class:`.AsyncAdaptedQueuePool` are
    otherwise identical to that of :class:`.QueuePool`.

    """

    # marks this pool as safe for use under asyncio event loops
    _is_asyncio = True

    # swap in the awaitable-friendly queue in place of the threaded one
    _queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = (
        sqla_queue.AsyncAdaptedQueue
    )

    _dialect = _AsyncConnDialect()

276 

277 

class NullPool(Pool):
    """A Pool which does not pool connections.

    Instead it literally opens and closes the underlying DB-API connection
    per each connection open/close.

    Reconnect-related functions such as ``recycle`` and connection
    invalidation are not supported by this Pool implementation, since
    no connections are held persistently.

    The :class:`.NullPool` class **is compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.

    """

    def status(self) -> str:
        # nothing to report beyond the pool type; no state is held
        return "NullPool"

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        # a returned connection is closed outright rather than retained
        record.close()

    def _do_get(self) -> ConnectionPoolEntry:
        # every checkout opens a brand-new connection
        return self._create_connection()

    def recreate(self) -> NullPool:
        """Return a new :class:`.NullPool` carrying over this pool's
        configuration."""
        self.logger.info("Pool recreating")

        return self.__class__(
            self._creator,
            recycle=self._recycle,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            reset_on_return=self._reset_on_return,
            pre_ping=self._pre_ping,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def dispose(self) -> None:
        # no-op: there are never any held connections to dispose of
        pass

318 

319 

class SingletonThreadPool(Pool):
    """A Pool that maintains one connection per thread.

    Maintains one connection per each thread, never moving a connection to a
    thread other than the one which it was created in.

    .. warning::  the :class:`.SingletonThreadPool` will call ``.close()``
       on arbitrary connections that exist beyond the size setting of
       ``pool_size``, e.g. if more unique **thread identities**
       than what ``pool_size`` states are used. This cleanup is
       non-deterministic and not sensitive to whether or not the connections
       linked to those thread identities are currently in use.

       :class:`.SingletonThreadPool` may be improved in a future release,
       however in its current status it is generally used only for test
       scenarios using a SQLite ``:memory:`` database and is not recommended
       for production use.

    The :class:`.SingletonThreadPool` class **is not compatible** with asyncio
    and :func:`_asyncio.create_async_engine`.


    Options are the same as those of :class:`_pool.Pool`, as well as:

    :param pool_size: The number of threads in which to maintain connections
        at once.  Defaults to five.

    :class:`.SingletonThreadPool` is used by the SQLite dialect
    automatically when a memory-based database is used.
    See :ref:`sqlite_toplevel`.

    """

    _is_asyncio = False

    def __init__(
        self,
        creator: Union[_CreatorFnType, _CreatorWRecFnType],
        pool_size: int = 5,
        **kw: Any,
    ):
        Pool.__init__(self, creator, **kw)
        # per-thread storage: each thread sees only its own connection
        # record and its own checked-out fairy
        self._conn = threading.local()
        self._fairy = threading.local()
        # strong references to every record created, across all threads,
        # so that dispose()/_cleanup() can reach them
        self._all_conns: Set[ConnectionPoolEntry] = set()
        self.size = pool_size

    def recreate(self) -> SingletonThreadPool:
        """Return a new pool of this class with the same configuration."""
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            pool_size=self.size,
            recycle=self._recycle,
            echo=self.echo,
            pre_ping=self._pre_ping,
            logging_name=self._orig_logging_name,
            reset_on_return=self._reset_on_return,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def dispose(self) -> None:
        """Dispose of this pool."""

        for conn in self._all_conns:
            try:
                conn.close()
            except Exception:
                # pysqlite won't even let you close a conn from a thread
                # that didn't create it
                pass

        self._all_conns.clear()

    def _cleanup(self) -> None:
        # evict arbitrary records until we're back under the size limit;
        # note the eviction is non-deterministic (set.pop)
        while len(self._all_conns) >= self.size:
            c = self._all_conns.pop()
            c.close()

    def status(self) -> str:
        return "SingletonThreadPool id:%d size: %d" % (
            id(self),
            len(self._all_conns),
        )

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        # drop this thread's checked-out fairy, if one was recorded
        try:
            del self._fairy.current
        except AttributeError:
            pass

    def _do_get(self) -> ConnectionPoolEntry:
        # reuse this thread's existing record when the weakref is still live
        try:
            if TYPE_CHECKING:
                c = cast(ConnectionPoolEntry, self._conn.current())
            else:
                c = self._conn.current()
            if c:
                return c
        except AttributeError:
            pass
        # no live record for this thread; create one and register it
        c = self._create_connection()
        self._conn.current = weakref.ref(c)
        if len(self._all_conns) >= self.size:
            self._cleanup()
        self._all_conns.add(c)
        return c

    def connect(self) -> PoolProxiedConnection:
        # vendored from Pool to include the now removed use_threadlocal
        # behavior
        try:
            rec = cast(_ConnectionFairy, self._fairy.current())
        except AttributeError:
            pass
        else:
            if rec is not None:
                # same thread checking out again: hand back the same fairy
                return rec._checkout_existing()

        return _ConnectionFairy._checkout(self, self._fairy)

440 

441 

class StaticPool(Pool):
    """A Pool of exactly one connection, used for all requests.

    Reconnect-related functions such as ``recycle`` and connection
    invalidation (which is also used to support auto-reconnect) are only
    partially supported right now and may not yield good results.

    The :class:`.StaticPool` class **is compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.

    """

    @util.memoized_property
    def connection(self) -> _ConnectionRecord:
        # lazily created on first access, then memoized on the instance;
        # dispose()/_do_get() reset the memoization to force re-creation
        return _ConnectionRecord(self)

    def status(self) -> str:
        return "StaticPool"

    def dispose(self) -> None:
        """Close the single connection, if it was ever created."""
        if (
            "connection" in self.__dict__
            and self.connection.dbapi_connection is not None
        ):
            self.connection.close()
            # clear the memoized value so a later access re-creates it
            del self.__dict__["connection"]

    def recreate(self) -> StaticPool:
        """Return a new :class:`.StaticPool` with the same configuration."""
        self.logger.info("Pool recreating")
        return self.__class__(
            creator=self._creator,
            recycle=self._recycle,
            reset_on_return=self._reset_on_return,
            pre_ping=self._pre_ping,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def _transfer_from(self, other_static_pool: StaticPool) -> None:
        # used by the test suite to make a new engine / pool without
        # losing the state of an existing SQLite :memory: connection
        def creator(rec: ConnectionPoolEntry) -> DBAPIConnection:
            conn = other_static_pool.connection.dbapi_connection
            assert conn is not None
            return conn

        self._invoke_creator = creator

    def _create_connection(self) -> ConnectionPoolEntry:
        # never called; checkout goes through the memoized ``connection``
        raise NotImplementedError()

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        # the single record stays held by the pool; nothing to do
        pass

    def _do_get(self) -> ConnectionPoolEntry:
        rec = self.connection
        if rec._is_hard_or_soft_invalidated():
            # drop the memoized record and build a fresh one
            del self.__dict__["connection"]
            rec = self.connection

        return rec

505 

506 

class AssertionPool(Pool):
    """A :class:`_pool.Pool` that allows at most one checked out connection at
    any given time.

    This will raise an exception if more than one connection is checked out
    at a time.  Useful for debugging code that is using more connections
    than desired.

    The :class:`.AssertionPool` class **is compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.

    """

    _conn: Optional[ConnectionPoolEntry]
    _checkout_traceback: Optional[List[str]]

    def __init__(self, *args: Any, **kw: Any):
        self._conn = None
        self._checked_out = False
        # when True, remember where the checkout happened so a double
        # checkout can report the offending call site
        self._store_traceback = kw.pop("store_traceback", True)
        self._checkout_traceback = None
        Pool.__init__(self, *args, **kw)

    def status(self) -> str:
        return "AssertionPool"

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        if not self._checked_out:
            raise AssertionError("connection is not checked out")
        self._checked_out = False
        assert record is self._conn

    def dispose(self) -> None:
        self._checked_out = False
        if self._conn:
            self._conn.close()

    def recreate(self) -> AssertionPool:
        """Return a new :class:`.AssertionPool` with the same configuration."""
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            echo=self.echo,
            pre_ping=self._pre_ping,
            recycle=self._recycle,
            reset_on_return=self._reset_on_return,
            logging_name=self._orig_logging_name,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def _do_get(self) -> ConnectionPoolEntry:
        if self._checked_out:
            # second simultaneous checkout: report where the first happened
            if self._checkout_traceback:
                suffix = " at:\n%s" % "".join(
                    chop_traceback(self._checkout_traceback)
                )
            else:
                suffix = ""
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn