Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/pool/impl.py: 55%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

241 statements  

1# pool/impl.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8 

9"""Pool implementation classes.""" 

10from __future__ import annotations 

11 

12import threading 

13import traceback 

14import typing 

15from typing import Any 

16from typing import cast 

17from typing import List 

18from typing import Optional 

19from typing import Set 

20from typing import Type 

21from typing import TYPE_CHECKING 

22from typing import Union 

23import weakref 

24 

25from .base import _AsyncConnDialect 

26from .base import _ConnectionFairy 

27from .base import _ConnectionRecord 

28from .base import _CreatorFnType 

29from .base import _CreatorWRecFnType 

30from .base import ConnectionPoolEntry 

31from .base import Pool 

32from .base import PoolProxiedConnection 

33from .. import exc 

34from .. import util 

35from ..util import chop_traceback 

36from ..util import queue as sqla_queue 

37from ..util.typing import Literal 

38 

39if typing.TYPE_CHECKING: 

40 from ..engine.interfaces import DBAPIConnection 

41 

42 

43class QueuePool(Pool): 

44 """A :class:`_pool.Pool` 

45 that imposes a limit on the number of open connections. 

46 

47 :class:`.QueuePool` is the default pooling implementation used for 

48 all :class:`_engine.Engine` objects other than SQLite with a ``:memory:`` 

49 database. 

50 

51 The :class:`.QueuePool` class **is not compatible** with asyncio and 

52 :func:`_asyncio.create_async_engine`. The 

53 :class:`.AsyncAdaptedQueuePool` class is used automatically when 

54 using :func:`_asyncio.create_async_engine`, if no other kind of pool 

55 is specified. 

56 

57 .. seealso:: 

58 

59 :class:`.AsyncAdaptedQueuePool` 

60 

61 """ 

62 

63 _is_asyncio = False 

64 

65 _queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = ( 

66 sqla_queue.Queue 

67 ) 

68 

69 _pool: sqla_queue.QueueCommon[ConnectionPoolEntry] 

70 

71 def __init__( 

72 self, 

73 creator: Union[_CreatorFnType, _CreatorWRecFnType], 

74 pool_size: int = 5, 

75 max_overflow: int = 10, 

76 timeout: float = 30.0, 

77 use_lifo: bool = False, 

78 **kw: Any, 

79 ): 

80 r""" 

81 Construct a QueuePool. 

82 

83 :param creator: a callable function that returns a DB-API 

84 connection object, same as that of :paramref:`_pool.Pool.creator`. 

85 

86 :param pool_size: The size of the pool to be maintained, 

87 defaults to 5. This is the largest number of connections that 

88 will be kept persistently in the pool. Note that the pool 

89 begins with no connections; once this number of connections 

90 is requested, that number of connections will remain. 

91 ``pool_size`` can be set to 0 to indicate no size limit; to 

92 disable pooling, use a :class:`~sqlalchemy.pool.NullPool` 

93 instead. 

94 

95 :param max_overflow: The maximum overflow size of the 

96 pool. When the number of checked-out connections reaches the 

97 size set in pool_size, additional connections will be 

98 returned up to this limit. When those additional connections 

99 are returned to the pool, they are disconnected and 

100 discarded. It follows then that the total number of 

101 simultaneous connections the pool will allow is pool_size + 

102 `max_overflow`, and the total number of "sleeping" 

103 connections the pool will allow is pool_size. `max_overflow` 

104 can be set to -1 to indicate no overflow limit; no limit 

105 will be placed on the total number of concurrent 

106 connections. Defaults to 10. 

107 

108 :param timeout: The number of seconds to wait before giving up 

109 on returning a connection. Defaults to 30.0. This can be a float 

110 but is subject to the limitations of Python time functions which 

111 may not be reliable in the tens of milliseconds. 

112 

113 :param use_lifo: use LIFO (last-in-first-out) when retrieving 

114 connections instead of FIFO (first-in-first-out). Using LIFO, a 

115 server-side timeout scheme can reduce the number of connections used 

116 during non-peak periods of use. When planning for server-side 

117 timeouts, ensure that a recycle or pre-ping strategy is in use to 

118 gracefully handle stale connections. 

119 

120 .. versionadded:: 1.3 

121 

122 .. seealso:: 

123 

124 :ref:`pool_use_lifo` 

125 

126 :ref:`pool_disconnects` 

127 

128 :param \**kw: Other keyword arguments including 

129 :paramref:`_pool.Pool.recycle`, :paramref:`_pool.Pool.echo`, 

130 :paramref:`_pool.Pool.reset_on_return` and others are passed to the 

131 :class:`_pool.Pool` constructor. 

132 

133 """ 

134 

135 Pool.__init__(self, creator, **kw) 

136 self._pool = self._queue_class(pool_size, use_lifo=use_lifo) 

137 self._overflow = 0 - pool_size 

138 self._max_overflow = -1 if pool_size == 0 else max_overflow 

139 self._timeout = timeout 

140 self._overflow_lock = threading.Lock() 

141 

142 def _do_return_conn(self, record: ConnectionPoolEntry) -> None: 

143 try: 

144 self._pool.put(record, False) 

145 except sqla_queue.Full: 

146 try: 

147 record.close() 

148 finally: 

149 self._dec_overflow() 

150 

151 def _do_get(self) -> ConnectionPoolEntry: 

152 use_overflow = self._max_overflow > -1 

153 

154 wait = use_overflow and self._overflow >= self._max_overflow 

155 try: 

156 return self._pool.get(wait, self._timeout) 

157 except sqla_queue.Empty: 

158 # don't do things inside of "except Empty", because when we say 

159 # we timed out or can't connect and raise, Python 3 tells 

160 # people the real error is queue.Empty which it isn't. 

161 pass 

162 if use_overflow and self._overflow >= self._max_overflow: 

163 if not wait: 

164 return self._do_get() 

165 else: 

166 raise exc.TimeoutError( 

167 "QueuePool limit of size %d overflow %d reached, " 

168 "connection timed out, timeout %0.2f" 

169 % (self.size(), self.overflow(), self._timeout), 

170 code="3o7r", 

171 ) 

172 

173 if self._inc_overflow(): 

174 try: 

175 return self._create_connection() 

176 except: 

177 with util.safe_reraise(): 

178 self._dec_overflow() 

179 raise 

180 else: 

181 return self._do_get() 

182 

183 def _inc_overflow(self) -> bool: 

184 if self._max_overflow == -1: 

185 self._overflow += 1 

186 return True 

187 with self._overflow_lock: 

188 if self._overflow < self._max_overflow: 

189 self._overflow += 1 

190 return True 

191 else: 

192 return False 

193 

194 def _dec_overflow(self) -> Literal[True]: 

195 if self._max_overflow == -1: 

196 self._overflow -= 1 

197 return True 

198 with self._overflow_lock: 

199 self._overflow -= 1 

200 return True 

201 

202 def recreate(self) -> QueuePool: 

203 self.logger.info("Pool recreating") 

204 return self.__class__( 

205 self._creator, 

206 pool_size=self._pool.maxsize, 

207 max_overflow=self._max_overflow, 

208 pre_ping=self._pre_ping, 

209 use_lifo=self._pool.use_lifo, 

210 timeout=self._timeout, 

211 recycle=self._recycle, 

212 echo=self.echo, 

213 logging_name=self._orig_logging_name, 

214 reset_on_return=self._reset_on_return, 

215 _dispatch=self.dispatch, 

216 dialect=self._dialect, 

217 ) 

218 

219 def dispose(self) -> None: 

220 while True: 

221 try: 

222 conn = self._pool.get(False) 

223 conn.close() 

224 except sqla_queue.Empty: 

225 break 

226 

227 self._overflow = 0 - self.size() 

228 self.logger.info("Pool disposed. %s", self.status()) 

229 

230 def status(self) -> str: 

231 return ( 

232 "Pool size: %d Connections in pool: %d " 

233 "Current Overflow: %d Current Checked out " 

234 "connections: %d" 

235 % ( 

236 self.size(), 

237 self.checkedin(), 

238 self.overflow(), 

239 self.checkedout(), 

240 ) 

241 ) 

242 

243 def size(self) -> int: 

244 return self._pool.maxsize 

245 

246 def timeout(self) -> float: 

247 return self._timeout 

248 

249 def checkedin(self) -> int: 

250 return self._pool.qsize() 

251 

252 def overflow(self) -> int: 

253 return self._overflow if self._pool.maxsize else 0 

254 

255 def checkedout(self) -> int: 

256 return self._pool.maxsize - self._pool.qsize() + self._overflow 

257 

258 

259class AsyncAdaptedQueuePool(QueuePool): 

260 """An asyncio-compatible version of :class:`.QueuePool`. 

261 

262 This pool is used by default when using :class:`.AsyncEngine` engines that 

263 were generated from :func:`_asyncio.create_async_engine`. It uses an 

264 asyncio-compatible queue implementation that does not use 

265 ``threading.Lock``. 

266 

267 The arguments and operation of :class:`.AsyncAdaptedQueuePool` are 

268 otherwise identical to that of :class:`.QueuePool`. 

269 

270 """ 

271 

272 _is_asyncio = True 

273 _queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = ( 

274 sqla_queue.AsyncAdaptedQueue 

275 ) 

276 

277 _dialect = _AsyncConnDialect() 

278 

279 

280class FallbackAsyncAdaptedQueuePool(AsyncAdaptedQueuePool): 

281 _queue_class = sqla_queue.FallbackAsyncAdaptedQueue # type: ignore[assignment] # noqa: E501 

282 

283 

284class NullPool(Pool): 

285 """A Pool which does not pool connections. 

286 

287 Instead it literally opens and closes the underlying DB-API connection 

288 per each connection open/close. 

289 

290 Reconnect-related functions such as ``recycle`` and connection 

291 invalidation are not supported by this Pool implementation, since 

292 no connections are held persistently. 

293 

294 The :class:`.NullPool` class **is compatible** with asyncio and 

295 :func:`_asyncio.create_async_engine`. 

296 

297 """ 

298 

299 def status(self) -> str: 

300 return "NullPool" 

301 

302 def _do_return_conn(self, record: ConnectionPoolEntry) -> None: 

303 record.close() 

304 

305 def _do_get(self) -> ConnectionPoolEntry: 

306 return self._create_connection() 

307 

308 def recreate(self) -> NullPool: 

309 self.logger.info("Pool recreating") 

310 

311 return self.__class__( 

312 self._creator, 

313 recycle=self._recycle, 

314 echo=self.echo, 

315 logging_name=self._orig_logging_name, 

316 reset_on_return=self._reset_on_return, 

317 pre_ping=self._pre_ping, 

318 _dispatch=self.dispatch, 

319 dialect=self._dialect, 

320 ) 

321 

322 def dispose(self) -> None: 

323 pass 

324 

325 

326class SingletonThreadPool(Pool): 

327 """A Pool that maintains one connection per thread. 

328 

329 Maintains one connection per each thread, never moving a connection to a 

330 thread other than the one which it was created in. 

331 

332 .. warning:: the :class:`.SingletonThreadPool` will call ``.close()`` 

333 on arbitrary connections that exist beyond the size setting of 

334 ``pool_size``, e.g. if more unique **thread identities** 

335 than what ``pool_size`` states are used. This cleanup is 

336 non-deterministic and not sensitive to whether or not the connections 

337 linked to those thread identities are currently in use. 

338 

339 :class:`.SingletonThreadPool` may be improved in a future release, 

340 however in its current status it is generally used only for test 

341 scenarios using a SQLite ``:memory:`` database and is not recommended 

342 for production use. 

343 

344 The :class:`.SingletonThreadPool` class **is not compatible** with asyncio 

345 and :func:`_asyncio.create_async_engine`. 

346 

347 

348 Options are the same as those of :class:`_pool.Pool`, as well as: 

349 

350 :param pool_size: The number of threads in which to maintain connections 

351 at once. Defaults to five. 

352 

353 :class:`.SingletonThreadPool` is used by the SQLite dialect 

354 automatically when a memory-based database is used. 

355 See :ref:`sqlite_toplevel`. 

356 

357 """ 

358 

359 _is_asyncio = False 

360 

361 def __init__( 

362 self, 

363 creator: Union[_CreatorFnType, _CreatorWRecFnType], 

364 pool_size: int = 5, 

365 **kw: Any, 

366 ): 

367 Pool.__init__(self, creator, **kw) 

368 self._conn = threading.local() 

369 self._fairy = threading.local() 

370 self._all_conns: Set[ConnectionPoolEntry] = set() 

371 self.size = pool_size 

372 

373 def recreate(self) -> SingletonThreadPool: 

374 self.logger.info("Pool recreating") 

375 return self.__class__( 

376 self._creator, 

377 pool_size=self.size, 

378 recycle=self._recycle, 

379 echo=self.echo, 

380 pre_ping=self._pre_ping, 

381 logging_name=self._orig_logging_name, 

382 reset_on_return=self._reset_on_return, 

383 _dispatch=self.dispatch, 

384 dialect=self._dialect, 

385 ) 

386 

387 def dispose(self) -> None: 

388 """Dispose of this pool.""" 

389 

390 for conn in self._all_conns: 

391 try: 

392 conn.close() 

393 except Exception: 

394 # pysqlite won't even let you close a conn from a thread 

395 # that didn't create it 

396 pass 

397 

398 self._all_conns.clear() 

399 

400 def _cleanup(self) -> None: 

401 while len(self._all_conns) >= self.size: 

402 c = self._all_conns.pop() 

403 c.close() 

404 

405 def status(self) -> str: 

406 return "SingletonThreadPool id:%d size: %d" % ( 

407 id(self), 

408 len(self._all_conns), 

409 ) 

410 

411 def _do_return_conn(self, record: ConnectionPoolEntry) -> None: 

412 try: 

413 del self._fairy.current 

414 except AttributeError: 

415 pass 

416 

417 def _do_get(self) -> ConnectionPoolEntry: 

418 try: 

419 if TYPE_CHECKING: 

420 c = cast(ConnectionPoolEntry, self._conn.current()) 

421 else: 

422 c = self._conn.current() 

423 if c: 

424 return c 

425 except AttributeError: 

426 pass 

427 c = self._create_connection() 

428 self._conn.current = weakref.ref(c) 

429 if len(self._all_conns) >= self.size: 

430 self._cleanup() 

431 self._all_conns.add(c) 

432 return c 

433 

434 def connect(self) -> PoolProxiedConnection: 

435 # vendored from Pool to include the now removed use_threadlocal 

436 # behavior 

437 try: 

438 rec = cast(_ConnectionFairy, self._fairy.current()) 

439 except AttributeError: 

440 pass 

441 else: 

442 if rec is not None: 

443 return rec._checkout_existing() 

444 

445 return _ConnectionFairy._checkout(self, self._fairy) 

446 

447 

448class StaticPool(Pool): 

449 """A Pool of exactly one connection, used for all requests. 

450 

451 Reconnect-related functions such as ``recycle`` and connection 

452 invalidation (which is also used to support auto-reconnect) are only 

453 partially supported right now and may not yield good results. 

454 

455 The :class:`.StaticPool` class **is compatible** with asyncio and 

456 :func:`_asyncio.create_async_engine`. 

457 

458 """ 

459 

460 @util.memoized_property 

461 def connection(self) -> _ConnectionRecord: 

462 return _ConnectionRecord(self) 

463 

464 def status(self) -> str: 

465 return "StaticPool" 

466 

467 def dispose(self) -> None: 

468 if ( 

469 "connection" in self.__dict__ 

470 and self.connection.dbapi_connection is not None 

471 ): 

472 self.connection.close() 

473 del self.__dict__["connection"] 

474 

475 def recreate(self) -> StaticPool: 

476 self.logger.info("Pool recreating") 

477 return self.__class__( 

478 creator=self._creator, 

479 recycle=self._recycle, 

480 reset_on_return=self._reset_on_return, 

481 pre_ping=self._pre_ping, 

482 echo=self.echo, 

483 logging_name=self._orig_logging_name, 

484 _dispatch=self.dispatch, 

485 dialect=self._dialect, 

486 ) 

487 

488 def _transfer_from(self, other_static_pool: StaticPool) -> None: 

489 # used by the test suite to make a new engine / pool without 

490 # losing the state of an existing SQLite :memory: connection 

491 def creator(rec: ConnectionPoolEntry) -> DBAPIConnection: 

492 conn = other_static_pool.connection.dbapi_connection 

493 assert conn is not None 

494 return conn 

495 

496 self._invoke_creator = creator 

497 

498 def _create_connection(self) -> ConnectionPoolEntry: 

499 raise NotImplementedError() 

500 

501 def _do_return_conn(self, record: ConnectionPoolEntry) -> None: 

502 pass 

503 

504 def _do_get(self) -> ConnectionPoolEntry: 

505 rec = self.connection 

506 if rec._is_hard_or_soft_invalidated(): 

507 del self.__dict__["connection"] 

508 rec = self.connection 

509 

510 return rec 

511 

512 

513class AssertionPool(Pool): 

514 """A :class:`_pool.Pool` that allows at most one checked out connection at 

515 any given time. 

516 

517 This will raise an exception if more than one connection is checked out 

518 at a time. Useful for debugging code that is using more connections 

519 than desired. 

520 

521 The :class:`.AssertionPool` class **is compatible** with asyncio and 

522 :func:`_asyncio.create_async_engine`. 

523 

524 """ 

525 

526 _conn: Optional[ConnectionPoolEntry] 

527 _checkout_traceback: Optional[List[str]] 

528 

529 def __init__(self, *args: Any, **kw: Any): 

530 self._conn = None 

531 self._checked_out = False 

532 self._store_traceback = kw.pop("store_traceback", True) 

533 self._checkout_traceback = None 

534 Pool.__init__(self, *args, **kw) 

535 

536 def status(self) -> str: 

537 return "AssertionPool" 

538 

539 def _do_return_conn(self, record: ConnectionPoolEntry) -> None: 

540 if not self._checked_out: 

541 raise AssertionError("connection is not checked out") 

542 self._checked_out = False 

543 assert record is self._conn 

544 

545 def dispose(self) -> None: 

546 self._checked_out = False 

547 if self._conn: 

548 self._conn.close() 

549 

550 def recreate(self) -> AssertionPool: 

551 self.logger.info("Pool recreating") 

552 return self.__class__( 

553 self._creator, 

554 echo=self.echo, 

555 pre_ping=self._pre_ping, 

556 recycle=self._recycle, 

557 reset_on_return=self._reset_on_return, 

558 logging_name=self._orig_logging_name, 

559 _dispatch=self.dispatch, 

560 dialect=self._dialect, 

561 ) 

562 

563 def _do_get(self) -> ConnectionPoolEntry: 

564 if self._checked_out: 

565 if self._checkout_traceback: 

566 suffix = " at:\n%s" % "".join( 

567 chop_traceback(self._checkout_traceback) 

568 ) 

569 else: 

570 suffix = "" 

571 raise AssertionError("connection is already checked out" + suffix) 

572 

573 if not self._conn: 

574 self._conn = self._create_connection() 

575 

576 self._checked_out = True 

577 if self._store_traceback: 

578 self._checkout_traceback = traceback.format_stack() 

579 return self._conn