1# pool/impl.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8
9"""Pool implementation classes."""
10from __future__ import annotations
11
12import threading
13import traceback
14import typing
15from typing import Any
16from typing import cast
17from typing import List
18from typing import Optional
19from typing import Set
20from typing import Type
21from typing import TYPE_CHECKING
22from typing import Union
23import weakref
24
25from .base import _AsyncConnDialect
26from .base import _ConnectionFairy
27from .base import _ConnectionRecord
28from .base import _CreatorFnType
29from .base import _CreatorWRecFnType
30from .base import ConnectionPoolEntry
31from .base import Pool
32from .base import PoolProxiedConnection
33from .. import exc
34from .. import util
35from ..util import chop_traceback
36from ..util import queue as sqla_queue
37from ..util.typing import Literal
38
39if typing.TYPE_CHECKING:
40 from ..engine.interfaces import DBAPIConnection
41
42
class QueuePool(Pool):
    """A :class:`_pool.Pool`
    that imposes a limit on the number of open connections.

    :class:`.QueuePool` is the default pooling implementation used for
    all :class:`_engine.Engine` objects other than SQLite with a ``:memory:``
    database.

    The :class:`.QueuePool` class **is not compatible** with asyncio and
    :func:`_asyncio.create_async_engine`. The
    :class:`.AsyncAdaptedQueuePool` class is used automatically when
    using :func:`_asyncio.create_async_engine`, if no other kind of pool
    is specified.

    .. seealso::

        :class:`.AsyncAdaptedQueuePool`

    """

    _is_asyncio = False

    # class of queue used to hold idle connection records;
    # AsyncAdaptedQueuePool overrides this with an asyncio-compatible
    # implementation
    _queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = (
        sqla_queue.Queue
    )

    # the queue of idle ConnectionPoolEntry objects
    _pool: sqla_queue.QueueCommon[ConnectionPoolEntry]

    def __init__(
        self,
        creator: Union[_CreatorFnType, _CreatorWRecFnType],
        pool_size: int = 5,
        max_overflow: int = 10,
        timeout: float = 30.0,
        use_lifo: bool = False,
        **kw: Any,
    ):
        r"""
        Construct a QueuePool.

        :param creator: a callable function that returns a DB-API
          connection object, same as that of :paramref:`_pool.Pool.creator`.

        :param pool_size: The size of the pool to be maintained,
          defaults to 5. This is the largest number of connections that
          will be kept persistently in the pool. Note that the pool
          begins with no connections; once this number of connections
          is requested, that number of connections will remain.
          ``pool_size`` can be set to 0 to indicate no size limit; to
          disable pooling, use a :class:`~sqlalchemy.pool.NullPool`
          instead.

        :param max_overflow: The maximum overflow size of the
          pool. When the number of checked-out connections reaches the
          size set in pool_size, additional connections will be
          returned up to this limit. When those additional connections
          are returned to the pool, they are disconnected and
          discarded. It follows then that the total number of
          simultaneous connections the pool will allow is pool_size +
          `max_overflow`, and the total number of "sleeping"
          connections the pool will allow is pool_size. `max_overflow`
          can be set to -1 to indicate no overflow limit; no limit
          will be placed on the total number of concurrent
          connections. Defaults to 10.

        :param timeout: The number of seconds to wait before giving up
          on returning a connection. Defaults to 30.0. This can be a float
          but is subject to the limitations of Python time functions which
          may not be reliable in the tens of milliseconds.

        :param use_lifo: use LIFO (last-in-first-out) when retrieving
          connections instead of FIFO (first-in-first-out). Using LIFO, a
          server-side timeout scheme can reduce the number of connections used
          during non-peak periods of use. When planning for server-side
          timeouts, ensure that a recycle or pre-ping strategy is in use to
          gracefully handle stale connections.

          .. versionadded:: 1.3

          .. seealso::

            :ref:`pool_use_lifo`

            :ref:`pool_disconnects`

        :param \**kw: Other keyword arguments including
          :paramref:`_pool.Pool.recycle`, :paramref:`_pool.Pool.echo`,
          :paramref:`_pool.Pool.reset_on_return` and others are passed to the
          :class:`_pool.Pool` constructor.

        """

        Pool.__init__(self, creator, **kw)
        self._pool = self._queue_class(pool_size, use_lifo=use_lifo)
        # _overflow starts at -pool_size so that it becomes positive only
        # after more than pool_size connections exist; checkedout() and the
        # overflow-limit checks rely on this accounting
        self._overflow = 0 - pool_size
        # pool_size == 0 means "no size limit"; represent that internally
        # as an unlimited overflow (-1)
        self._max_overflow = -1 if pool_size == 0 else max_overflow
        self._timeout = timeout
        # serializes increments/decrements of _overflow when a finite
        # overflow limit is in effect
        self._overflow_lock = threading.Lock()

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        # return the record to the queue without blocking; if the queue is
        # full, this was an overflow connection -- close it and decrement
        # the overflow counter (even if close() raises)
        try:
            self._pool.put(record, False)
        except sqla_queue.Full:
            try:
                record.close()
            finally:
                self._dec_overflow()

    def _do_get(self) -> ConnectionPoolEntry:
        # overflow is unbounded when _max_overflow is -1
        use_overflow = self._max_overflow > -1

        # only block on the queue if we're already at the overflow limit;
        # otherwise a queue miss is satisfied by creating a new connection
        wait = use_overflow and self._overflow >= self._max_overflow
        try:
            return self._pool.get(wait, self._timeout)
        except sqla_queue.Empty:
            # don't do things inside of "except Empty", because when we say
            # we timed out or can't connect and raise, Python 3 tells
            # people the real error is queue.Empty which it isn't.
            pass
        if use_overflow and self._overflow >= self._max_overflow:
            if not wait:
                # the overflow limit was hit by another thread after our
                # first check; retry, this time blocking on the queue
                return self._do_get()
            else:
                # we blocked for the full timeout and no connection was
                # returned to the queue
                raise exc.TimeoutError(
                    "QueuePool limit of size %d overflow %d reached, "
                    "connection timed out, timeout %0.2f"
                    % (self.size(), self.overflow(), self._timeout),
                    code="3o7r",
                )

        if self._inc_overflow():
            try:
                return self._create_connection()
            except:
                # bare except is deliberate: even BaseException (e.g.
                # KeyboardInterrupt) must restore the overflow counter;
                # safe_reraise() re-raises the original error on exit
                with util.safe_reraise():
                    self._dec_overflow()
                raise
        else:
            # lost the race to claim an overflow slot; start over
            return self._do_get()

    def _inc_overflow(self) -> bool:
        """Attempt to claim an overflow slot; return True on success."""
        # unlimited overflow: no lock needed, counter is informational only
        if self._max_overflow == -1:
            self._overflow += 1
            return True
        with self._overflow_lock:
            if self._overflow < self._max_overflow:
                self._overflow += 1
                return True
            else:
                return False

    def _dec_overflow(self) -> Literal[True]:
        """Release an overflow slot; always returns True."""
        if self._max_overflow == -1:
            self._overflow -= 1
            return True
        with self._overflow_lock:
            self._overflow -= 1
            return True

    def recreate(self) -> QueuePool:
        # build a new pool of the same class with identical configuration
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            pool_size=self._pool.maxsize,
            max_overflow=self._max_overflow,
            pre_ping=self._pre_ping,
            use_lifo=self._pool.use_lifo,
            timeout=self._timeout,
            recycle=self._recycle,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            reset_on_return=self._reset_on_return,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def dispose(self) -> None:
        # drain the queue, closing each idle connection
        while True:
            try:
                conn = self._pool.get(False)
                conn.close()
            except sqla_queue.Empty:
                break

        # reset accounting as though the pool were newly created;
        # connections currently checked out are not touched here
        self._overflow = 0 - self.size()
        self.logger.info("Pool disposed. %s", self.status())

    def status(self) -> str:
        return (
            "Pool size: %d  Connections in pool: %d "
            "Current Overflow: %d Current Checked out "
            "connections: %d"
            % (
                self.size(),
                self.checkedin(),
                self.overflow(),
                self.checkedout(),
            )
        )

    def size(self) -> int:
        # the configured pool_size (the queue's maxsize)
        return self._pool.maxsize

    def timeout(self) -> float:
        return self._timeout

    def checkedin(self) -> int:
        # number of idle connections currently sitting in the queue
        return self._pool.qsize()

    def overflow(self) -> int:
        # with pool_size of 0 (maxsize 0, i.e. unlimited), overflow
        # accounting is not meaningful; report 0
        return self._overflow if self._pool.maxsize else 0

    def checkedout(self) -> int:
        # connections drawn out of the queue plus overflow connections
        return self._pool.maxsize - self._pool.qsize() + self._overflow
257
258
class AsyncAdaptedQueuePool(QueuePool):
    """An asyncio-compatible version of :class:`.QueuePool`.

    This pool is used by default when using :class:`.AsyncEngine` engines that
    were generated from :func:`_asyncio.create_async_engine`. It uses an
    asyncio-compatible queue implementation that does not use
    ``threading.Lock``.

    The arguments and operation of :class:`.AsyncAdaptedQueuePool` are
    otherwise identical to that of :class:`.QueuePool`.

    """

    _is_asyncio = True
    # swap in the asyncio-adapted queue; all checkout/checkin logic is
    # inherited from QueuePool unchanged
    _queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = (
        sqla_queue.AsyncAdaptedQueue
    )

    _dialect = _AsyncConnDialect()
278
279
class FallbackAsyncAdaptedQueuePool(AsyncAdaptedQueuePool):
    """An :class:`.AsyncAdaptedQueuePool` using the "fallback" queue variant.

    Differs from the parent only in the queue implementation; presumably
    the fallback queue degrades gracefully when asyncio primitives are
    unavailable -- see ``sqla_queue.FallbackAsyncAdaptedQueue`` to confirm.
    """

    _queue_class = sqla_queue.FallbackAsyncAdaptedQueue  # type: ignore[assignment] # noqa: E501
282
283
class NullPool(Pool):
    """A Pool which does not pool connections.

    Instead it literally opens and closes the underlying DB-API connection
    per each connection open/close.

    Reconnect-related functions such as ``recycle`` and connection
    invalidation are not supported by this Pool implementation, since
    no connections are held persistently.

    The :class:`.NullPool` class **is compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.

    """

    def _do_get(self) -> ConnectionPoolEntry:
        # every checkout opens a brand new DB-API connection
        return self._create_connection()

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        # nothing is pooled; the connection is closed as soon as it's
        # returned
        record.close()

    def status(self) -> str:
        return "NullPool"

    def recreate(self) -> NullPool:
        """Return a new :class:`.NullPool` with the same configuration."""
        self.logger.info("Pool recreating")

        return self.__class__(
            self._creator,
            pre_ping=self._pre_ping,
            recycle=self._recycle,
            reset_on_return=self._reset_on_return,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def dispose(self) -> None:
        # no connections are held persistently, so there is nothing to
        # dispose of
        pass
324
325
class SingletonThreadPool(Pool):
    """A Pool that maintains one connection per thread.

    Maintains one connection per each thread, never moving a connection to a
    thread other than the one which it was created in.

    .. warning::  the :class:`.SingletonThreadPool` will call ``.close()``
       on arbitrary connections that exist beyond the size setting of
       ``pool_size``, e.g. if more unique **thread identities**
       than what ``pool_size`` states are used. This cleanup is
       non-deterministic and not sensitive to whether or not the connections
       linked to those thread identities are currently in use.

    :class:`.SingletonThreadPool` may be improved in a future release,
    however in its current status it is generally used only for test
    scenarios using a SQLite ``:memory:`` database and is not recommended
    for production use.

    The :class:`.SingletonThreadPool` class **is not compatible** with asyncio
    and :func:`_asyncio.create_async_engine`.


    Options are the same as those of :class:`_pool.Pool`, as well as:

    :param pool_size: The number of threads in which to maintain connections
        at once.  Defaults to five.

    :class:`.SingletonThreadPool` is used by the SQLite dialect
    automatically when a memory-based database is used.
    See :ref:`sqlite_toplevel`.

    """

    _is_asyncio = False

    def __init__(
        self,
        creator: Union[_CreatorFnType, _CreatorWRecFnType],
        pool_size: int = 5,
        **kw: Any,
    ):
        Pool.__init__(self, creator, **kw)
        # thread-local storage; .current holds a weakref to the thread's
        # ConnectionPoolEntry (set in _do_get)
        self._conn = threading.local()
        # thread-local storage; .current is called as a weakref in connect()
        # -- presumably populated by _ConnectionFairy._checkout; confirm
        # against the base module
        self._fairy = threading.local()
        # strong references to every live entry across all threads; bounds
        # total connections via _cleanup()
        self._all_conns: Set[ConnectionPoolEntry] = set()
        # maximum number of thread identities to retain connections for
        self.size = pool_size

    def recreate(self) -> SingletonThreadPool:
        # build a new pool of the same class with identical configuration
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            pool_size=self.size,
            recycle=self._recycle,
            echo=self.echo,
            pre_ping=self._pre_ping,
            logging_name=self._orig_logging_name,
            reset_on_return=self._reset_on_return,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def dispose(self) -> None:
        """Dispose of this pool."""

        for conn in self._all_conns:
            try:
                conn.close()
            except Exception:
                # pysqlite won't even let you close a conn from a thread
                # that didn't create it
                pass

        self._all_conns.clear()

    def _cleanup(self) -> None:
        # close arbitrary connections until under the size limit; pop()
        # order is arbitrary and not sensitive to whether a connection is
        # currently in use (see the class-level warning)
        while len(self._all_conns) >= self.size:
            c = self._all_conns.pop()
            c.close()

    def status(self) -> str:
        return "SingletonThreadPool id:%d size: %d" % (
            id(self),
            len(self._all_conns),
        )

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        # the connection itself stays associated with its thread; only the
        # thread-local fairy reference is discarded on checkin
        try:
            del self._fairy.current
        except AttributeError:
            # nothing checked out on this thread
            pass

    def _do_get(self) -> ConnectionPoolEntry:
        try:
            # the TYPE_CHECKING branch exists only to give the type checker
            # a typed result; at runtime the plain weakref call is made
            if TYPE_CHECKING:
                c = cast(ConnectionPoolEntry, self._conn.current())
            else:
                c = self._conn.current()
            # c is None when the weakref has died
            if c:
                return c
        except AttributeError:
            # no connection has been created on this thread yet
            pass
        c = self._create_connection()
        # only a weakref is held thread-locally; _all_conns carries the
        # strong reference that keeps the entry alive
        self._conn.current = weakref.ref(c)
        if len(self._all_conns) >= self.size:
            self._cleanup()
        self._all_conns.add(c)
        return c

    def connect(self) -> PoolProxiedConnection:
        # vendored from Pool to include the now removed use_threadlocal
        # behavior
        try:
            # re-use the fairy already checked out on this thread, if any
            rec = cast(_ConnectionFairy, self._fairy.current())
        except AttributeError:
            pass
        else:
            if rec is not None:
                return rec._checkout_existing()

        return _ConnectionFairy._checkout(self, self._fairy)
446
447
class StaticPool(Pool):
    """A Pool of exactly one connection, used for all requests.

    Reconnect-related functions such as ``recycle`` and connection
    invalidation (which is also used to support auto-reconnect) are only
    partially supported right now and may not yield good results.

    The :class:`.StaticPool` class **is compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.

    """

    @util.memoized_property
    def connection(self) -> _ConnectionRecord:
        # the single connection record, created on first access and cached
        # in self.__dict__ by memoized_property; dispose() and _do_get()
        # delete the cached entry to force re-creation
        return _ConnectionRecord(self)

    def status(self) -> str:
        return "StaticPool"

    def dispose(self) -> None:
        # only act if the memoized record exists and still has a live
        # DB-API connection; then drop the memoized entry so a later
        # checkout builds a fresh record
        if (
            "connection" in self.__dict__
            and self.connection.dbapi_connection is not None
        ):
            self.connection.close()
            del self.__dict__["connection"]

    def recreate(self) -> StaticPool:
        # build a new pool of the same class with identical configuration
        self.logger.info("Pool recreating")
        return self.__class__(
            creator=self._creator,
            recycle=self._recycle,
            reset_on_return=self._reset_on_return,
            pre_ping=self._pre_ping,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def _transfer_from(self, other_static_pool: StaticPool) -> None:
        # used by the test suite to make a new engine / pool without
        # losing the state of an existing SQLite :memory: connection
        def creator(rec: ConnectionPoolEntry) -> DBAPIConnection:
            # hand back the other pool's live DB-API connection rather
            # than opening a new one
            conn = other_static_pool.connection.dbapi_connection
            assert conn is not None
            return conn

        self._invoke_creator = creator

    def _create_connection(self) -> ConnectionPoolEntry:
        # never called for StaticPool; the sole record is built directly
        # in the ``connection`` memoized property
        raise NotImplementedError()

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        # the single record stays live across checkins; nothing to do
        pass

    def _do_get(self) -> ConnectionPoolEntry:
        rec = self.connection
        if rec._is_hard_or_soft_invalidated():
            # drop the memoized record and build a fresh one
            del self.__dict__["connection"]
            rec = self.connection

        return rec
511
512
class AssertionPool(Pool):
    """A :class:`_pool.Pool` that allows at most one checked out connection at
    any given time.

    This will raise an exception if more than one connection is checked out
    at a time.  Useful for debugging code that is using more connections
    than desired.

    The :class:`.AssertionPool` class **is compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.

    """

    # the single connection record, created lazily on first checkout
    _conn: Optional[ConnectionPoolEntry]
    # stack frames captured at checkout time, for error reporting
    _checkout_traceback: Optional[List[str]]

    def __init__(self, *args: Any, **kw: Any):
        self._conn = None
        self._checked_out = False
        self._checkout_traceback = None
        # whether to capture a stack trace at each checkout so the
        # "already checked out" error can report where it happened
        self._store_traceback = kw.pop("store_traceback", True)
        Pool.__init__(self, *args, **kw)

    def status(self) -> str:
        return "AssertionPool"

    def _do_get(self) -> ConnectionPoolEntry:
        # refuse a second simultaneous checkout, pointing at the first
        # checkout's location when a traceback was stored
        if self._checked_out:
            suffix = ""
            if self._checkout_traceback:
                suffix = " at:\n%s" % "".join(
                    chop_traceback(self._checkout_traceback)
                )
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        # a checkin without a prior checkout is a usage error
        if not self._checked_out:
            raise AssertionError("connection is not checked out")
        self._checked_out = False
        assert record is self._conn

    def dispose(self) -> None:
        self._checked_out = False
        if self._conn:
            self._conn.close()

    def recreate(self) -> AssertionPool:
        """Return a new :class:`.AssertionPool` with the same configuration."""
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            pre_ping=self._pre_ping,
            recycle=self._recycle,
            reset_on_return=self._reset_on_return,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )