1# pool/impl.py
2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8
9"""Pool implementation classes.
10
11"""
12from __future__ import annotations
13
14import threading
15import traceback
16import typing
17from typing import Any
18from typing import cast
19from typing import List
20from typing import Optional
21from typing import Set
22from typing import Type
23from typing import TYPE_CHECKING
24from typing import Union
25import weakref
26
27from .base import _AsyncConnDialect
28from .base import _ConnectionFairy
29from .base import _ConnectionRecord
30from .base import _CreatorFnType
31from .base import _CreatorWRecFnType
32from .base import ConnectionPoolEntry
33from .base import Pool
34from .base import PoolProxiedConnection
35from .. import exc
36from .. import util
37from ..util import chop_traceback
38from ..util import queue as sqla_queue
39from ..util.typing import Literal
40
41if typing.TYPE_CHECKING:
42 from ..engine.interfaces import DBAPIConnection
43
44
class QueuePool(Pool):
    """A :class:`_pool.Pool` that caps the number of open connections.

    :class:`.QueuePool` is the default pooling implementation for every
    :class:`_engine.Engine` object, except those using SQLite with a
    ``:memory:`` database.

    The :class:`.QueuePool` class **is not compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.  When
    :func:`_asyncio.create_async_engine` is used and no other kind of pool
    is specified, the :class:`.AsyncAdaptedQueuePool` class is used
    automatically.

    .. seealso::

        :class:`.AsyncAdaptedQueuePool`

    """

    _is_asyncio = False  # type: ignore[assignment]

    # queue implementation instantiated by __init__; subclasses may swap it
    _queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = (
        sqla_queue.Queue
    )

    _pool: sqla_queue.QueueCommon[ConnectionPoolEntry]

    def __init__(
        self,
        creator: Union[_CreatorFnType, _CreatorWRecFnType],
        pool_size: int = 5,
        max_overflow: int = 10,
        timeout: float = 30.0,
        use_lifo: bool = False,
        **kw: Any,
    ):
        r"""
        Construct a QueuePool.

        :param creator: a callable function that returns a DB-API
          connection object; same meaning as
          :paramref:`_pool.Pool.creator`.

        :param pool_size: The largest number of connections that will be
          kept persistently in the pool; defaults to 5.  The pool begins
          with no connections; once this number of connections has been
          requested, that number of connections will remain.
          ``pool_size`` can be set to 0 to indicate no size limit; to
          disable pooling, use a :class:`~sqlalchemy.pool.NullPool`
          instead.

        :param max_overflow: The maximum overflow size of the pool;
          defaults to 10.  Once the number of checked-out connections
          reaches ``pool_size``, additional connections are handed out up
          to this limit.  When those additional connections are returned
          to the pool, they are disconnected and discarded.  The total
          number of simultaneous connections the pool allows is therefore
          ``pool_size`` + ``max_overflow``, and the total number of
          "sleeping" connections it allows is ``pool_size``.
          ``max_overflow`` can be set to -1 to indicate no overflow limit;
          no limit is then placed on the total number of concurrent
          connections.

        :param timeout: The number of seconds to wait before giving up
          on returning a connection.  Defaults to 30.0.  This can be a
          float, but is subject to the limitations of Python time
          functions, which may not be reliable in the tens of
          milliseconds.

        :param use_lifo: retrieve connections in LIFO (last-in-first-out)
          order instead of FIFO (first-in-first-out).  With LIFO, a
          server-side timeout scheme can reduce the number of connections
          used during non-peak periods of use.  When planning for
          server-side timeouts, ensure that a recycle or pre-ping strategy
          is in use to gracefully handle stale connections.

          .. versionadded:: 1.3

          .. seealso::

            :ref:`pool_use_lifo`

            :ref:`pool_disconnects`

        :param \**kw: Other keyword arguments, including
          :paramref:`_pool.Pool.recycle`, :paramref:`_pool.Pool.echo`,
          :paramref:`_pool.Pool.reset_on_return` and others, are passed
          to the :class:`_pool.Pool` constructor.

        """

        Pool.__init__(self, creator, **kw)
        self._pool = self._queue_class(pool_size, use_lifo=use_lifo)
        # the counter starts at -pool_size and is bumped once for every
        # connection created by _do_get()
        self._overflow = 0 - pool_size
        if pool_size == 0:
            # a pool_size of zero forces the "no overflow limit" setting
            self._max_overflow = -1
        else:
            self._max_overflow = max_overflow
        self._timeout = timeout
        self._overflow_lock = threading.Lock()

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        """Put ``record`` back on the queue without blocking.

        If the queue is full the record is closed instead and the overflow
        counter is decremented, even when ``close()`` raises.
        """
        try:
            self._pool.put(record, False)
        except sqla_queue.Full:
            try:
                record.close()
            finally:
                self._dec_overflow()

    def _do_get(self) -> ConnectionPoolEntry:
        """Return a pooled record, or create a new one if allowed.

        :raises sqlalchemy.exc.TimeoutError: when the overflow limit had
          already been reached, the blocking wait on the queue came back
          empty, and the limit is still reached.
        """
        overflow_limited = self._max_overflow > -1

        # only block on the queue when no further connection may be created
        block = overflow_limited and self._overflow >= self._max_overflow
        try:
            return self._pool.get(block, self._timeout)
        except sqla_queue.Empty:
            # nothing is done inside of "except Empty" on purpose: raising
            # a timeout / connect error from here would make Python 3
            # report queue.Empty as the real error, which it isn't.
            pass

        # the counter is deliberately read again; it may have changed
        # while we were waiting on the queue
        if overflow_limited and self._overflow >= self._max_overflow:
            if block:
                raise exc.TimeoutError(
                    "QueuePool limit of size %d overflow %d reached, "
                    "connection timed out, timeout %0.2f"
                    % (self.size(), self.overflow(), self._timeout),
                    code="3o7r",
                )
            return self._do_get()

        if not self._inc_overflow():
            # could not reserve a slot after all; start over
            return self._do_get()

        try:
            return self._create_connection()
        except:
            # give the reserved slot back before the error propagates
            with util.safe_reraise():
                self._dec_overflow()
            raise

    def _inc_overflow(self) -> bool:
        """Increment the overflow counter if the limit permits.

        Returns ``True`` when the counter was incremented and ``False``
        when it had already reached ``_max_overflow``.  With no overflow
        limit (-1) the counter is incremented without taking the lock.
        """
        if self._max_overflow != -1:
            with self._overflow_lock:
                if self._overflow >= self._max_overflow:
                    return False
                self._overflow += 1
                return True

        self._overflow += 1
        return True

    def _dec_overflow(self) -> Literal[True]:
        """Decrement the overflow counter; always returns ``True``.

        The lock is only taken when an overflow limit is configured.
        """
        if self._max_overflow == -1:
            self._overflow -= 1
        else:
            with self._overflow_lock:
                self._overflow -= 1
        return True

    def recreate(self) -> QueuePool:
        """Return a new pool of the same class with the same settings."""
        self.logger.info("Pool recreating")
        pool_cls = self.__class__
        return pool_cls(
            self._creator,
            pool_size=self._pool.maxsize,
            max_overflow=self._max_overflow,
            pre_ping=self._pre_ping,
            use_lifo=self._pool.use_lifo,
            timeout=self._timeout,
            recycle=self._recycle,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            reset_on_return=self._reset_on_return,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def dispose(self) -> None:
        """Close every record currently in the queue and reset the
        overflow counter."""
        try:
            # drain without blocking until the queue reports Empty
            while True:
                self._pool.get(False).close()
        except sqla_queue.Empty:
            pass

        self._overflow = 0 - self.size()
        self.logger.info("Pool disposed. %s", self.status())

    def status(self) -> str:
        """Return a one-line summary of the pool's counters."""
        template = (
            "Pool size: %d Connections in pool: %d "
            "Current Overflow: %d Current Checked out "
            "connections: %d"
        )
        counters = (
            self.size(),
            self.checkedin(),
            self.overflow(),
            self.checkedout(),
        )
        return template % counters

    def size(self) -> int:
        """Return the ``maxsize`` of the underlying queue."""
        return self._pool.maxsize

    def timeout(self) -> float:
        """Return the configured checkout timeout."""
        return self._timeout

    def checkedin(self) -> int:
        """Return the number of records currently in the queue."""
        return self._pool.qsize()

    def overflow(self) -> int:
        """Return the overflow counter, or 0 when the queue's ``maxsize``
        is zero."""
        if self._pool.maxsize:
            return self._overflow
        return 0

    def checkedout(self) -> int:
        """Return ``maxsize - qsize + overflow counter``."""
        queue = self._pool
        return queue.maxsize - queue.qsize() + self._overflow
260
class AsyncAdaptedQueuePool(QueuePool):
    """Asyncio-compatible counterpart of :class:`.QueuePool`.

    Engines of type :class:`.AsyncEngine` produced by
    :func:`_asyncio.create_async_engine` use this pool by default.  Its
    queue is an asyncio-compatible implementation that does not use
    ``threading.Lock``.

    Apart from that, the arguments and operation of
    :class:`.AsyncAdaptedQueuePool` are identical to those of
    :class:`.QueuePool`.

    """

    # queue class instantiated by QueuePool.__init__ for this pool
    _queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = (
        sqla_queue.AsyncAdaptedQueue
    )

    _is_asyncio = True  # type: ignore[assignment]

    # class-level dialect object shared by all instances
    _dialect = _AsyncConnDialect()
280
281
class NullPool(Pool):
    """A Pool that performs no pooling at all.

    Every connection open/close literally opens and closes the underlying
    DB-API connection.

    Because no connections are held persistently, reconnect-related
    functions such as ``recycle`` and connection invalidation are not
    supported by this Pool implementation.

    The :class:`.NullPool` class **is compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.

    """

    def status(self) -> str:
        """Return the constant status string for this pool."""
        return "NullPool"

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        """Close ``record`` rather than storing it."""
        record.close()

    def _do_get(self) -> ConnectionPoolEntry:
        """Create and return a new connection record on every call."""
        return self._create_connection()

    def recreate(self) -> NullPool:
        """Return a new pool of the same class with the same settings."""
        self.logger.info("Pool recreating")

        pool_cls = self.__class__
        return pool_cls(
            self._creator,
            recycle=self._recycle,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            reset_on_return=self._reset_on_return,
            pre_ping=self._pre_ping,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def dispose(self) -> None:
        """Do nothing; this pool holds no connections."""
        return None
322
323
class SingletonThreadPool(Pool):
    """A Pool that maintains one connection per thread.

    Maintains one connection per each thread, never moving a connection to a
    thread other than the one which it was created in.

    .. warning::  the :class:`.SingletonThreadPool` will call ``.close()``
       on arbitrary connections that exist beyond the size setting of
       ``pool_size``, e.g. if more unique **thread identities**
       than what ``pool_size`` states are used.   This cleanup is
       non-deterministic and not sensitive to whether or not the connections
       linked to those thread identities are currently in use.

       :class:`.SingletonThreadPool` may be improved in a future release,
       however in its current status it is generally used only for test
       scenarios using a SQLite ``:memory:`` database and is not recommended
       for production use.

    The :class:`.SingletonThreadPool` class **is not compatible** with asyncio
    and :func:`_asyncio.create_async_engine`.


    Options are the same as those of :class:`_pool.Pool`, as well as:

    :param pool_size: The number of threads in which to maintain connections
        at once.  Defaults to five.

    :class:`.SingletonThreadPool` is used by the SQLite dialect
    automatically when a memory-based database is used.
    See :ref:`sqlite_toplevel`.

    """

    _is_asyncio = False  # type: ignore[assignment]

    def __init__(
        self,
        creator: Union[_CreatorFnType, _CreatorWRecFnType],
        pool_size: int = 5,
        **kw: Any,
    ):
        """Construct a SingletonThreadPool.

        :param creator: passed through to the :class:`_pool.Pool`
          constructor.
        :param pool_size: stored as ``self.size``; the threshold used by
          the cleanup step when a new connection is created.
        :param \\**kw: passed through to the :class:`_pool.Pool`
          constructor.
        """
        Pool.__init__(self, creator, **kw)
        # per-thread slots: ``_conn.current`` is set by _do_get() and
        # ``_fairy.current`` is read by connect()
        self._conn = threading.local()
        self._fairy = threading.local()
        # strong references to every record this pool has created and
        # not yet closed via _cleanup() / dispose()
        self._all_conns: Set[ConnectionPoolEntry] = set()
        self.size = pool_size

    def recreate(self) -> SingletonThreadPool:
        """Return a new pool of the same class with the same settings."""
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            pool_size=self.size,
            recycle=self._recycle,
            echo=self.echo,
            pre_ping=self._pre_ping,
            logging_name=self._orig_logging_name,
            reset_on_return=self._reset_on_return,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def dispose(self) -> None:
        """Dispose of this pool."""

        for conn in self._all_conns:
            try:
                conn.close()
            except Exception:
                # pysqlite won't even let you close a conn from a thread
                # that didn't create it
                pass

        self._all_conns.clear()

    def _cleanup(self) -> None:
        """Close arbitrary tracked records until fewer than ``size`` remain.

        Records are taken with ``set.pop()``, so which ones get closed is
        arbitrary.
        """
        # The emptiness check matters when ``size`` is zero or negative:
        # the length comparison alone would stay true on an empty set and
        # ``set.pop()`` would raise KeyError.
        while self._all_conns and len(self._all_conns) >= self.size:
            c = self._all_conns.pop()
            c.close()

    def status(self) -> str:
        """Return the pool's ``id()`` and the number of tracked records."""
        return "SingletonThreadPool id:%d size: %d" % (
            id(self),
            len(self._all_conns),
        )

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        """Clear the current thread's ``_fairy.current`` slot, if set.

        ``record`` itself is not touched.
        """
        try:
            del self._fairy.current
        except AttributeError:
            # nothing was stored for this thread
            pass

    def _do_get(self) -> ConnectionPoolEntry:
        """Return this thread's record, creating one when necessary."""
        try:
            # ``_conn.current`` is the weakref stored below; calling it
            # gives the record, or None once the record is gone.
            # AttributeError means this thread has not stored one yet.
            if TYPE_CHECKING:
                c = cast(ConnectionPoolEntry, self._conn.current())
            else:
                c = self._conn.current()
            if c:
                return c
        except AttributeError:
            pass
        c = self._create_connection()
        self._conn.current = weakref.ref(c)
        # make room first; _cleanup() is a no-op while fewer than ``size``
        # records are tracked
        self._cleanup()
        self._all_conns.add(c)
        return c

    def connect(self) -> PoolProxiedConnection:
        """Return the current thread's checked-out connection if there is
        one, otherwise perform a new checkout."""
        # vendored from Pool to include the now removed use_threadlocal
        # behavior
        try:
            # presumably a weakref populated by _ConnectionFairy._checkout,
            # which receives ``self._fairy`` below -- not visible here
            rec = cast(_ConnectionFairy, self._fairy.current())
        except AttributeError:
            pass
        else:
            if rec is not None:
                return rec._checkout_existing()

        return _ConnectionFairy._checkout(self, self._fairy)
444
445
class StaticPool(Pool):
    """A Pool holding exactly one connection that serves every request.

    Reconnect-related functions such as ``recycle`` and connection
    invalidation (which is also used to support auto-reconnect) are only
    partially supported right now and may not yield good results.

    The :class:`.StaticPool` class **is compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.

    """

    @util.memoized_property
    def connection(self) -> _ConnectionRecord:
        """The single connection record of this pool.

        dispose() and _do_get() look for, and delete, the memoized value
        under ``self.__dict__["connection"]``.
        """
        return _ConnectionRecord(self)

    def status(self) -> str:
        """Return the constant status string for this pool."""
        return "StaticPool"

    def dispose(self) -> None:
        """Close and forget the memoized record, if it holds a DB-API
        connection."""
        if "connection" not in self.__dict__:
            # never created (or already removed): nothing to close
            return
        if self.connection.dbapi_connection is None:
            return

        self.connection.close()
        del self.__dict__["connection"]

    def recreate(self) -> StaticPool:
        """Return a new pool of the same class with the same settings."""
        self.logger.info("Pool recreating")
        pool_cls = self.__class__
        return pool_cls(
            creator=self._creator,
            recycle=self._recycle,
            reset_on_return=self._reset_on_return,
            pre_ping=self._pre_ping,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def _transfer_from(self, other_static_pool: StaticPool) -> None:
        """Make this pool hand out ``other_static_pool``'s DB-API
        connection.

        Replaces ``self._invoke_creator`` with a function returning the
        other pool's current ``dbapi_connection``.
        """

        # used by the test suite to make a new engine / pool without
        # losing the state of an existing SQLite :memory: connection
        def creator(rec: ConnectionPoolEntry) -> DBAPIConnection:
            dbapi_conn = other_static_pool.connection.dbapi_connection
            assert dbapi_conn is not None
            return dbapi_conn

        self._invoke_creator = creator

    def _create_connection(self) -> ConnectionPoolEntry:
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError()

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        """Do nothing when a record is returned."""
        return None

    def _do_get(self) -> ConnectionPoolEntry:
        """Return the memoized record, replacing it first if it has been
        invalidated."""
        record = self.connection
        if not record._is_hard_or_soft_invalidated():
            return record

        # drop the memoized value so that the property builds a new record
        del self.__dict__["connection"]
        return self.connection
509
510
class AssertionPool(Pool):
    """A :class:`_pool.Pool` that permits at most one checked out connection
    at any given time.

    An exception is raised if more than one connection is checked out at a
    time.  This is useful for debugging code that is using more connections
    than desired.

    The :class:`.AssertionPool` class **is compatible** with asyncio and
    :func:`_asyncio.create_async_engine`.

    """

    _conn: Optional[ConnectionPoolEntry]
    _checkout_traceback: Optional[List[str]]

    def __init__(self, *args: Any, **kw: Any):
        """Construct an AssertionPool.

        The ``store_traceback`` keyword (default ``True``) is consumed
        here; all remaining arguments go to the :class:`_pool.Pool`
        constructor.
        """
        # must be removed from ``kw`` before Pool.__init__ receives it
        self._store_traceback = kw.pop("store_traceback", True)
        self._checkout_traceback = None
        self._checked_out = False
        self._conn = None
        Pool.__init__(self, *args, **kw)

    def status(self) -> str:
        """Return the constant status string for this pool."""
        return "AssertionPool"

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        """Mark the connection as returned.

        :raises AssertionError: if no connection is checked out.
        """
        if not self._checked_out:
            raise AssertionError("connection is not checked out")
        self._checked_out = False
        assert record is self._conn

    def dispose(self) -> None:
        """Clear the checked-out flag and close the record, if any."""
        self._checked_out = False
        record = self._conn
        if record:
            record.close()

    def recreate(self) -> AssertionPool:
        """Return a new pool of the same class with the same settings."""
        self.logger.info("Pool recreating")
        pool_cls = self.__class__
        return pool_cls(
            self._creator,
            echo=self.echo,
            pre_ping=self._pre_ping,
            recycle=self._recycle,
            reset_on_return=self._reset_on_return,
            logging_name=self._orig_logging_name,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def _do_get(self) -> ConnectionPoolEntry:
        """Check out the single record, creating it on first use.

        :raises AssertionError: if the connection is already checked out;
          when a checkout traceback was stored, it is appended to the
          message.
        """
        if self._checked_out:
            stored_stack = self._checkout_traceback
            suffix = (
                " at:\n%s" % "".join(chop_traceback(stored_stack))
                if stored_stack
                else ""
            )
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            # remember where this checkout happened for the error above
            self._checkout_traceback = traceback.format_stack()
        return self._conn