# pool/impl.py
# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php


"""Pool implementation classes."""

from __future__ import annotations

import threading
import traceback
import typing
from typing import Any
from typing import cast
from typing import List
from typing import Optional
from typing import Set
from typing import Type
from typing import TYPE_CHECKING
from typing import Union
import weakref

from .base import _AsyncConnDialect
from .base import _ConnectionFairy
from .base import _ConnectionRecord
from .base import _CreatorFnType
from .base import _CreatorWRecFnType
from .base import ConnectionPoolEntry
from .base import Pool
from .base import PoolProxiedConnection
from .. import exc
from .. import util
from ..util import chop_traceback
from ..util import queue as sqla_queue
from ..util.typing import Literal

if typing.TYPE_CHECKING:
    from ..engine.interfaces import DBAPIConnection


class QueuePool(Pool):
    """A :class:`_pool.Pool`
    that imposes a limit on the number of open connections.

    :class:`.QueuePool` is the default pooling implementation used for
    all :class:`_engine.Engine` objects other than SQLite with a ``:memory:``
    database.

    The :class:`.QueuePool` class **is not compatible** with asyncio and
    :func:`_asyncio.create_async_engine`. The
    :class:`.AsyncAdaptedQueuePool` class is used automatically when
    using :func:`_asyncio.create_async_engine`, if no other kind of pool
    is specified.

    .. seealso::

        :class:`.AsyncAdaptedQueuePool`
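
    A :class:`.QueuePool` is normally configured through
    :func:`_sa.create_engine`; as a rough sketch (the URL and sizing
    values below are placeholders, not recommendations)::

        from sqlalchemy import create_engine

        engine = create_engine(
            "postgresql+psycopg2://scott:tiger@localhost/test",
            pool_size=5,
            max_overflow=10,
            pool_timeout=30,
        )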
61 """
63 _is_asyncio = False
65 _queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = (
66 sqla_queue.Queue
67 )
69 _pool: sqla_queue.QueueCommon[ConnectionPoolEntry]
71 def __init__(
72 self,
73 creator: Union[_CreatorFnType, _CreatorWRecFnType],
74 pool_size: int = 5,
75 max_overflow: int = 10,
76 timeout: float = 30.0,
77 use_lifo: bool = False,
78 **kw: Any,
79 ):
80 r"""
81 Construct a QueuePool.
83 :param creator: a callable function that returns a DB-API
84 connection object, same as that of :paramref:`_pool.Pool.creator`.
86 :param pool_size: The size of the pool to be maintained,
87 defaults to 5. This is the largest number of connections that
88 will be kept persistently in the pool. Note that the pool
89 begins with no connections; once this number of connections
90 is requested, that number of connections will remain.
91 ``pool_size`` can be set to 0 to indicate no size limit; to
92 disable pooling, use a :class:`~sqlalchemy.pool.NullPool`
93 instead.
95 :param max_overflow: The maximum overflow size of the
96 pool. When the number of checked-out connections reaches the
97 size set in pool_size, additional connections will be
98 returned up to this limit. When those additional connections
99 are returned to the pool, they are disconnected and
100 discarded. It follows then that the total number of
101 simultaneous connections the pool will allow is pool_size +
102 `max_overflow`, and the total number of "sleeping"
103 connections the pool will allow is pool_size. `max_overflow`
104 can be set to -1 to indicate no overflow limit; no limit
105 will be placed on the total number of concurrent
106 connections. Defaults to 10.
108 :param timeout: The number of seconds to wait before giving up
109 on returning a connection. Defaults to 30.0. This can be a float
110 but is subject to the limitations of Python time functions which
111 may not be reliable in the tens of milliseconds.
113 :param use_lifo: use LIFO (last-in-first-out) when retrieving
114 connections instead of FIFO (first-in-first-out). Using LIFO, a
115 server-side timeout scheme can reduce the number of connections used
116 during non-peak periods of use. When planning for server-side
117 timeouts, ensure that a recycle or pre-ping strategy is in use to
118 gracefully handle stale connections.
120 .. versionadded:: 1.3
122 .. seealso::
124 :ref:`pool_use_lifo`
126 :ref:`pool_disconnects`
128 :param \**kw: Other keyword arguments including
129 :paramref:`_pool.Pool.recycle`, :paramref:`_pool.Pool.echo`,
130 :paramref:`_pool.Pool.reset_on_return` and others are passed to the
131 :class:`_pool.Pool` constructor.
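
        As an illustration of the sizing arithmetic above (values are
        arbitrary): with ``pool_size=5`` and ``max_overflow=10``, at most
        5 + 10 = 15 connections may be checked out simultaneously; once
        returned, at most 5 of those are retained in the pool, and the
        overflow connections are closed on checkin.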
133 """
135 Pool.__init__(self, creator, **kw)
136 self._pool = self._queue_class(pool_size, use_lifo=use_lifo)
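        # the overflow counter begins at -pool_size, so it is negative
        # while fewer than pool_size connections exist and reaches zero
        # exactly at pool_size; see checkedout() for the arithmetic that
        # relies on this convention.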
        self._overflow = 0 - pool_size
        self._max_overflow = -1 if pool_size == 0 else max_overflow
        self._timeout = timeout
        self._overflow_lock = threading.Lock()

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        try:
            self._pool.put(record, False)
        except sqla_queue.Full:
            try:
                record.close()
            finally:
                self._dec_overflow()

    def _do_get(self) -> ConnectionPoolEntry:
        use_overflow = self._max_overflow > -1

        wait = use_overflow and self._overflow >= self._max_overflow
        try:
            return self._pool.get(wait, self._timeout)
        except sqla_queue.Empty:
            # don't do things inside of "except Empty", because when we say
            # we timed out or can't connect and raise, Python 3 tells
            # people the real error is queue.Empty which it isn't.
            pass
        if use_overflow and self._overflow >= self._max_overflow:
            if not wait:
                return self._do_get()
            else:
                raise exc.TimeoutError(
                    "QueuePool limit of size %d overflow %d reached, "
                    "connection timed out, timeout %0.2f"
                    % (self.size(), self.overflow(), self._timeout),
                    code="3o7r",
                )

        if self._inc_overflow():
            try:
                return self._create_connection()
            except:
                with util.safe_reraise():
                    self._dec_overflow()
                raise
        else:
            return self._do_get()

    def _inc_overflow(self) -> bool:
        if self._max_overflow == -1:
            self._overflow += 1
            return True
        with self._overflow_lock:
            if self._overflow < self._max_overflow:
                self._overflow += 1
                return True
            else:
                return False

    def _dec_overflow(self) -> Literal[True]:
        if self._max_overflow == -1:
            self._overflow -= 1
            return True
        with self._overflow_lock:
            self._overflow -= 1
            return True

    def recreate(self) -> QueuePool:
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            pool_size=self._pool.maxsize,
            max_overflow=self._max_overflow,
            pre_ping=self._pre_ping,
            use_lifo=self._pool.use_lifo,
            timeout=self._timeout,
            recycle=self._recycle,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            reset_on_return=self._reset_on_return,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def dispose(self) -> None:
        while True:
            try:
                conn = self._pool.get(False)
                conn.close()
            except sqla_queue.Empty:
                break

        self._overflow = 0 - self.size()
        self.logger.info("Pool disposed. %s", self.status())

    def status(self) -> str:
        return (
            "Pool size: %d Connections in pool: %d "
            "Current Overflow: %d Current Checked out "
            "connections: %d"
            % (
                self.size(),
                self.checkedin(),
                self.overflow(),
                self.checkedout(),
            )
        )

    def size(self) -> int:
        return self._pool.maxsize

    def timeout(self) -> float:
        return self._timeout

    def checkedin(self) -> int:
        return self._pool.qsize()

    def overflow(self) -> int:
        return self._overflow if self._pool.maxsize else 0

    def checkedout(self) -> int:
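        # queue slots vacated (maxsize - qsize) plus the overflow counter,
        # which starts at -pool_size; the terms cancel to 0 for a pool
        # with no connections created yet.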
        return self._pool.maxsize - self._pool.qsize() + self._overflow


class AsyncAdaptedQueuePool(QueuePool):
    """An asyncio-compatible version of :class:`.QueuePool`.

    This pool is used by default when using :class:`.AsyncEngine` engines that
    were generated from :func:`_asyncio.create_async_engine`. It uses an
    asyncio-compatible queue implementation that does not use
    ``threading.Lock``.

    The arguments and operation of :class:`.AsyncAdaptedQueuePool` are
    otherwise identical to those of :class:`.QueuePool`.
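
    A rough sketch (placeholder URL); this pool is selected automatically
    by :func:`_asyncio.create_async_engine`, so it does not normally need
    to be named explicitly::

        from sqlalchemy.ext.asyncio import create_async_engine

        engine = create_async_engine(
            "postgresql+asyncpg://scott:tiger@localhost/test",
            pool_size=5,
            max_overflow=10,
        )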
270 """
272 _is_asyncio = True
273 _queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = (
274 sqla_queue.AsyncAdaptedQueue
275 )
277 _dialect = _AsyncConnDialect()
280class FallbackAsyncAdaptedQueuePool(AsyncAdaptedQueuePool):
281 _queue_class = sqla_queue.FallbackAsyncAdaptedQueue # type: ignore[assignment] # noqa: E501


class NullPool(Pool):
    """A Pool which does not pool connections.

    Instead it literally opens and closes the underlying DB-API connection
    on each connection checkout and checkin.

    Reconnect-related functions such as ``recycle`` and connection
    invalidation are not supported by this Pool implementation, since
    no connections are held persistently.

    The :class:`.NullPool` class **is compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.
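
    A rough sketch of selecting :class:`.NullPool` explicitly (placeholder
    URL)::

        from sqlalchemy import create_engine
        from sqlalchemy.pool import NullPool

        engine = create_engine(
            "postgresql+psycopg2://scott:tiger@localhost/test",
            poolclass=NullPool,
        )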
297 """
299 def status(self) -> str:
300 return "NullPool"
302 def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
303 record.close()
305 def _do_get(self) -> ConnectionPoolEntry:
306 return self._create_connection()
308 def recreate(self) -> NullPool:
309 self.logger.info("Pool recreating")
311 return self.__class__(
312 self._creator,
313 recycle=self._recycle,
314 echo=self.echo,
315 logging_name=self._orig_logging_name,
316 reset_on_return=self._reset_on_return,
317 pre_ping=self._pre_ping,
318 _dispatch=self.dispatch,
319 dialect=self._dialect,
320 )
322 def dispose(self) -> None:
323 pass


class SingletonThreadPool(Pool):
    """A Pool that maintains one connection per thread.

    Maintains one connection per thread, never moving a connection to a
    thread other than the one in which it was created.

    .. warning:: the :class:`.SingletonThreadPool` will call ``.close()``
       on arbitrary connections that exist beyond the size setting of
       ``pool_size``, e.g. if more unique **thread identities**
       than ``pool_size`` are used. This cleanup is
       non-deterministic and not sensitive to whether or not the connections
       linked to those thread identities are currently in use.

    :class:`.SingletonThreadPool` may be improved in a future release,
    however in its current form it is generally used only for test
    scenarios using a SQLite ``:memory:`` database and is not recommended
    for production use.

    The :class:`.SingletonThreadPool` class **is not compatible** with asyncio
    and :func:`_asyncio.create_async_engine`.

    Options are the same as those of :class:`_pool.Pool`, as well as:

    :param pool_size: The number of threads in which to maintain connections
      at once. Defaults to five.

    :class:`.SingletonThreadPool` is used by the SQLite dialect
    automatically when a memory-based database is used.
    See :ref:`sqlite_toplevel`.
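
    A rough sketch of naming this pool explicitly for a SQLite ``:memory:``
    database (the SQLite dialect selects it automatically in that case, so
    this is for illustration only)::

        from sqlalchemy import create_engine
        from sqlalchemy.pool import SingletonThreadPool

        engine = create_engine(
            "sqlite://",
            poolclass=SingletonThreadPool,
        )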
357 """
359 _is_asyncio = False
361 def __init__(
362 self,
363 creator: Union[_CreatorFnType, _CreatorWRecFnType],
364 pool_size: int = 5,
365 **kw: Any,
366 ):
367 Pool.__init__(self, creator, **kw)
368 self._conn = threading.local()
369 self._fairy = threading.local()
370 self._all_conns: Set[ConnectionPoolEntry] = set()
371 self.size = pool_size
373 def recreate(self) -> SingletonThreadPool:
374 self.logger.info("Pool recreating")
375 return self.__class__(
376 self._creator,
377 pool_size=self.size,
378 recycle=self._recycle,
379 echo=self.echo,
380 pre_ping=self._pre_ping,
381 logging_name=self._orig_logging_name,
382 reset_on_return=self._reset_on_return,
383 _dispatch=self.dispatch,
384 dialect=self._dialect,
385 )
387 def _transfer_from(
388 self, other_singleton_pool: SingletonThreadPool
389 ) -> None:
390 # used by the test suite to make a new engine / pool without
391 # losing the state of an existing SQLite :memory: connection
392 assert not hasattr(other_singleton_pool._fairy, "current")
393 self._conn = other_singleton_pool._conn
394 self._all_conns = other_singleton_pool._all_conns
396 def dispose(self) -> None:
397 """Dispose of this pool."""
399 for conn in self._all_conns:
400 try:
401 conn.close()
402 except Exception:
403 # pysqlite won't even let you close a conn from a thread
404 # that didn't create it
405 pass
407 self._all_conns.clear()
409 def _cleanup(self) -> None:
410 while len(self._all_conns) >= self.size:
411 c = self._all_conns.pop()
412 c.close()
414 def status(self) -> str:
415 return "SingletonThreadPool id:%d size: %d" % (
416 id(self),
417 len(self._all_conns),
418 )
420 def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
421 try:
422 del self._fairy.current
423 except AttributeError:
424 pass
426 def _do_get(self) -> ConnectionPoolEntry:
427 try:
428 if TYPE_CHECKING:
429 c = cast(ConnectionPoolEntry, self._conn.current())
430 else:
431 c = self._conn.current()
432 if c:
433 return c
434 except AttributeError:
435 pass
436 c = self._create_connection()
437 self._conn.current = weakref.ref(c)
438 if len(self._all_conns) >= self.size:
439 self._cleanup()
440 self._all_conns.add(c)
441 return c
443 def connect(self) -> PoolProxiedConnection:
444 # vendored from Pool to include the now removed use_threadlocal
445 # behavior
446 try:
447 rec = cast(_ConnectionFairy, self._fairy.current())
448 except AttributeError:
449 pass
450 else:
451 if rec is not None:
452 return rec._checkout_existing()
454 return _ConnectionFairy._checkout(self, self._fairy)


class StaticPool(Pool):
    """A Pool of exactly one connection, used for all requests.

    Reconnect-related functions such as ``recycle`` and connection
    invalidation (which is also used to support auto-reconnect) are only
    partially supported right now and may not yield good results.

    The :class:`.StaticPool` class **is compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.
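
    A rough sketch of a common use, sharing a single SQLite ``:memory:``
    connection across an application::

        from sqlalchemy import create_engine
        from sqlalchemy.pool import StaticPool

        engine = create_engine(
            "sqlite://",
            poolclass=StaticPool,
        )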
467 """
469 @util.memoized_property
470 def connection(self) -> _ConnectionRecord:
471 return _ConnectionRecord(self)
473 def status(self) -> str:
474 return "StaticPool"
476 def dispose(self) -> None:
477 if (
478 "connection" in self.__dict__
479 and self.connection.dbapi_connection is not None
480 ):
481 self.connection.close()
482 del self.__dict__["connection"]
484 def recreate(self) -> StaticPool:
485 self.logger.info("Pool recreating")
486 return self.__class__(
487 creator=self._creator,
488 recycle=self._recycle,
489 reset_on_return=self._reset_on_return,
490 pre_ping=self._pre_ping,
491 echo=self.echo,
492 logging_name=self._orig_logging_name,
493 _dispatch=self.dispatch,
494 dialect=self._dialect,
495 )
497 def _transfer_from(self, other_static_pool: StaticPool) -> None:
498 # used by the test suite to make a new engine / pool without
499 # losing the state of an existing SQLite :memory: connection
500 def creator(rec: ConnectionPoolEntry) -> DBAPIConnection:
501 conn = other_static_pool.connection.dbapi_connection
502 assert conn is not None
503 return conn
505 self._invoke_creator = creator
507 def _create_connection(self) -> ConnectionPoolEntry:
508 raise NotImplementedError()
510 def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
511 pass
513 def _do_get(self) -> ConnectionPoolEntry:
514 rec = self.connection
515 if rec._is_hard_or_soft_invalidated():
516 del self.__dict__["connection"]
517 rec = self.connection
519 return rec


class AssertionPool(Pool):
    """A :class:`_pool.Pool` that allows at most one checked out connection at
    any given time.

    This will raise an exception if more than one connection is checked out
    at a time. Useful for debugging code that is using more connections
    than desired.

    The :class:`.AssertionPool` class **is compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.
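
    A rough sketch of its use in a debugging scenario; the second checkout
    below raises ``AssertionError``::

        from sqlalchemy import create_engine
        from sqlalchemy.pool import AssertionPool

        engine = create_engine(
            "sqlite://",
            poolclass=AssertionPool,
        )

        c1 = engine.connect()
        c2 = engine.connect()  # raises AssertionError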
533 """
535 _conn: Optional[ConnectionPoolEntry]
536 _checkout_traceback: Optional[List[str]]
538 def __init__(self, *args: Any, **kw: Any):
539 self._conn = None
540 self._checked_out = False
541 self._store_traceback = kw.pop("store_traceback", True)
542 self._checkout_traceback = None
543 Pool.__init__(self, *args, **kw)
545 def status(self) -> str:
546 return "AssertionPool"
548 def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
549 if not self._checked_out:
550 raise AssertionError("connection is not checked out")
551 self._checked_out = False
552 assert record is self._conn
554 def dispose(self) -> None:
555 self._checked_out = False
556 if self._conn:
557 self._conn.close()
559 def recreate(self) -> AssertionPool:
560 self.logger.info("Pool recreating")
561 return self.__class__(
562 self._creator,
563 echo=self.echo,
564 pre_ping=self._pre_ping,
565 recycle=self._recycle,
566 reset_on_return=self._reset_on_return,
567 logging_name=self._orig_logging_name,
568 _dispatch=self.dispatch,
569 dialect=self._dialect,
570 )
572 def _do_get(self) -> ConnectionPoolEntry:
573 if self._checked_out:
574 if self._checkout_traceback:
575 suffix = " at:\n%s" % "".join(
576 chop_traceback(self._checkout_traceback)
577 )
578 else:
579 suffix = ""
580 raise AssertionError("connection is already checked out" + suffix)
582 if not self._conn:
583 self._conn = self._create_connection()
585 self._checked_out = True
586 if self._store_traceback:
587 self._checkout_traceback = traceback.format_stack()
588 return self._conn