Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/pool/impl.py: 54%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# pool/impl.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
9"""Pool implementation classes."""
11from __future__ import annotations
13import threading
14import traceback
15import typing
16from typing import Any
17from typing import cast
18from typing import List
19from typing import Optional
20from typing import Set
21from typing import Type
22from typing import TYPE_CHECKING
23from typing import Union
24import weakref
26from .base import _AsyncConnDialect
27from .base import _ConnectionFairy
28from .base import _ConnectionRecord
29from .base import _CreatorFnType
30from .base import _CreatorWRecFnType
31from .base import ConnectionPoolEntry
32from .base import Pool
33from .base import PoolProxiedConnection
34from .. import exc
35from .. import util
36from ..util import chop_traceback
37from ..util import queue as sqla_queue
38from ..util.typing import Literal
40if typing.TYPE_CHECKING:
41 from ..engine.interfaces import DBAPIConnection
44class QueuePool(Pool):
45 """A :class:`_pool.Pool`
46 that imposes a limit on the number of open connections.
48 :class:`.QueuePool` is the default pooling implementation used for
49 all :class:`_engine.Engine` objects other than SQLite with a ``:memory:``
50 database.
52 The :class:`.QueuePool` class **is not compatible** with asyncio and
53 :func:`_asyncio.create_async_engine`. The
54 :class:`.AsyncAdaptedQueuePool` class is used automatically when
55 using :func:`_asyncio.create_async_engine`, if no other kind of pool
56 is specified.
58 .. seealso::
60 :class:`.AsyncAdaptedQueuePool`
62 """
64 _is_asyncio = False
66 _queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = (
67 sqla_queue.Queue
68 )
70 _pool: sqla_queue.QueueCommon[ConnectionPoolEntry]
72 def __init__(
73 self,
74 creator: Union[_CreatorFnType, _CreatorWRecFnType],
75 pool_size: int = 5,
76 max_overflow: int = 10,
77 timeout: float = 30.0,
78 use_lifo: bool = False,
79 **kw: Any,
80 ):
81 r"""
82 Construct a QueuePool.
84 :param creator: a callable function that returns a DB-API
85 connection object, same as that of :paramref:`_pool.Pool.creator`.
87 :param pool_size: The size of the pool to be maintained,
88 defaults to 5. This is the largest number of connections that
89 will be kept persistently in the pool. Note that the pool
90 begins with no connections; once this number of connections
91 is requested, that number of connections will remain.
92 ``pool_size`` can be set to 0 to indicate no size limit; to
93 disable pooling, use a :class:`~sqlalchemy.pool.NullPool`
94 instead.
96 :param max_overflow: The maximum overflow size of the
97 pool. When the number of checked-out connections reaches the
98 size set in pool_size, additional connections will be
99 returned up to this limit. When those additional connections
100 are returned to the pool, they are disconnected and
101 discarded. It follows then that the total number of
102 simultaneous connections the pool will allow is pool_size +
103 `max_overflow`, and the total number of "sleeping"
104 connections the pool will allow is pool_size. `max_overflow`
105 can be set to -1 to indicate no overflow limit; no limit
106 will be placed on the total number of concurrent
107 connections. Defaults to 10.
109 :param timeout: The number of seconds to wait before giving up
110 on returning a connection. Defaults to 30.0. This can be a float
111 but is subject to the limitations of Python time functions which
112 may not be reliable in the tens of milliseconds.
114 :param use_lifo: use LIFO (last-in-first-out) when retrieving
115 connections instead of FIFO (first-in-first-out). Using LIFO, a
116 server-side timeout scheme can reduce the number of connections used
117 during non-peak periods of use. When planning for server-side
118 timeouts, ensure that a recycle or pre-ping strategy is in use to
119 gracefully handle stale connections.
121 .. versionadded:: 1.3
123 .. seealso::
125 :ref:`pool_use_lifo`
127 :ref:`pool_disconnects`
129 :param \**kw: Other keyword arguments including
130 :paramref:`_pool.Pool.recycle`, :paramref:`_pool.Pool.echo`,
131 :paramref:`_pool.Pool.reset_on_return` and others are passed to the
132 :class:`_pool.Pool` constructor.
134 """
136 Pool.__init__(self, creator, **kw)
137 self._pool = self._queue_class(pool_size, use_lifo=use_lifo)
138 self._overflow = 0 - pool_size
139 self._max_overflow = -1 if pool_size == 0 else max_overflow
140 self._timeout = timeout
141 self._overflow_lock = threading.Lock()
143 def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
144 try:
145 self._pool.put(record, False)
146 except sqla_queue.Full:
147 try:
148 record.close()
149 finally:
150 self._dec_overflow()
152 def _do_get(self) -> ConnectionPoolEntry:
153 use_overflow = self._max_overflow > -1
155 wait = use_overflow and self._overflow >= self._max_overflow
156 try:
157 return self._pool.get(wait, self._timeout)
158 except sqla_queue.Empty:
159 # don't do things inside of "except Empty", because when we say
160 # we timed out or can't connect and raise, Python 3 tells
161 # people the real error is queue.Empty which it isn't.
162 pass
163 if use_overflow and self._overflow >= self._max_overflow:
164 if not wait:
165 return self._do_get()
166 else:
167 raise exc.TimeoutError(
168 "QueuePool limit of size %d overflow %d reached, "
169 "connection timed out, timeout %0.2f"
170 % (self.size(), self.overflow(), self._timeout),
171 code="3o7r",
172 )
174 if self._inc_overflow():
175 try:
176 return self._create_connection()
177 except:
178 with util.safe_reraise():
179 self._dec_overflow()
180 raise
181 else:
182 return self._do_get()
184 def _inc_overflow(self) -> bool:
185 if self._max_overflow == -1:
186 self._overflow += 1
187 return True
188 with self._overflow_lock:
189 if self._overflow < self._max_overflow:
190 self._overflow += 1
191 return True
192 else:
193 return False
195 def _dec_overflow(self) -> Literal[True]:
196 if self._max_overflow == -1:
197 self._overflow -= 1
198 return True
199 with self._overflow_lock:
200 self._overflow -= 1
201 return True
203 def recreate(self) -> QueuePool:
204 self.logger.info("Pool recreating")
205 return self.__class__(
206 self._creator,
207 pool_size=self._pool.maxsize,
208 max_overflow=self._max_overflow,
209 pre_ping=self._pre_ping,
210 use_lifo=self._pool.use_lifo,
211 timeout=self._timeout,
212 recycle=self._recycle,
213 echo=self.echo,
214 logging_name=self._orig_logging_name,
215 reset_on_return=self._reset_on_return,
216 _dispatch=self.dispatch,
217 dialect=self._dialect,
218 )
220 def dispose(self) -> None:
221 while True:
222 try:
223 conn = self._pool.get(False)
224 conn.close()
225 except sqla_queue.Empty:
226 break
228 self._overflow = 0 - self.size()
229 self.logger.info("Pool disposed. %s", self.status())
231 def status(self) -> str:
232 return (
233 "Pool size: %d Connections in pool: %d "
234 "Current Overflow: %d Current Checked out "
235 "connections: %d"
236 % (
237 self.size(),
238 self.checkedin(),
239 self.overflow(),
240 self.checkedout(),
241 )
242 )
244 def size(self) -> int:
245 return self._pool.maxsize
247 def timeout(self) -> float:
248 return self._timeout
250 def checkedin(self) -> int:
251 return self._pool.qsize()
253 def overflow(self) -> int:
254 return self._overflow if self._pool.maxsize else 0
256 def checkedout(self) -> int:
257 return self._pool.maxsize - self._pool.qsize() + self._overflow
260class AsyncAdaptedQueuePool(QueuePool):
261 """An asyncio-compatible version of :class:`.QueuePool`.
263 This pool is used by default when using :class:`.AsyncEngine` engines that
264 were generated from :func:`_asyncio.create_async_engine`. It uses an
265 asyncio-compatible queue implementation that does not use
266 ``threading.Lock``.
268 The arguments and operation of :class:`.AsyncAdaptedQueuePool` are
269 otherwise identical to that of :class:`.QueuePool`.
271 """
273 _is_asyncio = True
274 _queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = (
275 sqla_queue.AsyncAdaptedQueue
276 )
278 _dialect = _AsyncConnDialect()
281class FallbackAsyncAdaptedQueuePool(AsyncAdaptedQueuePool):
282 _queue_class = sqla_queue.FallbackAsyncAdaptedQueue # type: ignore[assignment] # noqa: E501
285class NullPool(Pool):
286 """A Pool which does not pool connections.
288 Instead it literally opens and closes the underlying DB-API connection
289 per each connection open/close.
291 Reconnect-related functions such as ``recycle`` and connection
292 invalidation are not supported by this Pool implementation, since
293 no connections are held persistently.
295 The :class:`.NullPool` class **is compatible** with asyncio and
296 :func:`_asyncio.create_async_engine`.
298 """
300 def status(self) -> str:
301 return "NullPool"
303 def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
304 record.close()
306 def _do_get(self) -> ConnectionPoolEntry:
307 return self._create_connection()
309 def recreate(self) -> NullPool:
310 self.logger.info("Pool recreating")
312 return self.__class__(
313 self._creator,
314 recycle=self._recycle,
315 echo=self.echo,
316 logging_name=self._orig_logging_name,
317 reset_on_return=self._reset_on_return,
318 pre_ping=self._pre_ping,
319 _dispatch=self.dispatch,
320 dialect=self._dialect,
321 )
323 def dispose(self) -> None:
324 pass
327class SingletonThreadPool(Pool):
328 """A Pool that maintains one connection per thread.
330 Maintains one connection per each thread, never moving a connection to a
331 thread other than the one which it was created in.
333 .. warning:: the :class:`.SingletonThreadPool` will call ``.close()``
334 on arbitrary connections that exist beyond the size setting of
335 ``pool_size``, e.g. if more unique **thread identities**
336 than what ``pool_size`` states are used. This cleanup is
337 non-deterministic and not sensitive to whether or not the connections
338 linked to those thread identities are currently in use.
340 :class:`.SingletonThreadPool` may be improved in a future release,
341 however in its current status it is generally used only for test
342 scenarios using a SQLite ``:memory:`` database and is not recommended
343 for production use.
345 The :class:`.SingletonThreadPool` class **is not compatible** with asyncio
346 and :func:`_asyncio.create_async_engine`.
349 Options are the same as those of :class:`_pool.Pool`, as well as:
351 :param pool_size: The number of threads in which to maintain connections
352 at once. Defaults to five.
354 :class:`.SingletonThreadPool` is used by the SQLite dialect
355 automatically when a memory-based database is used.
356 See :ref:`sqlite_toplevel`.
358 """
360 _is_asyncio = False
362 def __init__(
363 self,
364 creator: Union[_CreatorFnType, _CreatorWRecFnType],
365 pool_size: int = 5,
366 **kw: Any,
367 ):
368 Pool.__init__(self, creator, **kw)
369 self._conn = threading.local()
370 self._fairy = threading.local()
371 self._all_conns: Set[ConnectionPoolEntry] = set()
372 self.size = pool_size
374 def recreate(self) -> SingletonThreadPool:
375 self.logger.info("Pool recreating")
376 return self.__class__(
377 self._creator,
378 pool_size=self.size,
379 recycle=self._recycle,
380 echo=self.echo,
381 pre_ping=self._pre_ping,
382 logging_name=self._orig_logging_name,
383 reset_on_return=self._reset_on_return,
384 _dispatch=self.dispatch,
385 dialect=self._dialect,
386 )
388 def _transfer_from(
389 self, other_singleton_pool: SingletonThreadPool
390 ) -> None:
391 # used by the test suite to make a new engine / pool without
392 # losing the state of an existing SQLite :memory: connection
393 assert not hasattr(other_singleton_pool._fairy, "current")
394 self._conn = other_singleton_pool._conn
395 self._all_conns = other_singleton_pool._all_conns
397 def dispose(self) -> None:
398 """Dispose of this pool."""
400 for conn in self._all_conns:
401 try:
402 conn.close()
403 except Exception:
404 # pysqlite won't even let you close a conn from a thread
405 # that didn't create it
406 pass
408 self._all_conns.clear()
410 def _cleanup(self) -> None:
411 while len(self._all_conns) >= self.size:
412 c = self._all_conns.pop()
413 c.close()
415 def status(self) -> str:
416 return "SingletonThreadPool id:%d size: %d" % (
417 id(self),
418 len(self._all_conns),
419 )
421 def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
422 try:
423 del self._fairy.current
424 except AttributeError:
425 pass
427 def _do_get(self) -> ConnectionPoolEntry:
428 try:
429 if TYPE_CHECKING:
430 c = cast(ConnectionPoolEntry, self._conn.current())
431 else:
432 c = self._conn.current()
433 if c:
434 return c
435 except AttributeError:
436 pass
437 c = self._create_connection()
438 self._conn.current = weakref.ref(c)
439 if len(self._all_conns) >= self.size:
440 self._cleanup()
441 self._all_conns.add(c)
442 return c
444 def connect(self) -> PoolProxiedConnection:
445 # vendored from Pool to include the now removed use_threadlocal
446 # behavior
447 try:
448 rec = cast(_ConnectionFairy, self._fairy.current())
449 except AttributeError:
450 pass
451 else:
452 if rec is not None:
453 return rec._checkout_existing()
455 return _ConnectionFairy._checkout(self, self._fairy)
458class StaticPool(Pool):
459 """A Pool of exactly one connection, used for all requests.
461 Reconnect-related functions such as ``recycle`` and connection
462 invalidation (which is also used to support auto-reconnect) are only
463 partially supported right now and may not yield good results.
465 The :class:`.StaticPool` class **is compatible** with asyncio and
466 :func:`_asyncio.create_async_engine`.
468 """
470 @util.memoized_property
471 def connection(self) -> _ConnectionRecord:
472 return _ConnectionRecord(self)
474 def status(self) -> str:
475 return "StaticPool"
477 def dispose(self) -> None:
478 if (
479 "connection" in self.__dict__
480 and self.connection.dbapi_connection is not None
481 ):
482 self.connection.close()
483 del self.__dict__["connection"]
485 def recreate(self) -> StaticPool:
486 self.logger.info("Pool recreating")
487 return self.__class__(
488 creator=self._creator,
489 recycle=self._recycle,
490 reset_on_return=self._reset_on_return,
491 pre_ping=self._pre_ping,
492 echo=self.echo,
493 logging_name=self._orig_logging_name,
494 _dispatch=self.dispatch,
495 dialect=self._dialect,
496 )
498 def _transfer_from(self, other_static_pool: StaticPool) -> None:
499 # used by the test suite to make a new engine / pool without
500 # losing the state of an existing SQLite :memory: connection
501 def creator(rec: ConnectionPoolEntry) -> DBAPIConnection:
502 conn = other_static_pool.connection.dbapi_connection
503 assert conn is not None
504 return conn
506 self._invoke_creator = creator
508 def _create_connection(self) -> ConnectionPoolEntry:
509 raise NotImplementedError()
511 def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
512 pass
514 def _do_get(self) -> ConnectionPoolEntry:
515 rec = self.connection
516 if rec._is_hard_or_soft_invalidated():
517 del self.__dict__["connection"]
518 rec = self.connection
520 return rec
523class AssertionPool(Pool):
524 """A :class:`_pool.Pool` that allows at most one checked out connection at
525 any given time.
527 This will raise an exception if more than one connection is checked out
528 at a time. Useful for debugging code that is using more connections
529 than desired.
531 The :class:`.AssertionPool` class **is compatible** with asyncio and
532 :func:`_asyncio.create_async_engine`.
534 """
536 _conn: Optional[ConnectionPoolEntry]
537 _checkout_traceback: Optional[List[str]]
539 def __init__(self, *args: Any, **kw: Any):
540 self._conn = None
541 self._checked_out = False
542 self._store_traceback = kw.pop("store_traceback", True)
543 self._checkout_traceback = None
544 Pool.__init__(self, *args, **kw)
546 def status(self) -> str:
547 return "AssertionPool"
549 def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
550 if not self._checked_out:
551 raise AssertionError("connection is not checked out")
552 self._checked_out = False
553 assert record is self._conn
555 def dispose(self) -> None:
556 self._checked_out = False
557 if self._conn:
558 self._conn.close()
560 def recreate(self) -> AssertionPool:
561 self.logger.info("Pool recreating")
562 return self.__class__(
563 self._creator,
564 echo=self.echo,
565 pre_ping=self._pre_ping,
566 recycle=self._recycle,
567 reset_on_return=self._reset_on_return,
568 logging_name=self._orig_logging_name,
569 _dispatch=self.dispatch,
570 dialect=self._dialect,
571 )
573 def _do_get(self) -> ConnectionPoolEntry:
574 if self._checked_out:
575 if self._checkout_traceback:
576 suffix = " at:\n%s" % "".join(
577 chop_traceback(self._checkout_traceback)
578 )
579 else:
580 suffix = ""
581 raise AssertionError("connection is already checked out" + suffix)
583 if not self._conn:
584 self._conn = self._create_connection()
586 self._checked_out = True
587 if self._store_traceback:
588 self._checkout_traceback = traceback.format_stack()
589 return self._conn