Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/impl.py: 45%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

237 statements  

1# pool/impl.py 

2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8 

9"""Pool implementation classes. 

10 

11""" 

12from __future__ import annotations 

13 

14import threading 

15import traceback 

16import typing 

17from typing import Any 

18from typing import cast 

19from typing import List 

20from typing import Optional 

21from typing import Set 

22from typing import Type 

23from typing import TYPE_CHECKING 

24from typing import Union 

25import weakref 

26 

27from .base import _AsyncConnDialect 

28from .base import _ConnectionFairy 

29from .base import _ConnectionRecord 

30from .base import _CreatorFnType 

31from .base import _CreatorWRecFnType 

32from .base import ConnectionPoolEntry 

33from .base import Pool 

34from .base import PoolProxiedConnection 

35from .. import exc 

36from .. import util 

37from ..util import chop_traceback 

38from ..util import queue as sqla_queue 

39from ..util.typing import Literal 

40 

41if typing.TYPE_CHECKING: 

42 from ..engine.interfaces import DBAPIConnection 

43 

44 

class QueuePool(Pool):
    """A :class:`_pool.Pool`
    that imposes a limit on the number of open connections.

    :class:`.QueuePool` is the default pooling implementation used for
    all :class:`_engine.Engine` objects other than SQLite with a ``:memory:``
    database.

    The :class:`.QueuePool` class **is not compatible** with asyncio and
    :func:`_asyncio.create_async_engine`. The
    :class:`.AsyncAdaptedQueuePool` class is used automatically when
    using :func:`_asyncio.create_async_engine`, if no other kind of pool
    is specified.

    .. seealso::

        :class:`.AsyncAdaptedQueuePool`

    """

    # thread-based queue implementation; overridden by AsyncAdaptedQueuePool
    _is_asyncio = False  # type: ignore[assignment]

    _queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = (
        sqla_queue.Queue
    )

    _pool: sqla_queue.QueueCommon[ConnectionPoolEntry]

    def __init__(
        self,
        creator: Union[_CreatorFnType, _CreatorWRecFnType],
        pool_size: int = 5,
        max_overflow: int = 10,
        timeout: float = 30.0,
        use_lifo: bool = False,
        **kw: Any,
    ):
        r"""
        Construct a QueuePool.

        :param creator: a callable function that returns a DB-API
          connection object, same as that of :paramref:`_pool.Pool.creator`.

        :param pool_size: The size of the pool to be maintained,
          defaults to 5. This is the largest number of connections that
          will be kept persistently in the pool. Note that the pool
          begins with no connections; once this number of connections
          is requested, that number of connections will remain.
          ``pool_size`` can be set to 0 to indicate no size limit; to
          disable pooling, use a :class:`~sqlalchemy.pool.NullPool`
          instead.

        :param max_overflow: The maximum overflow size of the
          pool. When the number of checked-out connections reaches the
          size set in pool_size, additional connections will be
          returned up to this limit. When those additional connections
          are returned to the pool, they are disconnected and
          discarded. It follows then that the total number of
          simultaneous connections the pool will allow is pool_size +
          `max_overflow`, and the total number of "sleeping"
          connections the pool will allow is pool_size. `max_overflow`
          can be set to -1 to indicate no overflow limit; no limit
          will be placed on the total number of concurrent
          connections. Defaults to 10.

        :param timeout: The number of seconds to wait before giving up
          on returning a connection. Defaults to 30.0. This can be a float
          but is subject to the limitations of Python time functions which
          may not be reliable in the tens of milliseconds.

        :param use_lifo: use LIFO (last-in-first-out) when retrieving
          connections instead of FIFO (first-in-first-out). Using LIFO, a
          server-side timeout scheme can reduce the number of connections used
          during non-peak periods of use. When planning for server-side
          timeouts, ensure that a recycle or pre-ping strategy is in use to
          gracefully handle stale connections.

          .. versionadded:: 1.3

          .. seealso::

            :ref:`pool_use_lifo`

            :ref:`pool_disconnects`

        :param \**kw: Other keyword arguments including
          :paramref:`_pool.Pool.recycle`, :paramref:`_pool.Pool.echo`,
          :paramref:`_pool.Pool.reset_on_return` and others are passed to the
          :class:`_pool.Pool` constructor.

        """

        Pool.__init__(self, creator, **kw)
        self._pool = self._queue_class(pool_size, use_lifo=use_lifo)
        # _overflow counts connections created beyond pool_size; it starts
        # at -pool_size so it reaches zero only once the base pool has been
        # fully populated, and goes positive for true overflow connections.
        self._overflow = 0 - pool_size
        # pool_size == 0 means "no size limit", which implies unlimited
        # overflow regardless of the max_overflow argument.
        self._max_overflow = -1 if pool_size == 0 else max_overflow
        self._timeout = timeout
        # guards _overflow increments/decrements when a limit is in effect
        self._overflow_lock = threading.Lock()

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        try:
            # non-blocking put; raises Full when the queue already holds
            # pool_size idle connections
            self._pool.put(record, False)
        except sqla_queue.Full:
            try:
                # overflow connection returning to a full pool is discarded
                record.close()
            finally:
                # even if close() raises, the slot must be released
                self._dec_overflow()

    def _do_get(self) -> ConnectionPoolEntry:
        # -1 means unlimited overflow; in that case we never block waiting
        use_overflow = self._max_overflow > -1

        # only block on the queue when no more overflow slots are available
        wait = use_overflow and self._overflow >= self._max_overflow
        try:
            return self._pool.get(wait, self._timeout)
        except sqla_queue.Empty:
            # don't do things inside of "except Empty", because when we say
            # we timed out or can't connect and raise, Python 3 tells
            # people the real error is queue.Empty which it isn't.
            pass
        if use_overflow and self._overflow >= self._max_overflow:
            if not wait:
                # raced: overflow filled up between the check and the get;
                # retry, which will now block with the timeout
                return self._do_get()
            else:
                raise exc.TimeoutError(
                    "QueuePool limit of size %d overflow %d reached, "
                    "connection timed out, timeout %0.2f"
                    % (self.size(), self.overflow(), self._timeout),
                    code="3o7r",
                )

        if self._inc_overflow():
            try:
                return self._create_connection()
            except:
                # bare except on purpose: even BaseException (e.g.
                # KeyboardInterrupt) must release the overflow slot before
                # the error is re-raised by safe_reraise()
                with util.safe_reraise():
                    self._dec_overflow()
                raise
        else:
            # lost the race for the last overflow slot; retry from the top
            return self._do_get()

    def _inc_overflow(self) -> bool:
        # unlimited overflow: no lock needed, increment always succeeds
        if self._max_overflow == -1:
            self._overflow += 1
            return True
        with self._overflow_lock:
            if self._overflow < self._max_overflow:
                self._overflow += 1
                return True
            else:
                return False

    def _dec_overflow(self) -> Literal[True]:
        if self._max_overflow == -1:
            self._overflow -= 1
            return True
        with self._overflow_lock:
            self._overflow -= 1
            return True

    def recreate(self) -> QueuePool:
        # build a fresh pool with identical configuration; used on
        # engine.dispose() / reconnect scenarios
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            pool_size=self._pool.maxsize,
            max_overflow=self._max_overflow,
            pre_ping=self._pre_ping,
            use_lifo=self._pool.use_lifo,
            timeout=self._timeout,
            recycle=self._recycle,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            reset_on_return=self._reset_on_return,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def dispose(self) -> None:
        # drain the queue, closing each idle connection
        while True:
            try:
                conn = self._pool.get(False)
                conn.close()
            except sqla_queue.Empty:
                break

        # reset the overflow counter to its initial (negative) state
        self._overflow = 0 - self.size()
        self.logger.info("Pool disposed. %s", self.status())

    def status(self) -> str:
        return (
            "Pool size: %d  Connections in pool: %d "
            "Current Overflow: %d Current Checked out "
            "connections: %d"
            % (
                self.size(),
                self.checkedin(),
                self.overflow(),
                self.checkedout(),
            )
        )

    def size(self) -> int:
        # configured pool_size (maximum idle connections held)
        return self._pool.maxsize

    def timeout(self) -> float:
        return self._timeout

    def checkedin(self) -> int:
        # connections currently idle in the queue
        return self._pool.qsize()

    def overflow(self) -> int:
        # a maxsize of 0 means "unlimited pool"; overflow is meaningless then
        return self._overflow if self._pool.maxsize else 0

    def checkedout(self) -> int:
        # total created (maxsize + overflow) minus those sitting idle
        return self._pool.maxsize - self._pool.qsize() + self._overflow

259 

260 

class AsyncAdaptedQueuePool(QueuePool):
    """An asyncio-compatible version of :class:`.QueuePool`.

    This pool is used by default when using :class:`.AsyncEngine` engines that
    were generated from :func:`_asyncio.create_async_engine`. It uses an
    asyncio-compatible queue implementation that does not use
    ``threading.Lock``.

    The arguments and operation of :class:`.AsyncAdaptedQueuePool` are
    otherwise identical to that of :class:`.QueuePool`.

    """

    # mark this pool as asyncio-capable
    _is_asyncio = True  # type: ignore[assignment]
    # swap in the asyncio-compatible queue; all other QueuePool logic
    # is inherited unchanged
    _queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = (
        sqla_queue.AsyncAdaptedQueue
    )

    # dialect placeholder indicating async connection handling
    _dialect = _AsyncConnDialect()

280 

281 

class NullPool(Pool):
    """A Pool which does not pool connections.

    Instead it literally opens and closes the underlying DB-API connection
    per each connection open/close.

    Reconnect-related functions such as ``recycle`` and connection
    invalidation are not supported by this Pool implementation, since
    no connections are held persistently.

    The :class:`.NullPool` class **is compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.

    """

    def status(self) -> str:
        # fixed label; there is no per-connection state to report
        return "NullPool"

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        # checkin closes immediately -- nothing is ever retained
        record.close()

    def _do_get(self) -> ConnectionPoolEntry:
        # every checkout opens a brand-new connection
        return self._create_connection()

    def recreate(self) -> NullPool:
        """Return a new :class:`.NullPool` carrying this pool's settings."""
        self.logger.info("Pool recreating")

        return type(self)(
            self._creator,
            pre_ping=self._pre_ping,
            recycle=self._recycle,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            reset_on_return=self._reset_on_return,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def dispose(self) -> None:
        # no connections are held, so there is nothing to dispose of
        pass

322 

323 

class SingletonThreadPool(Pool):
    """A Pool that maintains one connection per thread.

    Maintains one connection per each thread, never moving a connection to a
    thread other than the one which it was created in.

    .. warning::  the :class:`.SingletonThreadPool` will call ``.close()``
       on arbitrary connections that exist beyond the size setting of
       ``pool_size``, e.g. if more unique **thread identities**
       than what ``pool_size`` states are used. This cleanup is
       non-deterministic and not sensitive to whether or not the connections
       linked to those thread identities are currently in use.

       :class:`.SingletonThreadPool` may be improved in a future release,
       however in its current status it is generally used only for test
       scenarios using a SQLite ``:memory:`` database and is not recommended
       for production use.

    The :class:`.SingletonThreadPool` class **is not compatible** with asyncio
    and :func:`_asyncio.create_async_engine`.


    Options are the same as those of :class:`_pool.Pool`, as well as:

    :param pool_size: The number of threads in which to maintain connections
        at once.  Defaults to five.

    :class:`.SingletonThreadPool` is used by the SQLite dialect
    automatically when a memory-based database is used.
    See :ref:`sqlite_toplevel`.

    """

    _is_asyncio = False  # type: ignore[assignment]

    def __init__(
        self,
        creator: Union[_CreatorFnType, _CreatorWRecFnType],
        pool_size: int = 5,
        **kw: Any,
    ):
        Pool.__init__(self, creator, **kw)
        # per-thread slot for the current ConnectionPoolEntry (stored as a
        # weakref in _do_get) and the current _ConnectionFairy
        self._conn = threading.local()
        self._fairy = threading.local()
        # strong references to every entry created, across all threads;
        # keeps the weakly-referenced per-thread entries alive
        self._all_conns: Set[ConnectionPoolEntry] = set()
        self.size = pool_size

    def recreate(self) -> SingletonThreadPool:
        # build a fresh pool with identical configuration
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            pool_size=self.size,
            recycle=self._recycle,
            echo=self.echo,
            pre_ping=self._pre_ping,
            logging_name=self._orig_logging_name,
            reset_on_return=self._reset_on_return,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def dispose(self) -> None:
        """Dispose of this pool."""

        for conn in self._all_conns:
            try:
                conn.close()
            except Exception:
                # pysqlite won't even let you close a conn from a thread
                # that didn't create it
                pass

        self._all_conns.clear()

    def _cleanup(self) -> None:
        # evict arbitrary entries until we're back under pool_size; pop()
        # is non-deterministic and ignores whether the entry is in use
        # (see the class-level warning)
        while len(self._all_conns) >= self.size:
            c = self._all_conns.pop()
            c.close()

    def status(self) -> str:
        return "SingletonThreadPool id:%d size: %d" % (
            id(self),
            len(self._all_conns),
        )

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        # clear this thread's fairy slot, if one was set; AttributeError
        # means nothing was checked out on this thread
        try:
            del self._fairy.current
        except AttributeError:
            pass

    def _do_get(self) -> ConnectionPoolEntry:
        try:
            if TYPE_CHECKING:
                # _conn.current is a weakref.ref; calling it yields the
                # entry or None if it was collected
                c = cast(ConnectionPoolEntry, self._conn.current())
            else:
                c = self._conn.current()
            if c:
                return c
        except AttributeError:
            # no entry has been created yet for this thread
            pass
        c = self._create_connection()
        # hold only a weakref per-thread; _all_conns owns the strong ref
        self._conn.current = weakref.ref(c)
        if len(self._all_conns) >= self.size:
            self._cleanup()
        self._all_conns.add(c)
        return c

    def connect(self) -> PoolProxiedConnection:
        # vendored from Pool to include the now removed use_threadlocal
        # behavior
        try:
            rec = cast(_ConnectionFairy, self._fairy.current())
        except AttributeError:
            # no fairy for this thread yet; fall through to a new checkout
            pass
        else:
            if rec is not None:
                # re-use the thread's existing checkout
                return rec._checkout_existing()

        return _ConnectionFairy._checkout(self, self._fairy)

444 

445 

class StaticPool(Pool):
    """A Pool of exactly one connection, used for all requests.

    Reconnect-related functions such as ``recycle`` and connection
    invalidation (which is also used to support auto-reconnect) are only
    partially supported right now and may not yield good results.

    The :class:`.StaticPool` class **is compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.

    """

    @util.memoized_property
    def connection(self) -> _ConnectionRecord:
        # the single record is created lazily on first attribute access
        # and cached on the instance by memoized_property
        return _ConnectionRecord(self)

    def status(self) -> str:
        return "StaticPool"

    def dispose(self) -> None:
        # only act if the memoized record exists and still wraps a live
        # DB-API connection
        rec = self.__dict__.get("connection")
        if rec is not None and rec.dbapi_connection is not None:
            rec.close()
            del self.__dict__["connection"]

    def recreate(self) -> StaticPool:
        """Return a new :class:`.StaticPool` carrying this pool's settings."""
        self.logger.info("Pool recreating")
        return type(self)(
            creator=self._creator,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            recycle=self._recycle,
            reset_on_return=self._reset_on_return,
            pre_ping=self._pre_ping,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def _transfer_from(self, other_static_pool: StaticPool) -> None:
        # used by the test suite to make a new engine / pool without
        # losing the state of an existing SQLite :memory: connection
        def _borrow_existing(rec: ConnectionPoolEntry) -> DBAPIConnection:
            conn = other_static_pool.connection.dbapi_connection
            assert conn is not None
            return conn

        self._invoke_creator = _borrow_existing

    def _create_connection(self) -> ConnectionPoolEntry:
        # never called; the single record is produced via ``connection``
        raise NotImplementedError()

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        # the one record is always retained; checkin is a no-op
        pass

    def _do_get(self) -> ConnectionPoolEntry:
        if self.connection._is_hard_or_soft_invalidated():
            # drop the memoized record; the attribute access below
            # re-creates a fresh one
            del self.__dict__["connection"]
        return self.connection

509 

510 

class AssertionPool(Pool):
    """A :class:`_pool.Pool` that allows at most one checked out connection at
    any given time.

    This will raise an exception if more than one connection is checked out
    at a time.  Useful for debugging code that is using more connections
    than desired.

    The :class:`.AssertionPool` class **is compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.

    """

    _conn: Optional[ConnectionPoolEntry]
    _checkout_traceback: Optional[List[str]]

    def __init__(self, *args: Any, **kw: Any):
        self._conn = None
        self._checked_out = False
        self._checkout_traceback = None
        # capture the stack at checkout time unless explicitly disabled
        self._store_traceback = kw.pop("store_traceback", True)
        Pool.__init__(self, *args, **kw)

    def status(self) -> str:
        return "AssertionPool"

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        # a checkin without a prior checkout is itself an error
        if not self._checked_out:
            raise AssertionError("connection is not checked out")
        self._checked_out = False
        assert record is self._conn

    def dispose(self) -> None:
        self._checked_out = False
        if self._conn:
            self._conn.close()

    def recreate(self) -> AssertionPool:
        """Return a new :class:`.AssertionPool` with this pool's settings."""
        self.logger.info("Pool recreating")
        return type(self)(
            self._creator,
            echo=self.echo,
            pre_ping=self._pre_ping,
            recycle=self._recycle,
            reset_on_return=self._reset_on_return,
            logging_name=self._orig_logging_name,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def _do_get(self) -> ConnectionPoolEntry:
        if self._checked_out:
            # include the stack captured at the first checkout, when
            # available, to show who is holding the connection
            tb = self._checkout_traceback
            if tb:
                suffix = " at:\n%s" % "".join(chop_traceback(tb))
            else:
                suffix = ""
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn