Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/pool/impl.py: 44%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# pool/impl.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
9"""Pool implementation classes."""
11from __future__ import annotations
13import threading
14import traceback
15import typing
16from typing import Any
17from typing import cast
18from typing import List
19from typing import Literal
20from typing import Optional
21from typing import Set
22from typing import Type
23from typing import TYPE_CHECKING
24from typing import Union
25import weakref
27from .base import _AsyncConnDialect
28from .base import _ConnectionFairy
29from .base import _ConnectionRecord
30from .base import _CreatorFnType
31from .base import _CreatorWRecFnType
32from .base import ConnectionPoolEntry
33from .base import Pool
34from .base import PoolProxiedConnection
35from .. import exc
36from .. import util
37from ..util import chop_traceback
38from ..util import queue as sqla_queue
40if typing.TYPE_CHECKING:
41 from ..engine.interfaces import DBAPIConnection
class QueuePool(Pool):
    """A :class:`_pool.Pool`
    that imposes a limit on the number of open connections.

    :class:`.QueuePool` is the default pooling implementation used for
    all :class:`_engine.Engine` objects other than SQLite with a ``:memory:``
    database.

    The :class:`.QueuePool` class **is not compatible** with asyncio and
    :func:`_asyncio.create_async_engine`. The
    :class:`.AsyncAdaptedQueuePool` class is used automatically when
    using :func:`_asyncio.create_async_engine`, if no other kind of pool
    is specified.

    .. seealso::

        :class:`.AsyncAdaptedQueuePool`

    """

    _is_asyncio = False

    # queue implementation holding idle ConnectionPoolEntry objects;
    # AsyncAdaptedQueuePool overrides this with an asyncio-compatible queue
    _queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = (
        sqla_queue.Queue
    )

    # the fixed-capacity queue of checked-in connection records
    _pool: sqla_queue.QueueCommon[ConnectionPoolEntry]

    def __init__(
        self,
        creator: Union[_CreatorFnType, _CreatorWRecFnType],
        pool_size: int = 5,
        max_overflow: int = 10,
        timeout: float = 30.0,
        use_lifo: bool = False,
        **kw: Any,
    ):
        r"""
        Construct a QueuePool.

        :param creator: a callable function that returns a DB-API
          connection object, same as that of :paramref:`_pool.Pool.creator`.

        :param pool_size: The size of the pool to be maintained,
          defaults to 5. This is the largest number of connections that
          will be kept persistently in the pool. Note that the pool
          begins with no connections; once this number of connections
          is requested, that number of connections will remain.
          ``pool_size`` can be set to 0 to indicate no size limit; to
          disable pooling, use a :class:`~sqlalchemy.pool.NullPool`
          instead.

        :param max_overflow: The maximum overflow size of the
          pool. When the number of checked-out connections reaches the
          size set in pool_size, additional connections will be
          returned up to this limit. When those additional connections
          are returned to the pool, they are disconnected and
          discarded. It follows then that the total number of
          simultaneous connections the pool will allow is pool_size +
          `max_overflow`, and the total number of "sleeping"
          connections the pool will allow is pool_size. `max_overflow`
          can be set to -1 to indicate no overflow limit; no limit
          will be placed on the total number of concurrent
          connections. Defaults to 10.

        :param timeout: The number of seconds to wait before giving up
          on returning a connection. Defaults to 30.0. This can be a float
          but is subject to the limitations of Python time functions which
          may not be reliable in the tens of milliseconds.

        :param use_lifo: use LIFO (last-in-first-out) when retrieving
          connections instead of FIFO (first-in-first-out). Using LIFO, a
          server-side timeout scheme can reduce the number of connections used
          during non-peak periods of use. When planning for server-side
          timeouts, ensure that a recycle or pre-ping strategy is in use to
          gracefully handle stale connections.

          .. seealso::

            :ref:`pool_use_lifo`

            :ref:`pool_disconnects`

        :param \**kw: Other keyword arguments including
          :paramref:`_pool.Pool.recycle`, :paramref:`_pool.Pool.echo`,
          :paramref:`_pool.Pool.reset_on_return` and others are passed to the
          :class:`_pool.Pool` constructor.

        """
        Pool.__init__(self, creator, **kw)
        self._pool = self._queue_class(pool_size, use_lifo=use_lifo)

        # _overflow starts at -pool_size so that it only goes positive once
        # more than pool_size connections exist; overflow() / checkedout()
        # arithmetic depends on this offset.
        self._overflow = 0 - pool_size

        # pool_size == 0 means "no size limit", represented internally as
        # unlimited overflow (-1)
        self._max_overflow = -1 if pool_size == 0 else max_overflow
        self._timeout = timeout

        # guards _overflow when a finite limit is enforced; the unlimited
        # (-1) case skips the lock entirely
        self._overflow_lock = threading.Lock()

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        # Return ``record`` to the idle queue without blocking; a Full
        # queue means this was an overflow connection, which is closed
        # and discarded rather than retained.
        try:
            self._pool.put(record, False)
        except sqla_queue.Full:
            try:
                record.close()
            finally:
                # keep the overflow counter accurate even if close() raises
                self._dec_overflow()

    def _do_get(self) -> ConnectionPoolEntry:
        # True when a finite overflow limit is in effect
        use_overflow = self._max_overflow > -1

        # Block on the queue (up to self._timeout) only when we're already
        # at the overflow limit; otherwise poll it and fall through to
        # create a new connection.
        wait = use_overflow and self._overflow >= self._max_overflow
        try:
            return self._pool.get(wait, self._timeout)
        except sqla_queue.Empty:
            # don't do things inside of "except Empty", because when we say
            # we timed out or can't connect and raise, Python 3 tells
            # people the real error is queue.Empty which it isn't.
            pass
        if use_overflow and self._overflow >= self._max_overflow:
            if not wait:
                # another thread pushed us to the limit between the two
                # checks; retry, which will now block on the queue
                return self._do_get()
            else:
                raise exc.TimeoutError(
                    "QueuePool limit of size %d overflow %d reached, "
                    "connection timed out, timeout %0.2f"
                    % (self.size(), self.overflow(), self._timeout),
                    code="3o7r",
                )

        if self._inc_overflow():
            try:
                return self._create_connection()
            except:
                # undo the overflow reservation on any failure while
                # re-raising the original exception
                with util.safe_reraise():
                    self._dec_overflow()
                raise
        else:
            # lost the race for the last overflow slot; retry from the top
            return self._do_get()

    def _inc_overflow(self) -> bool:
        # Try to reserve capacity for one more connection; returns False
        # when the overflow limit has been reached.
        if self._max_overflow == -1:
            # unlimited: no lock needed, the counter is informational only
            self._overflow += 1
            return True
        with self._overflow_lock:
            if self._overflow < self._max_overflow:
                self._overflow += 1
                return True
            else:
                return False

    def _dec_overflow(self) -> Literal[True]:
        # Release one unit of overflow capacity; always succeeds.
        if self._max_overflow == -1:
            self._overflow -= 1
            return True
        with self._overflow_lock:
            self._overflow -= 1
            return True

    def recreate(self) -> QueuePool:
        """Return a new :class:`.QueuePool` with this pool's configuration."""
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            pool_size=self._pool.maxsize,
            max_overflow=self._max_overflow,
            pre_ping=self._pre_ping,
            use_lifo=self._pool.use_lifo,
            timeout=self._timeout,
            recycle=self._recycle,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            reset_on_return=self._reset_on_return,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def dispose(self) -> None:
        """Close all currently checked-in connections and reset overflow."""
        # drain the queue non-blockingly until Empty
        while True:
            try:
                conn = self._pool.get(False)
                conn.close()
            except sqla_queue.Empty:
                break

        # restore the counter's baseline offset (see __init__)
        self._overflow = 0 - self.size()
        self.logger.info("Pool disposed. %s", self.status())

    def status(self) -> str:
        """Return a one-line textual summary of pool state."""
        return (
            "Pool size: %d  Connections in pool: %d "
            "Current Overflow: %d Current Checked out "
            "connections: %d"
            % (
                self.size(),
                self.checkedin(),
                self.overflow(),
                self.checkedout(),
            )
        )

    def size(self) -> int:
        """Return the configured pool_size (queue capacity)."""
        return self._pool.maxsize

    def timeout(self) -> float:
        """Return the checkout timeout in seconds."""
        return self._timeout

    def checkedin(self) -> int:
        """Return the number of connections currently idle in the queue."""
        return self._pool.qsize()

    def overflow(self) -> int:
        # with pool_size == 0 ("unlimited"), overflow is not meaningful
        return self._overflow if self._pool.maxsize else 0

    def checkedout(self) -> int:
        # (capacity - idle) gives pooled connections in use; adding the
        # (offset) overflow counter yields total checked-out connections
        return self._pool.maxsize - self._pool.qsize() + self._overflow
class AsyncAdaptedQueuePool(QueuePool):
    """An asyncio-compatible version of :class:`.QueuePool`.

    This pool is used by default when using :class:`.AsyncEngine` engines that
    were generated from :func:`_asyncio.create_async_engine`. It uses an
    asyncio-compatible queue implementation that does not use
    ``threading.Lock``.

    The arguments and operation of :class:`.AsyncAdaptedQueuePool` are
    otherwise identical to that of :class:`.QueuePool`.

    """

    _is_asyncio = True

    _dialect = _AsyncConnDialect()

    # substitute the asyncio-friendly queue; all checkout/checkin logic
    # is inherited unchanged from QueuePool
    _queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = (
        sqla_queue.AsyncAdaptedQueue
    )
class NullPool(Pool):
    """A Pool which does not pool connections.

    Instead it literally opens and closes the underlying DB-API connection
    per each connection open/close.

    Reconnect-related functions such as ``recycle`` and connection
    invalidation are not supported by this Pool implementation, since
    no connections are held persistently.

    The :class:`.NullPool` class **is compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.

    """

    def _do_get(self) -> ConnectionPoolEntry:
        # every checkout opens a brand-new connection
        return self._create_connection()

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        # a "returned" connection is simply closed, never retained
        record.close()

    def dispose(self) -> None:
        # nothing is held persistently, so there is nothing to dispose
        pass

    def recreate(self) -> NullPool:
        """Return a new :class:`.NullPool` carrying this pool's settings."""
        self.logger.info("Pool recreating")

        return self.__class__(
            self._creator,
            recycle=self._recycle,
            pre_ping=self._pre_ping,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            reset_on_return=self._reset_on_return,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def status(self) -> str:
        return "NullPool"
class SingletonThreadPool(Pool):
    """A Pool that maintains one connection per thread.

    Maintains one connection per each thread, never moving a connection to a
    thread other than the one which it was created in.

    .. warning:: the :class:`.SingletonThreadPool` will call ``.close()``
       on arbitrary connections that exist beyond the size setting of
       ``pool_size``, e.g. if more unique **thread identities**
       than what ``pool_size`` states are used. This cleanup is
       non-deterministic and not sensitive to whether or not the connections
       linked to those thread identities are currently in use.

    :class:`.SingletonThreadPool` may be improved in a future release,
    however in its current status it is generally used only for test
    scenarios using a SQLite ``:memory:`` database and is not recommended
    for production use.

    The :class:`.SingletonThreadPool` class **is not compatible** with asyncio
    and :func:`_asyncio.create_async_engine`.


    Options are the same as those of :class:`_pool.Pool`, as well as:

    :param pool_size: The number of threads in which to maintain connections
        at once. Defaults to five.

    :class:`.SingletonThreadPool` is used by the SQLite dialect
    automatically when a memory-based database is used.
    See :ref:`sqlite_toplevel`.

    """

    _is_asyncio = False

    def __init__(
        self,
        creator: Union[_CreatorFnType, _CreatorWRecFnType],
        pool_size: int = 5,
        **kw: Any,
    ):
        Pool.__init__(self, creator, **kw)
        # per-thread storage; ``.current`` holds a weakref to the calling
        # thread's ConnectionPoolEntry
        self._conn = threading.local()
        # per-thread storage for the checked-out _ConnectionFairy, used by
        # the vendored connect() below
        self._fairy = threading.local()
        # strong references to every record created, across all threads,
        # so dispose()/_cleanup() can reach them
        self._all_conns: Set[ConnectionPoolEntry] = set()
        # maximum number of thread-bound connections to retain
        self.size = pool_size

    def recreate(self) -> SingletonThreadPool:
        """Return a new :class:`.SingletonThreadPool` with the same config."""
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            pool_size=self.size,
            recycle=self._recycle,
            echo=self.echo,
            pre_ping=self._pre_ping,
            logging_name=self._orig_logging_name,
            reset_on_return=self._reset_on_return,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def _transfer_from(
        self, other_singleton_pool: SingletonThreadPool
    ) -> None:
        # used by the test suite to make a new engine / pool without
        # losing the state of an existing SQLite :memory: connection
        assert not hasattr(other_singleton_pool._fairy, "current")
        self._conn = other_singleton_pool._conn
        self._all_conns = other_singleton_pool._all_conns

    def dispose(self) -> None:
        """Dispose of this pool."""

        for conn in self._all_conns:
            try:
                conn.close()
            except Exception:
                # pysqlite won't even let you close a conn from a thread
                # that didn't create it
                pass

        self._all_conns.clear()

    def _cleanup(self) -> None:
        # Evict arbitrary records until below capacity; which records are
        # popped from the set is non-deterministic (see class warning).
        while len(self._all_conns) >= self.size:
            c = self._all_conns.pop()
            c.close()

    def status(self) -> str:
        """Return a one-line textual summary of pool state."""
        return "SingletonThreadPool id:%d size: %d" % (
            id(self),
            len(self._all_conns),
        )

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        # only clears the per-thread fairy; the record itself stays bound
        # to the thread via self._conn
        try:
            del self._fairy.current
        except AttributeError:
            pass

    def _do_get(self) -> ConnectionPoolEntry:
        try:
            # ``current`` is a weakref; calling it yields the record, or
            # None if it has been garbage collected (falsy -> rebuild)
            if TYPE_CHECKING:
                c = cast(ConnectionPoolEntry, self._conn.current())
            else:
                c = self._conn.current()
            if c:
                return c
        except AttributeError:
            # no connection has been created yet on this thread
            pass
        c = self._create_connection()
        self._conn.current = weakref.ref(c)
        if len(self._all_conns) >= self.size:
            self._cleanup()
        self._all_conns.add(c)
        return c

    def connect(self) -> PoolProxiedConnection:
        # vendored from Pool to include the now removed use_threadlocal
        # behavior
        try:
            # re-use this thread's fairy if still alive (weakref call)
            rec = cast(_ConnectionFairy, self._fairy.current())
        except AttributeError:
            pass
        else:
            if rec is not None:
                return rec._checkout_existing()

        return _ConnectionFairy._checkout(self, self._fairy)
class StaticPool(Pool):
    """A Pool of exactly one connection, used for all requests.

    Reconnect-related functions such as ``recycle`` and connection
    invalidation (which is also used to support auto-reconnect) are only
    partially supported right now and may not yield good results.

    The :class:`.StaticPool` class **is compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.

    """

    @util.memoized_property
    def connection(self) -> _ConnectionRecord:
        # the single record, created lazily on first access and cached in
        # self.__dict__ by memoized_property
        return _ConnectionRecord(self)

    def status(self) -> str:
        return "StaticPool"

    def dispose(self) -> None:
        # act only when the memoized record exists and still carries a
        # live DBAPI connection
        rec = self.__dict__.get("connection")
        if rec is not None and rec.dbapi_connection is not None:
            rec.close()
            del self.__dict__["connection"]

    def recreate(self) -> StaticPool:
        """Return a new :class:`.StaticPool` carrying this pool's settings."""
        self.logger.info("Pool recreating")
        return self.__class__(
            creator=self._creator,
            recycle=self._recycle,
            reset_on_return=self._reset_on_return,
            pre_ping=self._pre_ping,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def _transfer_from(self, other_static_pool: StaticPool) -> None:
        # used by the test suite to make a new engine / pool without
        # losing the state of an existing SQLite :memory: connection

        def borrow_existing(rec: ConnectionPoolEntry) -> DBAPIConnection:
            conn = other_static_pool.connection.dbapi_connection
            assert conn is not None
            return conn

        self._invoke_creator = borrow_existing

    def _create_connection(self) -> ConnectionPoolEntry:
        # never used; the memoized ``connection`` record serves all requests
        raise NotImplementedError()

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        # the single record is retained; nothing to do on checkin
        pass

    def _do_get(self) -> ConnectionPoolEntry:
        rec = self.connection
        if rec._is_hard_or_soft_invalidated():
            # drop the memoized record; the next attribute access
            # builds a fresh one
            del self.__dict__["connection"]
            rec = self.connection

        return rec
class AssertionPool(Pool):
    """A :class:`_pool.Pool` that allows at most one checked out connection at
    any given time.

    This will raise an exception if more than one connection is checked out
    at a time. Useful for debugging code that is using more connections
    than desired.

    The :class:`.AssertionPool` class **is compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.

    """

    # the single connection record, created on first checkout
    _conn: Optional[ConnectionPoolEntry]
    # formatted stack of the most recent checkout, when store_traceback is on
    _checkout_traceback: Optional[List[str]]

    def __init__(self, *args: Any, **kw: Any):
        self._conn = None
        self._checked_out = False
        # pop before forwarding **kw; Pool does not know this option
        self._store_traceback = kw.pop("store_traceback", True)
        self._checkout_traceback = None
        Pool.__init__(self, *args, **kw)

    def status(self) -> str:
        return "AssertionPool"

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        if self._checked_out:
            self._checked_out = False
            assert record is self._conn
        else:
            raise AssertionError("connection is not checked out")

    def dispose(self) -> None:
        self._checked_out = False
        conn = self._conn
        if conn:
            conn.close()

    def recreate(self) -> AssertionPool:
        """Return a new :class:`.AssertionPool` with this pool's settings."""
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            echo=self.echo,
            pre_ping=self._pre_ping,
            recycle=self._recycle,
            reset_on_return=self._reset_on_return,
            logging_name=self._orig_logging_name,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def _do_get(self) -> ConnectionPoolEntry:
        if self._checked_out:
            # include where the conflicting checkout happened, if recorded
            tb = self._checkout_traceback
            if tb:
                suffix = " at:\n%s" % "".join(chop_traceback(tb))
            else:
                suffix = ""
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn