1# pool/impl.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8
9"""Pool implementation classes."""
10from __future__ import annotations
11
12import threading
13import traceback
14import typing
15from typing import Any
16from typing import cast
17from typing import List
18from typing import Optional
19from typing import Set
20from typing import Type
21from typing import TYPE_CHECKING
22from typing import Union
23import weakref
24
25from .base import _AsyncConnDialect
26from .base import _ConnectionFairy
27from .base import _ConnectionRecord
28from .base import _CreatorFnType
29from .base import _CreatorWRecFnType
30from .base import ConnectionPoolEntry
31from .base import Pool
32from .base import PoolProxiedConnection
33from .. import exc
34from .. import util
35from ..util import chop_traceback
36from ..util import queue as sqla_queue
37from ..util.typing import Literal
38
39if typing.TYPE_CHECKING:
40 from ..engine.interfaces import DBAPIConnection
41
42
43class QueuePool(Pool):
44 """A :class:`_pool.Pool`
45 that imposes a limit on the number of open connections.
46
47 :class:`.QueuePool` is the default pooling implementation used for
48 all :class:`_engine.Engine` objects other than SQLite with a ``:memory:``
49 database.
50
51 The :class:`.QueuePool` class **is not compatible** with asyncio and
52 :func:`_asyncio.create_async_engine`. The
53 :class:`.AsyncAdaptedQueuePool` class is used automatically when
54 using :func:`_asyncio.create_async_engine`, if no other kind of pool
55 is specified.
56
57 .. seealso::
58
59 :class:`.AsyncAdaptedQueuePool`
60
61 """
62
63 _is_asyncio = False
64
65 _queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = (
66 sqla_queue.Queue
67 )
68
69 _pool: sqla_queue.QueueCommon[ConnectionPoolEntry]
70
71 def __init__(
72 self,
73 creator: Union[_CreatorFnType, _CreatorWRecFnType],
74 pool_size: int = 5,
75 max_overflow: int = 10,
76 timeout: float = 30.0,
77 use_lifo: bool = False,
78 **kw: Any,
79 ):
80 r"""
81 Construct a QueuePool.
82
83 :param creator: a callable function that returns a DB-API
84 connection object, same as that of :paramref:`_pool.Pool.creator`.
85
86 :param pool_size: The size of the pool to be maintained,
87 defaults to 5. This is the largest number of connections that
88 will be kept persistently in the pool. Note that the pool
89 begins with no connections; once this number of connections
90 is requested, that number of connections will remain.
91 ``pool_size`` can be set to 0 to indicate no size limit; to
92 disable pooling, use a :class:`~sqlalchemy.pool.NullPool`
93 instead.
94
95 :param max_overflow: The maximum overflow size of the
96 pool. When the number of checked-out connections reaches the
97 size set in pool_size, additional connections will be
98 returned up to this limit. When those additional connections
99 are returned to the pool, they are disconnected and
100 discarded. It follows then that the total number of
101 simultaneous connections the pool will allow is pool_size +
102 `max_overflow`, and the total number of "sleeping"
103 connections the pool will allow is pool_size. `max_overflow`
104 can be set to -1 to indicate no overflow limit; no limit
105 will be placed on the total number of concurrent
106 connections. Defaults to 10.
107
108 :param timeout: The number of seconds to wait before giving up
109 on returning a connection. Defaults to 30.0. This can be a float
110 but is subject to the limitations of Python time functions which
111 may not be reliable in the tens of milliseconds.
112
113 :param use_lifo: use LIFO (last-in-first-out) when retrieving
114 connections instead of FIFO (first-in-first-out). Using LIFO, a
115 server-side timeout scheme can reduce the number of connections used
116 during non-peak periods of use. When planning for server-side
117 timeouts, ensure that a recycle or pre-ping strategy is in use to
118 gracefully handle stale connections.
119
120 .. seealso::
121
122 :ref:`pool_use_lifo`
123
124 :ref:`pool_disconnects`
125
126 :param \**kw: Other keyword arguments including
127 :paramref:`_pool.Pool.recycle`, :paramref:`_pool.Pool.echo`,
128 :paramref:`_pool.Pool.reset_on_return` and others are passed to the
129 :class:`_pool.Pool` constructor.
130
131 """
132
133 Pool.__init__(self, creator, **kw)
134 self._pool = self._queue_class(pool_size, use_lifo=use_lifo)
135 self._overflow = 0 - pool_size
136 self._max_overflow = -1 if pool_size == 0 else max_overflow
137 self._timeout = timeout
138 self._overflow_lock = threading.Lock()
139
140 def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
141 try:
142 self._pool.put(record, False)
143 except sqla_queue.Full:
144 try:
145 record.close()
146 finally:
147 self._dec_overflow()
148
149 def _do_get(self) -> ConnectionPoolEntry:
150 use_overflow = self._max_overflow > -1
151
152 wait = use_overflow and self._overflow >= self._max_overflow
153 try:
154 return self._pool.get(wait, self._timeout)
155 except sqla_queue.Empty:
156 # don't do things inside of "except Empty", because when we say
157 # we timed out or can't connect and raise, Python 3 tells
158 # people the real error is queue.Empty which it isn't.
159 pass
160 if use_overflow and self._overflow >= self._max_overflow:
161 if not wait:
162 return self._do_get()
163 else:
164 raise exc.TimeoutError(
165 "QueuePool limit of size %d overflow %d reached, "
166 "connection timed out, timeout %0.2f"
167 % (self.size(), self.overflow(), self._timeout),
168 code="3o7r",
169 )
170
171 if self._inc_overflow():
172 try:
173 return self._create_connection()
174 except:
175 with util.safe_reraise():
176 self._dec_overflow()
177 raise
178 else:
179 return self._do_get()
180
181 def _inc_overflow(self) -> bool:
182 if self._max_overflow == -1:
183 self._overflow += 1
184 return True
185 with self._overflow_lock:
186 if self._overflow < self._max_overflow:
187 self._overflow += 1
188 return True
189 else:
190 return False
191
192 def _dec_overflow(self) -> Literal[True]:
193 if self._max_overflow == -1:
194 self._overflow -= 1
195 return True
196 with self._overflow_lock:
197 self._overflow -= 1
198 return True
199
200 def recreate(self) -> QueuePool:
201 self.logger.info("Pool recreating")
202 return self.__class__(
203 self._creator,
204 pool_size=self._pool.maxsize,
205 max_overflow=self._max_overflow,
206 pre_ping=self._pre_ping,
207 use_lifo=self._pool.use_lifo,
208 timeout=self._timeout,
209 recycle=self._recycle,
210 echo=self.echo,
211 logging_name=self._orig_logging_name,
212 reset_on_return=self._reset_on_return,
213 _dispatch=self.dispatch,
214 dialect=self._dialect,
215 )
216
217 def dispose(self) -> None:
218 while True:
219 try:
220 conn = self._pool.get(False)
221 conn.close()
222 except sqla_queue.Empty:
223 break
224
225 self._overflow = 0 - self.size()
226 self.logger.info("Pool disposed. %s", self.status())
227
228 def status(self) -> str:
229 return (
230 "Pool size: %d Connections in pool: %d "
231 "Current Overflow: %d Current Checked out "
232 "connections: %d"
233 % (
234 self.size(),
235 self.checkedin(),
236 self.overflow(),
237 self.checkedout(),
238 )
239 )
240
241 def size(self) -> int:
242 return self._pool.maxsize
243
244 def timeout(self) -> float:
245 return self._timeout
246
247 def checkedin(self) -> int:
248 return self._pool.qsize()
249
250 def overflow(self) -> int:
251 return self._overflow if self._pool.maxsize else 0
252
253 def checkedout(self) -> int:
254 return self._pool.maxsize - self._pool.qsize() + self._overflow
255
256
257class AsyncAdaptedQueuePool(QueuePool):
258 """An asyncio-compatible version of :class:`.QueuePool`.
259
260 This pool is used by default when using :class:`.AsyncEngine` engines that
261 were generated from :func:`_asyncio.create_async_engine`. It uses an
262 asyncio-compatible queue implementation that does not use
263 ``threading.Lock``.
264
265 The arguments and operation of :class:`.AsyncAdaptedQueuePool` are
266 otherwise identical to that of :class:`.QueuePool`.
267
268 """
269
270 _is_asyncio = True
271 _queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = (
272 sqla_queue.AsyncAdaptedQueue
273 )
274
275 _dialect = _AsyncConnDialect()
276
277
278class NullPool(Pool):
279 """A Pool which does not pool connections.
280
281 Instead it literally opens and closes the underlying DB-API connection
282 per each connection open/close.
283
284 Reconnect-related functions such as ``recycle`` and connection
285 invalidation are not supported by this Pool implementation, since
286 no connections are held persistently.
287
288 The :class:`.NullPool` class **is compatible** with asyncio and
289 :func:`_asyncio.create_async_engine`.
290
291 """
292
293 def status(self) -> str:
294 return "NullPool"
295
296 def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
297 record.close()
298
299 def _do_get(self) -> ConnectionPoolEntry:
300 return self._create_connection()
301
302 def recreate(self) -> NullPool:
303 self.logger.info("Pool recreating")
304
305 return self.__class__(
306 self._creator,
307 recycle=self._recycle,
308 echo=self.echo,
309 logging_name=self._orig_logging_name,
310 reset_on_return=self._reset_on_return,
311 pre_ping=self._pre_ping,
312 _dispatch=self.dispatch,
313 dialect=self._dialect,
314 )
315
316 def dispose(self) -> None:
317 pass
318
319
320class SingletonThreadPool(Pool):
321 """A Pool that maintains one connection per thread.
322
323 Maintains one connection per each thread, never moving a connection to a
324 thread other than the one which it was created in.
325
326 .. warning:: the :class:`.SingletonThreadPool` will call ``.close()``
327 on arbitrary connections that exist beyond the size setting of
328 ``pool_size``, e.g. if more unique **thread identities**
329 than what ``pool_size`` states are used. This cleanup is
330 non-deterministic and not sensitive to whether or not the connections
331 linked to those thread identities are currently in use.
332
333 :class:`.SingletonThreadPool` may be improved in a future release,
334 however in its current status it is generally used only for test
335 scenarios using a SQLite ``:memory:`` database and is not recommended
336 for production use.
337
338 The :class:`.SingletonThreadPool` class **is not compatible** with asyncio
339 and :func:`_asyncio.create_async_engine`.
340
341
342 Options are the same as those of :class:`_pool.Pool`, as well as:
343
344 :param pool_size: The number of threads in which to maintain connections
345 at once. Defaults to five.
346
347 :class:`.SingletonThreadPool` is used by the SQLite dialect
348 automatically when a memory-based database is used.
349 See :ref:`sqlite_toplevel`.
350
351 """
352
353 _is_asyncio = False
354
355 def __init__(
356 self,
357 creator: Union[_CreatorFnType, _CreatorWRecFnType],
358 pool_size: int = 5,
359 **kw: Any,
360 ):
361 Pool.__init__(self, creator, **kw)
362 self._conn = threading.local()
363 self._fairy = threading.local()
364 self._all_conns: Set[ConnectionPoolEntry] = set()
365 self.size = pool_size
366
367 def recreate(self) -> SingletonThreadPool:
368 self.logger.info("Pool recreating")
369 return self.__class__(
370 self._creator,
371 pool_size=self.size,
372 recycle=self._recycle,
373 echo=self.echo,
374 pre_ping=self._pre_ping,
375 logging_name=self._orig_logging_name,
376 reset_on_return=self._reset_on_return,
377 _dispatch=self.dispatch,
378 dialect=self._dialect,
379 )
380
381 def dispose(self) -> None:
382 """Dispose of this pool."""
383
384 for conn in self._all_conns:
385 try:
386 conn.close()
387 except Exception:
388 # pysqlite won't even let you close a conn from a thread
389 # that didn't create it
390 pass
391
392 self._all_conns.clear()
393
394 def _cleanup(self) -> None:
395 while len(self._all_conns) >= self.size:
396 c = self._all_conns.pop()
397 c.close()
398
399 def status(self) -> str:
400 return "SingletonThreadPool id:%d size: %d" % (
401 id(self),
402 len(self._all_conns),
403 )
404
405 def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
406 try:
407 del self._fairy.current
408 except AttributeError:
409 pass
410
411 def _do_get(self) -> ConnectionPoolEntry:
412 try:
413 if TYPE_CHECKING:
414 c = cast(ConnectionPoolEntry, self._conn.current())
415 else:
416 c = self._conn.current()
417 if c:
418 return c
419 except AttributeError:
420 pass
421 c = self._create_connection()
422 self._conn.current = weakref.ref(c)
423 if len(self._all_conns) >= self.size:
424 self._cleanup()
425 self._all_conns.add(c)
426 return c
427
428 def connect(self) -> PoolProxiedConnection:
429 # vendored from Pool to include the now removed use_threadlocal
430 # behavior
431 try:
432 rec = cast(_ConnectionFairy, self._fairy.current())
433 except AttributeError:
434 pass
435 else:
436 if rec is not None:
437 return rec._checkout_existing()
438
439 return _ConnectionFairy._checkout(self, self._fairy)
440
441
442class StaticPool(Pool):
443 """A Pool of exactly one connection, used for all requests.
444
445 Reconnect-related functions such as ``recycle`` and connection
446 invalidation (which is also used to support auto-reconnect) are only
447 partially supported right now and may not yield good results.
448
449 The :class:`.StaticPool` class **is compatible** with asyncio and
450 :func:`_asyncio.create_async_engine`.
451
452 """
453
454 @util.memoized_property
455 def connection(self) -> _ConnectionRecord:
456 return _ConnectionRecord(self)
457
458 def status(self) -> str:
459 return "StaticPool"
460
461 def dispose(self) -> None:
462 if (
463 "connection" in self.__dict__
464 and self.connection.dbapi_connection is not None
465 ):
466 self.connection.close()
467 del self.__dict__["connection"]
468
469 def recreate(self) -> StaticPool:
470 self.logger.info("Pool recreating")
471 return self.__class__(
472 creator=self._creator,
473 recycle=self._recycle,
474 reset_on_return=self._reset_on_return,
475 pre_ping=self._pre_ping,
476 echo=self.echo,
477 logging_name=self._orig_logging_name,
478 _dispatch=self.dispatch,
479 dialect=self._dialect,
480 )
481
482 def _transfer_from(self, other_static_pool: StaticPool) -> None:
483 # used by the test suite to make a new engine / pool without
484 # losing the state of an existing SQLite :memory: connection
485 def creator(rec: ConnectionPoolEntry) -> DBAPIConnection:
486 conn = other_static_pool.connection.dbapi_connection
487 assert conn is not None
488 return conn
489
490 self._invoke_creator = creator
491
492 def _create_connection(self) -> ConnectionPoolEntry:
493 raise NotImplementedError()
494
495 def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
496 pass
497
498 def _do_get(self) -> ConnectionPoolEntry:
499 rec = self.connection
500 if rec._is_hard_or_soft_invalidated():
501 del self.__dict__["connection"]
502 rec = self.connection
503
504 return rec
505
506
507class AssertionPool(Pool):
508 """A :class:`_pool.Pool` that allows at most one checked out connection at
509 any given time.
510
511 This will raise an exception if more than one connection is checked out
512 at a time. Useful for debugging code that is using more connections
513 than desired.
514
515 The :class:`.AssertionPool` class **is compatible** with asyncio and
516 :func:`_asyncio.create_async_engine`.
517
518 """
519
520 _conn: Optional[ConnectionPoolEntry]
521 _checkout_traceback: Optional[List[str]]
522
523 def __init__(self, *args: Any, **kw: Any):
524 self._conn = None
525 self._checked_out = False
526 self._store_traceback = kw.pop("store_traceback", True)
527 self._checkout_traceback = None
528 Pool.__init__(self, *args, **kw)
529
530 def status(self) -> str:
531 return "AssertionPool"
532
533 def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
534 if not self._checked_out:
535 raise AssertionError("connection is not checked out")
536 self._checked_out = False
537 assert record is self._conn
538
539 def dispose(self) -> None:
540 self._checked_out = False
541 if self._conn:
542 self._conn.close()
543
544 def recreate(self) -> AssertionPool:
545 self.logger.info("Pool recreating")
546 return self.__class__(
547 self._creator,
548 echo=self.echo,
549 pre_ping=self._pre_ping,
550 recycle=self._recycle,
551 reset_on_return=self._reset_on_return,
552 logging_name=self._orig_logging_name,
553 _dispatch=self.dispatch,
554 dialect=self._dialect,
555 )
556
557 def _do_get(self) -> ConnectionPoolEntry:
558 if self._checked_out:
559 if self._checkout_traceback:
560 suffix = " at:\n%s" % "".join(
561 chop_traceback(self._checkout_traceback)
562 )
563 else:
564 suffix = ""
565 raise AssertionError("connection is already checked out" + suffix)
566
567 if not self._conn:
568 self._conn = self._create_connection()
569
570 self._checked_out = True
571 if self._store_traceback:
572 self._checkout_traceback = traceback.format_stack()
573 return self._conn