1# pool/impl.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8
9"""Pool implementation classes.
10
11"""
12from __future__ import annotations
13
14import threading
15import traceback
16import typing
17from typing import Any
18from typing import cast
19from typing import List
20from typing import Optional
21from typing import Set
22from typing import Type
23from typing import TYPE_CHECKING
24from typing import Union
25import weakref
26
27from .base import _AsyncConnDialect
28from .base import _ConnectionFairy
29from .base import _ConnectionRecord
30from .base import _CreatorFnType
31from .base import _CreatorWRecFnType
32from .base import ConnectionPoolEntry
33from .base import Pool
34from .base import PoolProxiedConnection
35from .. import exc
36from .. import util
37from ..util import chop_traceback
38from ..util import queue as sqla_queue
39from ..util.typing import Literal
40
41if typing.TYPE_CHECKING:
42 from ..engine.interfaces import DBAPIConnection
43
44
class QueuePool(Pool):
    """A :class:`_pool.Pool`
    that imposes a limit on the number of open connections.

    :class:`.QueuePool` is the default pooling implementation used for
    all :class:`_engine.Engine` objects other than SQLite with a ``:memory:``
    database.

    The :class:`.QueuePool` class **is not compatible** with asyncio and
    :func:`_asyncio.create_async_engine`. The
    :class:`.AsyncAdaptedQueuePool` class is used automatically when
    using :func:`_asyncio.create_async_engine`, if no other kind of pool
    is specified.

    .. seealso::

        :class:`.AsyncAdaptedQueuePool`

    """

    _is_asyncio = False  # type: ignore[assignment]

    # queue implementation holding the checked-in connection records;
    # subclasses substitute an asyncio-compatible queue class here
    _queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = (
        sqla_queue.Queue
    )

    # the queue of idle (checked-in) ConnectionPoolEntry objects
    _pool: sqla_queue.QueueCommon[ConnectionPoolEntry]

    def __init__(
        self,
        creator: Union[_CreatorFnType, _CreatorWRecFnType],
        pool_size: int = 5,
        max_overflow: int = 10,
        timeout: float = 30.0,
        use_lifo: bool = False,
        **kw: Any,
    ):
        r"""
        Construct a QueuePool.

        :param creator: a callable function that returns a DB-API
          connection object, same as that of :paramref:`_pool.Pool.creator`.

        :param pool_size: The size of the pool to be maintained,
          defaults to 5. This is the largest number of connections that
          will be kept persistently in the pool. Note that the pool
          begins with no connections; once this number of connections
          is requested, that number of connections will remain.
          ``pool_size`` can be set to 0 to indicate no size limit; to
          disable pooling, use a :class:`~sqlalchemy.pool.NullPool`
          instead.

        :param max_overflow: The maximum overflow size of the
          pool. When the number of checked-out connections reaches the
          size set in pool_size, additional connections will be
          returned up to this limit. When those additional connections
          are returned to the pool, they are disconnected and
          discarded. It follows then that the total number of
          simultaneous connections the pool will allow is pool_size +
          `max_overflow`, and the total number of "sleeping"
          connections the pool will allow is pool_size. `max_overflow`
          can be set to -1 to indicate no overflow limit; no limit
          will be placed on the total number of concurrent
          connections. Defaults to 10.

        :param timeout: The number of seconds to wait before giving up
          on returning a connection. Defaults to 30.0. This can be a float
          but is subject to the limitations of Python time functions which
          may not be reliable in the tens of milliseconds.

        :param use_lifo: use LIFO (last-in-first-out) when retrieving
          connections instead of FIFO (first-in-first-out). Using LIFO, a
          server-side timeout scheme can reduce the number of connections used
          during non-peak periods of use. When planning for server-side
          timeouts, ensure that a recycle or pre-ping strategy is in use to
          gracefully handle stale connections.

          .. versionadded:: 1.3

          .. seealso::

            :ref:`pool_use_lifo`

            :ref:`pool_disconnects`

        :param \**kw: Other keyword arguments including
          :paramref:`_pool.Pool.recycle`, :paramref:`_pool.Pool.echo`,
          :paramref:`_pool.Pool.reset_on_return` and others are passed to the
          :class:`_pool.Pool` constructor.

        """

        Pool.__init__(self, creator, **kw)
        self._pool = self._queue_class(pool_size, use_lifo=use_lifo)
        # _overflow counts connections created beyond pool_size.  It starts
        # at -pool_size so that it only becomes non-negative once pool_size
        # connections exist simultaneously; see checkedout() / overflow().
        self._overflow = 0 - pool_size
        # pool_size == 0 means "no size limit"; model that as unlimited
        # overflow regardless of the max_overflow argument
        self._max_overflow = -1 if pool_size == 0 else max_overflow
        self._timeout = timeout
        # serializes claims of overflow slots when a limit is in effect
        self._overflow_lock = threading.Lock()

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        """Return a record to the queue; close it if the queue is full."""
        try:
            # non-blocking put; raises Full when pool_size records are
            # already checked in
            self._pool.put(record, False)
        except sqla_queue.Full:
            try:
                record.close()
            finally:
                # the closed connection no longer occupies an overflow
                # slot, even if close() raised
                self._dec_overflow()

    def _do_get(self) -> ConnectionPoolEntry:
        """Check out a record, creating a new connection if permitted.

        Blocks up to ``self._timeout`` seconds for a checkin only when the
        overflow limit has already been reached, raising
        :class:`.TimeoutError` if the wait elapses.
        """
        # max_overflow == -1 disables the overflow limit entirely
        use_overflow = self._max_overflow > -1

        # block on the queue only if we may not create a new connection
        # (overflow exhausted); otherwise just poll it
        wait = use_overflow and self._overflow >= self._max_overflow
        try:
            return self._pool.get(wait, self._timeout)
        except sqla_queue.Empty:
            # don't do things inside of "except Empty", because when we say
            # we timed out or can't connect and raise, Python 3 tells
            # people the real error is queue.Empty which it isn't.
            pass
        if use_overflow and self._overflow >= self._max_overflow:
            if not wait:
                # the limit was reached by another thread after our first
                # check; retry, which will now block on the queue
                return self._do_get()
            else:
                raise exc.TimeoutError(
                    "QueuePool limit of size %d overflow %d reached, "
                    "connection timed out, timeout %0.2f"
                    % (self.size(), self.overflow(), self._timeout),
                    code="3o7r",
                )

        if self._inc_overflow():
            try:
                return self._create_connection()
            except:
                # bare except intentionally catches BaseException;
                # util.safe_reraise re-raises the pending exception after
                # _dec_overflow() runs, so a failed connect does not leak
                # the overflow slot we just claimed
                with util.safe_reraise():
                    self._dec_overflow()
                raise
        else:
            # lost the race for the last overflow slot; start over
            return self._do_get()

    def _inc_overflow(self) -> bool:
        """Attempt to claim an overflow slot; True if one was available."""
        if self._max_overflow == -1:
            # unlimited: no lock needed, the counter is informational only
            self._overflow += 1
            return True
        with self._overflow_lock:
            if self._overflow < self._max_overflow:
                self._overflow += 1
                return True
            else:
                return False

    def _dec_overflow(self) -> Literal[True]:
        """Release an overflow slot; always returns True."""
        if self._max_overflow == -1:
            self._overflow -= 1
            return True
        with self._overflow_lock:
            self._overflow -= 1
            return True

    def recreate(self) -> QueuePool:
        """Return a new :class:`.QueuePool` carrying over this pool's
        settings, dispatch, and dialect."""
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            pool_size=self._pool.maxsize,
            max_overflow=self._max_overflow,
            pre_ping=self._pre_ping,
            use_lifo=self._pool.use_lifo,
            timeout=self._timeout,
            recycle=self._recycle,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            reset_on_return=self._reset_on_return,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def dispose(self) -> None:
        """Close all checked-in connections and reset the overflow count.

        Connections currently checked out are not affected.
        """
        while True:
            try:
                conn = self._pool.get(False)
                conn.close()
            except sqla_queue.Empty:
                break

        # restore the counter to its initial "no connections" state
        self._overflow = 0 - self.size()
        self.logger.info("Pool disposed. %s", self.status())

    def status(self) -> str:
        """Return a human-readable summary of current pool statistics."""
        return (
            "Pool size: %d Connections in pool: %d "
            "Current Overflow: %d Current Checked out "
            "connections: %d"
            % (
                self.size(),
                self.checkedin(),
                self.overflow(),
                self.checkedout(),
            )
        )

    def size(self) -> int:
        """Return the configured pool size (the queue's maxsize)."""
        return self._pool.maxsize

    def timeout(self) -> float:
        """Return the checkout timeout in seconds."""
        return self._timeout

    def checkedin(self) -> int:
        """Return the number of connections currently idle in the pool."""
        return self._pool.qsize()

    def overflow(self) -> int:
        """Return the current overflow count.

        Negative until pool_size connections exist; reported as 0 for an
        unlimited (pool_size 0) pool.
        """
        return self._overflow if self._pool.maxsize else 0

    def checkedout(self) -> int:
        """Return the number of connections currently checked out."""
        return self._pool.maxsize - self._pool.qsize() + self._overflow
259
260
class AsyncAdaptedQueuePool(QueuePool):
    """The asyncio variant of :class:`.QueuePool`.

    Engines produced by :func:`_asyncio.create_async_engine` select this
    pool by default when no explicit pool class is given.  It accepts the
    same constructor arguments and behaves identically to
    :class:`.QueuePool`, but substitutes a queue implementation that is
    suitable for asyncio and does not rely upon ``threading.Lock``.

    """

    _is_asyncio = True  # type: ignore[assignment]

    # swap in the asyncio-friendly queue for the thread-based default
    _queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = (
        sqla_queue.AsyncAdaptedQueue
    )

    _dialect = _AsyncConnDialect()
280
281
class FallbackAsyncAdaptedQueuePool(AsyncAdaptedQueuePool):
    """An :class:`.AsyncAdaptedQueuePool` that substitutes the
    ``FallbackAsyncAdaptedQueue`` queue implementation."""

    # NOTE(review): the queue class name suggests it falls back to a
    # thread-based queue when asyncio is unavailable -- confirm against
    # sqlalchemy.util.queue
    _queue_class = sqla_queue.FallbackAsyncAdaptedQueue
284
285
class NullPool(Pool):
    """A Pool implementation that performs no pooling whatsoever.

    Every checkout opens a brand new DB-API connection via the creator,
    and every checkin closes it; nothing is ever held persistently.
    Because no connection outlives a single checkout, reconnect-oriented
    features such as ``recycle`` and connection invalidation have nothing
    to act upon and are not supported by this implementation.

    The :class:`.NullPool` class **is compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.

    """

    def status(self) -> str:
        """Return a brief description of this pool's state."""
        return "NullPool"

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        # checkin is simply a close; nothing is retained
        record.close()

    def _do_get(self) -> ConnectionPoolEntry:
        # checkout opens a fresh connection every time
        return self._create_connection()

    def recreate(self) -> NullPool:
        """Return a new :class:`.NullPool` carrying over this pool's
        settings, dispatch, and dialect."""
        self.logger.info("Pool recreating")

        return self.__class__(
            self._creator,
            pre_ping=self._pre_ping,
            recycle=self._recycle,
            reset_on_return=self._reset_on_return,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def dispose(self) -> None:
        """No-op; this pool holds no connections to release."""
326
327
class SingletonThreadPool(Pool):
    """A Pool that maintains one connection per thread.

    Maintains one connection per each thread, never moving a connection to a
    thread other than the one which it was created in.

    .. warning::  the :class:`.SingletonThreadPool` will call ``.close()``
       on arbitrary connections that exist beyond the size setting of
       ``pool_size``, e.g. if more unique **thread identities**
       than what ``pool_size`` states are used. This cleanup is
       non-deterministic and not sensitive to whether or not the connections
       linked to those thread identities are currently in use.

       :class:`.SingletonThreadPool` may be improved in a future release,
       however in its current status it is generally used only for test
       scenarios using a SQLite ``:memory:`` database and is not recommended
       for production use.

    The :class:`.SingletonThreadPool` class **is not compatible** with asyncio
    and :func:`_asyncio.create_async_engine`.


    Options are the same as those of :class:`_pool.Pool`, as well as:

    :param pool_size: The number of threads in which to maintain connections
        at once.  Defaults to five.

    :class:`.SingletonThreadPool` is used by the SQLite dialect
    automatically when a memory-based database is used.
    See :ref:`sqlite_toplevel`.

    """

    _is_asyncio = False  # type: ignore[assignment]

    def __init__(
        self,
        creator: Union[_CreatorFnType, _CreatorWRecFnType],
        pool_size: int = 5,
        **kw: Any,
    ):
        Pool.__init__(self, creator, **kw)
        # per-thread storage of a weakref to that thread's connection record
        self._conn = threading.local()
        # per-thread storage of a weakref to the checked-out fairy, if any
        self._fairy = threading.local()
        # strong references keeping all created records alive, across
        # all threads; bounded (approximately) by self.size via _cleanup()
        self._all_conns: Set[ConnectionPoolEntry] = set()
        # maximum number of distinct per-thread connections to retain
        self.size = pool_size

    def recreate(self) -> SingletonThreadPool:
        """Return a new :class:`.SingletonThreadPool` carrying over this
        pool's settings, dispatch, and dialect."""
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            pool_size=self.size,
            recycle=self._recycle,
            echo=self.echo,
            pre_ping=self._pre_ping,
            logging_name=self._orig_logging_name,
            reset_on_return=self._reset_on_return,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def dispose(self) -> None:
        """Dispose of this pool."""

        for conn in self._all_conns:
            try:
                conn.close()
            except Exception:
                # pysqlite won't even let you close a conn from a thread
                # that didn't create it
                pass

        self._all_conns.clear()

    def _cleanup(self) -> None:
        # evict arbitrary records (set.pop order is undefined) until the
        # count drops below the limit; per the class-level warning, the
        # evicted connection may belong to another thread and still be
        # in use
        while len(self._all_conns) >= self.size:
            c = self._all_conns.pop()
            c.close()

    def status(self) -> str:
        """Return a human-readable summary of this pool's state."""
        return "SingletonThreadPool id:%d size: %d" % (
            id(self),
            len(self._all_conns),
        )

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        # drop only the thread-local fairy reference; the connection
        # itself remains bound to this thread for reuse
        try:
            del self._fairy.current
        except AttributeError:
            # nothing was checked out on this thread
            pass

    def _do_get(self) -> ConnectionPoolEntry:
        try:
            # "current" holds a weakref.ref; calling it yields the record,
            # or None if it has been garbage collected
            if TYPE_CHECKING:
                c = cast(ConnectionPoolEntry, self._conn.current())
            else:
                c = self._conn.current()
            if c:
                return c
        except AttributeError:
            # no connection has been created on this thread yet
            pass
        c = self._create_connection()
        self._conn.current = weakref.ref(c)
        if len(self._all_conns) >= self.size:
            self._cleanup()
        self._all_conns.add(c)
        return c

    def connect(self) -> PoolProxiedConnection:
        # vendored from Pool to include the now removed use_threadlocal
        # behavior
        try:
            rec = cast(_ConnectionFairy, self._fairy.current())
        except AttributeError:
            # no fairy stored for this thread; fall through to a fresh
            # checkout below
            pass
        else:
            if rec is not None:
                # re-use the thread's existing checkout
                return rec._checkout_existing()

        return _ConnectionFairy._checkout(self, self._fairy)
448
449
class StaticPool(Pool):
    """A Pool of exactly one connection, used for all requests.

    Reconnect-related functions such as ``recycle`` and connection
    invalidation (which is also used to support auto-reconnect) are only
    partially supported right now and may not yield good results.

    The :class:`.StaticPool` class **is compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.

    """

    @util.memoized_property
    def connection(self) -> _ConnectionRecord:
        # the single ConnectionRecord, created lazily on first access and
        # cached in self.__dict__["connection"] by memoized_property
        return _ConnectionRecord(self)

    def status(self) -> str:
        """Return a brief description of this pool's state."""
        return "StaticPool"

    def dispose(self) -> None:
        """Close the single connection, if it was ever created."""
        # act only if the memoized record exists and still holds a live
        # DBAPI connection
        if (
            "connection" in self.__dict__
            and self.connection.dbapi_connection is not None
        ):
            self.connection.close()
            # clear the memoized value so a fresh record is built on
            # next access
            del self.__dict__["connection"]

    def recreate(self) -> StaticPool:
        """Return a new :class:`.StaticPool` carrying over this pool's
        settings, dispatch, and dialect."""
        self.logger.info("Pool recreating")
        return self.__class__(
            creator=self._creator,
            recycle=self._recycle,
            reset_on_return=self._reset_on_return,
            pre_ping=self._pre_ping,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def _transfer_from(self, other_static_pool: StaticPool) -> None:
        # used by the test suite to make a new engine / pool without
        # losing the state of an existing SQLite :memory: connection
        def creator(rec: ConnectionPoolEntry) -> DBAPIConnection:
            # hand back the other pool's live DBAPI connection rather
            # than opening a new one
            conn = other_static_pool.connection.dbapi_connection
            assert conn is not None
            return conn

        self._invoke_creator = creator

    def _create_connection(self) -> ConnectionPoolEntry:
        # never called for this pool; the single record is produced by
        # the ``connection`` memoized property instead
        raise NotImplementedError()

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        # the single record is always retained; checkin is a no-op
        pass

    def _do_get(self) -> ConnectionPoolEntry:
        rec = self.connection
        if rec._is_hard_or_soft_invalidated():
            # discard the invalidated record; accessing ``connection``
            # again builds a fresh one via the memoized property
            del self.__dict__["connection"]
            rec = self.connection

        return rec
513
514
class AssertionPool(Pool):
    """A :class:`_pool.Pool` enforcing that no more than one connection is
    checked out at any moment.

    Checking out a second connection while one is already outstanding
    raises ``AssertionError``, optionally annotated with the stack trace
    captured at the time of the first checkout.  This serves as a
    debugging aid for code suspected of using more connections than
    intended.

    The :class:`.AssertionPool` class **is compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.

    """

    # the single connection record, created lazily on first checkout
    _conn: Optional[ConnectionPoolEntry]
    # stack frames recorded at checkout time when store_traceback is on
    _checkout_traceback: Optional[List[str]]

    def __init__(self, *args: Any, **kw: Any):
        self._conn = None
        self._checked_out = False
        # pop our private option before forwarding the rest to Pool
        self._store_traceback = kw.pop("store_traceback", True)
        self._checkout_traceback = None
        Pool.__init__(self, *args, **kw)

    def status(self) -> str:
        """Return a brief description of this pool's state."""
        return "AssertionPool"

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        # a checkin without a prior checkout is itself an assertion failure
        if not self._checked_out:
            raise AssertionError("connection is not checked out")
        self._checked_out = False
        assert record is self._conn

    def dispose(self) -> None:
        """Close the held connection, if any, and reset checkout state."""
        self._checked_out = False
        if self._conn:
            self._conn.close()

    def recreate(self) -> AssertionPool:
        """Return a new :class:`.AssertionPool` carrying over this pool's
        settings, dispatch, and dialect."""
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            pre_ping=self._pre_ping,
            recycle=self._recycle,
            reset_on_return=self._reset_on_return,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def _do_get(self) -> ConnectionPoolEntry:
        if self._checked_out:
            # double checkout: report where the first one happened, if
            # a traceback was recorded
            trace = self._checkout_traceback
            if trace:
                suffix = " at:\n%s" % "".join(chop_traceback(trace))
            else:
                suffix = ""
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn