Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/pool/impl.py: 45%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# pool/impl.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
9"""Pool implementation classes."""
10from __future__ import annotations
12import threading
13import traceback
14import typing
15from typing import Any
16from typing import cast
17from typing import List
18from typing import Literal
19from typing import Optional
20from typing import Set
21from typing import Type
22from typing import TYPE_CHECKING
23from typing import Union
24import weakref
26from .base import _AsyncConnDialect
27from .base import _ConnectionFairy
28from .base import _ConnectionRecord
29from .base import _CreatorFnType
30from .base import _CreatorWRecFnType
31from .base import ConnectionPoolEntry
32from .base import Pool
33from .base import PoolProxiedConnection
34from .. import exc
35from .. import util
36from ..util import chop_traceback
37from ..util import queue as sqla_queue
39if typing.TYPE_CHECKING:
40 from ..engine.interfaces import DBAPIConnection
class QueuePool(Pool):
    """A :class:`_pool.Pool`
    that imposes a limit on the number of open connections.

    :class:`.QueuePool` is the default pooling implementation used for
    all :class:`_engine.Engine` objects other than SQLite with a ``:memory:``
    database.

    The :class:`.QueuePool` class **is not compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.  The
    :class:`.AsyncAdaptedQueuePool` class is used automatically when
    using :func:`_asyncio.create_async_engine`, if no other kind of pool
    is specified.

    .. seealso::

        :class:`.AsyncAdaptedQueuePool`

    """

    _is_asyncio = False

    # queue implementation used to hold idle connections; the async
    # subclass swaps in an asyncio-compatible queue
    _queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = (
        sqla_queue.Queue
    )

    _pool: sqla_queue.QueueCommon[ConnectionPoolEntry]

    def __init__(
        self,
        creator: Union[_CreatorFnType, _CreatorWRecFnType],
        pool_size: int = 5,
        max_overflow: int = 10,
        timeout: float = 30.0,
        use_lifo: bool = False,
        **kw: Any,
    ):
        r"""
        Construct a QueuePool.

        :param creator: a callable function that returns a DB-API
          connection object, same as that of :paramref:`_pool.Pool.creator`.

        :param pool_size: The size of the pool to be maintained,
          defaults to 5. This is the largest number of connections that
          will be kept persistently in the pool. Note that the pool
          begins with no connections; once this number of connections
          is requested, that number of connections will remain.
          ``pool_size`` can be set to 0 to indicate no size limit; to
          disable pooling, use a :class:`~sqlalchemy.pool.NullPool`
          instead.

        :param max_overflow: The maximum overflow size of the
          pool. When the number of checked-out connections reaches the
          size set in pool_size, additional connections will be
          returned up to this limit. When those additional connections
          are returned to the pool, they are disconnected and
          discarded. It follows then that the total number of
          simultaneous connections the pool will allow is pool_size +
          `max_overflow`, and the total number of "sleeping"
          connections the pool will allow is pool_size. `max_overflow`
          can be set to -1 to indicate no overflow limit; no limit
          will be placed on the total number of concurrent
          connections. Defaults to 10.

        :param timeout: The number of seconds to wait before giving up
          on returning a connection. Defaults to 30.0. This can be a float
          but is subject to the limitations of Python time functions which
          may not be reliable in the tens of milliseconds.

        :param use_lifo: use LIFO (last-in-first-out) when retrieving
          connections instead of FIFO (first-in-first-out). Using LIFO, a
          server-side timeout scheme can reduce the number of connections used
          during non-peak periods of use.   When planning for server-side
          timeouts, ensure that a recycle or pre-ping strategy is in use to
          gracefully handle stale connections.

          .. seealso::

            :ref:`pool_use_lifo`

            :ref:`pool_disconnects`

        :param \**kw: Other keyword arguments including
          :paramref:`_pool.Pool.recycle`, :paramref:`_pool.Pool.echo`,
          :paramref:`_pool.Pool.reset_on_return` and others are passed to the
          :class:`_pool.Pool` constructor.

        """
        Pool.__init__(self, creator, **kw)
        self._pool = self._queue_class(pool_size, use_lifo=use_lifo)
        # the overflow counter starts at -pool_size so that "overflow"
        # only becomes positive once more than pool_size connections
        # are checked out at once
        self._overflow = 0 - pool_size
        # pool_size == 0 means "unlimited", which is expressed here as
        # an uncapped overflow (-1)
        self._max_overflow = -1 if pool_size == 0 else max_overflow
        self._timeout = timeout
        # guards _overflow increments/decrements across threads
        self._overflow_lock = threading.Lock()

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        """Return a connection record to the idle queue, or discard it
        if the queue is already at capacity."""
        try:
            # non-blocking put; raises Full when the queue already
            # holds pool_size idle connections
            self._pool.put(record, False)
        except sqla_queue.Full:
            try:
                record.close()
            finally:
                # the discarded connection occupied an overflow slot;
                # release it even if close() raised
                self._dec_overflow()

    def _do_get(self) -> ConnectionPoolEntry:
        """Check out a connection record, creating a new overflow
        connection or blocking for up to ``self._timeout`` as needed."""
        use_overflow = self._max_overflow > -1

        # block on the queue only when overflow is capped and already
        # exhausted; otherwise fall through to create a connection
        wait = use_overflow and self._overflow >= self._max_overflow
        try:
            return self._pool.get(wait, self._timeout)
        except sqla_queue.Empty:
            # don't do things inside of "except Empty", because when we say
            # we timed out or can't connect and raise, Python 3 tells
            # people the real error is queue.Empty which it isn't.
            pass
        if use_overflow and self._overflow >= self._max_overflow:
            if not wait:
                # we did a non-blocking get above; another thread may
                # have returned a connection meanwhile, so retry
                return self._do_get()
            else:
                raise exc.TimeoutError(
                    "QueuePool limit of size %d overflow %d reached, "
                    "connection timed out, timeout %0.2f"
                    % (self.size(), self.overflow(), self._timeout),
                    code="3o7r",
                )

        if self._inc_overflow():
            try:
                return self._create_connection()
            except:
                # release the overflow slot on any failure (including
                # BaseException), then re-raise the original error
                with util.safe_reraise():
                    self._dec_overflow()
                raise
        else:
            # lost the race for the last overflow slot; retry
            return self._do_get()

    def _inc_overflow(self) -> bool:
        """Attempt to claim an overflow slot; returns True on success."""
        if self._max_overflow == -1:
            # uncapped: no lock needed, counter is informational only
            self._overflow += 1
            return True
        with self._overflow_lock:
            if self._overflow < self._max_overflow:
                self._overflow += 1
                return True
            else:
                return False

    def _dec_overflow(self) -> Literal[True]:
        """Release an overflow slot; always returns True."""
        if self._max_overflow == -1:
            self._overflow -= 1
            return True
        with self._overflow_lock:
            self._overflow -= 1
            return True

    def recreate(self) -> QueuePool:
        """Return a new :class:`.QueuePool` with the same configuration
        as this one."""
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            pool_size=self._pool.maxsize,
            max_overflow=self._max_overflow,
            pre_ping=self._pre_ping,
            use_lifo=self._pool.use_lifo,
            timeout=self._timeout,
            recycle=self._recycle,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            reset_on_return=self._reset_on_return,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def dispose(self) -> None:
        """Close all idle connections and reset the overflow counter."""
        while True:
            try:
                conn = self._pool.get(False)
                conn.close()
            except sqla_queue.Empty:
                break

        # reset to the empty-pool baseline of -pool_size
        self._overflow = 0 - self.size()
        self.logger.info("Pool disposed. %s", self.status())

    def status(self) -> str:
        """Return a human-readable summary of the pool's state."""
        return (
            "Pool size: %d  Connections in pool: %d "
            "Current Overflow: %d Current Checked out "
            "connections: %d"
            % (
                self.size(),
                self.checkedin(),
                self.overflow(),
                self.checkedout(),
            )
        )

    def size(self) -> int:
        """Return the configured pool_size."""
        return self._pool.maxsize

    def timeout(self) -> float:
        """Return the configured checkout timeout in seconds."""
        return self._timeout

    def checkedin(self) -> int:
        """Return the number of idle connections in the queue."""
        return self._pool.qsize()

    def overflow(self) -> int:
        """Return the current overflow count (0 for unlimited pools)."""
        return self._overflow if self._pool.maxsize else 0

    def checkedout(self) -> int:
        """Return the number of connections currently checked out."""
        return self._pool.maxsize - self._pool.qsize() + self._overflow
class AsyncAdaptedQueuePool(QueuePool):
    """An asyncio-compatible version of :class:`.QueuePool`.

    This pool is used by default when using :class:`.AsyncEngine` engines that
    were generated from :func:`_asyncio.create_async_engine`.  It uses an
    asyncio-compatible queue implementation that does not use
    ``threading.Lock``.

    The arguments and operation of :class:`.AsyncAdaptedQueuePool` are
    otherwise identical to that of :class:`.QueuePool`.

    """

    _is_asyncio = True

    # replace the threading-based queue with the asyncio adaptation;
    # all other QueuePool behavior is inherited unchanged
    _queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = (
        sqla_queue.AsyncAdaptedQueue
    )

    # dialect marker indicating DBAPI connections are async adapters
    _dialect = _AsyncConnDialect()
class NullPool(Pool):
    """A Pool which does not pool connections.

    Instead it literally opens and closes the underlying DB-API connection
    per each connection open/close.

    Reconnect-related functions such as ``recycle`` and connection
    invalidation are not supported by this Pool implementation, since
    no connections are held persistently.

    The :class:`.NullPool` class **is compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.

    """

    def _do_get(self) -> ConnectionPoolEntry:
        # every checkout opens a brand-new connection
        return self._create_connection()

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        # returned connections are closed immediately, never stored
        record.close()

    def status(self) -> str:
        """Return a fixed label identifying this pool implementation."""
        return "NullPool"

    def recreate(self) -> NullPool:
        """Return a new :class:`.NullPool` carrying over this pool's
        configuration."""
        self.logger.info("Pool recreating")

        settings = dict(
            recycle=self._recycle,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            reset_on_return=self._reset_on_return,
            pre_ping=self._pre_ping,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )
        return self.__class__(self._creator, **settings)

    def dispose(self) -> None:
        # nothing is held by this pool, so there is nothing to release
        pass
class SingletonThreadPool(Pool):
    """A Pool that maintains one connection per thread.

    Maintains one connection per each thread, never moving a connection to a
    thread other than the one which it was created in.

    .. warning::  the :class:`.SingletonThreadPool` will call ``.close()``
       on arbitrary connections that exist beyond the size setting of
       ``pool_size``, e.g. if more unique **thread identities**
       than what ``pool_size`` states are used.  This cleanup is
       non-deterministic and not sensitive to whether or not the connections
       linked to those thread identities are currently in use.

    :class:`.SingletonThreadPool` may be improved in a future release,
    however in its current status it is generally used only for test
    scenarios using a SQLite ``:memory:`` database and is not recommended
    for production use.

    The :class:`.SingletonThreadPool` class **is not compatible** with asyncio
    and :func:`_asyncio.create_async_engine`.


    Options are the same as those of :class:`_pool.Pool`, as well as:

    :param pool_size: The number of threads in which to maintain connections
        at once.  Defaults to five.

    :class:`.SingletonThreadPool` is used by the SQLite dialect
    automatically when a memory-based database is used.
    See :ref:`sqlite_toplevel`.

    """

    _is_asyncio = False

    def __init__(
        self,
        creator: Union[_CreatorFnType, _CreatorWRecFnType],
        pool_size: int = 5,
        **kw: Any,
    ):
        Pool.__init__(self, creator, **kw)
        # per-thread storage: each thread sees its own .current attribute
        self._conn = threading.local()
        self._fairy = threading.local()
        # strong references to all records, across all threads, so that
        # _cleanup()/dispose() can reach connections owned by any thread
        self._all_conns: Set[ConnectionPoolEntry] = set()
        self.size = pool_size

    def recreate(self) -> SingletonThreadPool:
        """Return a new :class:`.SingletonThreadPool` with the same
        configuration as this one."""
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            pool_size=self.size,
            recycle=self._recycle,
            echo=self.echo,
            pre_ping=self._pre_ping,
            logging_name=self._orig_logging_name,
            reset_on_return=self._reset_on_return,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def dispose(self) -> None:
        """Dispose of this pool."""

        for conn in self._all_conns:
            try:
                conn.close()
            except Exception:
                # pysqlite won't even let you close a conn from a thread
                # that didn't create it
                pass

        self._all_conns.clear()

    def _cleanup(self) -> None:
        # evict arbitrary records until under the size limit; pop() order
        # is unspecified, hence the non-deterministic warning above
        while len(self._all_conns) >= self.size:
            c = self._all_conns.pop()
            c.close()

    def status(self) -> str:
        """Return a human-readable summary of the pool's state."""
        return "SingletonThreadPool id:%d size: %d" % (
            id(self),
            len(self._all_conns),
        )

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        # drop this thread's fairy reference, if any; the record itself
        # stays registered in _all_conns for reuse by the same thread
        try:
            del self._fairy.current
        except AttributeError:
            pass

    def _do_get(self) -> ConnectionPoolEntry:
        try:
            # .current is a weakref.ref; calling it returns the record
            # or None if it was garbage collected
            if TYPE_CHECKING:
                c = cast(ConnectionPoolEntry, self._conn.current())
            else:
                c = self._conn.current()
            if c:
                return c
        except AttributeError:
            # this thread has no .current attribute yet
            pass
        c = self._create_connection()
        # store only a weak reference so the record's lifetime is
        # governed by _all_conns, not the thread-local
        self._conn.current = weakref.ref(c)
        if len(self._all_conns) >= self.size:
            self._cleanup()
        self._all_conns.add(c)
        return c

    def connect(self) -> PoolProxiedConnection:
        # vendored from Pool to include the now removed use_threadlocal
        # behavior
        try:
            # .current holds a weakref to this thread's fairy, if any
            rec = cast(_ConnectionFairy, self._fairy.current())
        except AttributeError:
            pass
        else:
            if rec is not None:
                return rec._checkout_existing()

        return _ConnectionFairy._checkout(self, self._fairy)
class StaticPool(Pool):
    """A Pool of exactly one connection, used for all requests.

    Reconnect-related functions such as ``recycle`` and connection
    invalidation (which is also used to support auto-reconnect) are only
    partially supported right now and may not yield good results.

    The :class:`.StaticPool` class **is compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.

    """

    @util.memoized_property
    def connection(self) -> _ConnectionRecord:
        # created once on first access, then cached in self.__dict__
        # until explicitly deleted (see dispose() / _do_get())
        return _ConnectionRecord(self)

    def status(self) -> str:
        """Return a fixed label identifying this pool implementation."""
        return "StaticPool"

    def dispose(self) -> None:
        """Close the single connection record, if it was ever created
        and still holds a live DBAPI connection."""
        if "connection" not in self.__dict__:
            # the memoized record was never materialized
            return
        record = self.connection
        if record.dbapi_connection is None:
            return
        record.close()
        # clear the memoized value so a later access rebuilds it
        del self.__dict__["connection"]

    def recreate(self) -> StaticPool:
        """Return a new :class:`.StaticPool` carrying over this pool's
        configuration."""
        self.logger.info("Pool recreating")

        settings = dict(
            creator=self._creator,
            recycle=self._recycle,
            reset_on_return=self._reset_on_return,
            pre_ping=self._pre_ping,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )
        return self.__class__(**settings)

    def _transfer_from(self, other_static_pool: StaticPool) -> None:
        # used by the test suite to make a new engine / pool without
        # losing the state of an existing SQLite :memory: connection
        def _borrow(rec: ConnectionPoolEntry) -> DBAPIConnection:
            dbapi_conn = other_static_pool.connection.dbapi_connection
            assert dbapi_conn is not None
            return dbapi_conn

        self._invoke_creator = _borrow

    def _create_connection(self) -> ConnectionPoolEntry:
        # never called: _do_get() always hands out the memoized record
        raise NotImplementedError()

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        # the single record is never discarded on return
        pass

    def _do_get(self) -> ConnectionPoolEntry:
        record = self.connection
        if not record._is_hard_or_soft_invalidated():
            return record
        # stale record: drop the memoized value and build a fresh one
        del self.__dict__["connection"]
        return self.connection
class AssertionPool(Pool):
    """A :class:`_pool.Pool` that allows at most one checked out connection at
    any given time.

    This will raise an exception if more than one connection is checked out
    at a time.  Useful for debugging code that is using more connections
    than desired.

    The :class:`.AssertionPool` class **is compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.

    """

    # the single lazily-created connection record
    _conn: Optional[ConnectionPoolEntry]
    # stack captured at checkout time, for diagnostics
    _checkout_traceback: Optional[List[str]]

    def __init__(self, *args: Any, **kw: Any):
        # consume our private option before handing kw to the base class
        self._store_traceback = kw.pop("store_traceback", True)
        self._conn = None
        self._checked_out = False
        self._checkout_traceback = None
        Pool.__init__(self, *args, **kw)

    def status(self) -> str:
        """Return a fixed label identifying this pool implementation."""
        return "AssertionPool"

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        # a return without a matching checkout is a usage error
        if not self._checked_out:
            raise AssertionError("connection is not checked out")
        self._checked_out = False
        assert record is self._conn

    def dispose(self) -> None:
        """Close the held connection, if any, and reset checkout state."""
        self._checked_out = False
        if self._conn:
            self._conn.close()

    def recreate(self) -> AssertionPool:
        """Return a new :class:`.AssertionPool` carrying over this pool's
        configuration."""
        self.logger.info("Pool recreating")

        settings = dict(
            echo=self.echo,
            pre_ping=self._pre_ping,
            recycle=self._recycle,
            reset_on_return=self._reset_on_return,
            logging_name=self._orig_logging_name,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )
        return self.__class__(self._creator, **settings)

    def _do_get(self) -> ConnectionPoolEntry:
        if self._checked_out:
            # report where the outstanding checkout happened, if recorded
            tb = self._checkout_traceback
            if tb:
                suffix = " at:\n%s" % "".join(chop_traceback(tb))
            else:
                suffix = ""
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn