Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/impl.py: 33%
215 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1# sqlalchemy/pool.py
2# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
9"""Pool implementation classes.
11"""
13import traceback
14import weakref
16from .base import _AsyncConnDialect
17from .base import _ConnectionFairy
18from .base import _ConnectionRecord
19from .base import Pool
20from .. import exc
21from .. import util
22from ..util import chop_traceback
23from ..util import queue as sqla_queue
24from ..util import threading
27class QueuePool(Pool):
29 """A :class:`_pool.Pool`
30 that imposes a limit on the number of open connections.
32 :class:`.QueuePool` is the default pooling implementation used for
33 all :class:`_engine.Engine` objects, unless the SQLite dialect is in use.
35 """
37 _is_asyncio = False
38 _queue_class = sqla_queue.Queue
40 def __init__(
41 self,
42 creator,
43 pool_size=5,
44 max_overflow=10,
45 timeout=30.0,
46 use_lifo=False,
47 **kw
48 ):
49 r"""
50 Construct a QueuePool.
52 :param creator: a callable function that returns a DB-API
53 connection object, same as that of :paramref:`_pool.Pool.creator`.
55 :param pool_size: The size of the pool to be maintained,
56 defaults to 5. This is the largest number of connections that
57 will be kept persistently in the pool. Note that the pool
58 begins with no connections; once this number of connections
59 is requested, that number of connections will remain.
60 ``pool_size`` can be set to 0 to indicate no size limit; to
61 disable pooling, use a :class:`~sqlalchemy.pool.NullPool`
62 instead.
64 :param max_overflow: The maximum overflow size of the
65 pool. When the number of checked-out connections reaches the
66 size set in pool_size, additional connections will be
67 returned up to this limit. When those additional connections
68 are returned to the pool, they are disconnected and
69 discarded. It follows then that the total number of
70 simultaneous connections the pool will allow is pool_size +
71 `max_overflow`, and the total number of "sleeping"
72 connections the pool will allow is pool_size. `max_overflow`
73 can be set to -1 to indicate no overflow limit; no limit
74 will be placed on the total number of concurrent
75 connections. Defaults to 10.
77 :param timeout: The number of seconds to wait before giving up
78 on returning a connection. Defaults to 30.0. This can be a float
79 but is subject to the limitations of Python time functions which
80 may not be reliable in the tens of milliseconds.
82 :param use_lifo: use LIFO (last-in-first-out) when retrieving
83 connections instead of FIFO (first-in-first-out). Using LIFO, a
84 server-side timeout scheme can reduce the number of connections used
85 during non-peak periods of use. When planning for server-side
86 timeouts, ensure that a recycle or pre-ping strategy is in use to
87 gracefully handle stale connections.
89 .. versionadded:: 1.3
91 .. seealso::
93 :ref:`pool_use_lifo`
95 :ref:`pool_disconnects`
97 :param \**kw: Other keyword arguments including
98 :paramref:`_pool.Pool.recycle`, :paramref:`_pool.Pool.echo`,
99 :paramref:`_pool.Pool.reset_on_return` and others are passed to the
100 :class:`_pool.Pool` constructor.
102 """
103 Pool.__init__(self, creator, **kw)
104 self._pool = self._queue_class(pool_size, use_lifo=use_lifo)
105 self._overflow = 0 - pool_size
106 self._max_overflow = max_overflow
107 self._timeout = timeout
108 self._overflow_lock = threading.Lock()
110 def _do_return_conn(self, conn):
111 try:
112 self._pool.put(conn, False)
113 except sqla_queue.Full:
114 try:
115 conn.close()
116 finally:
117 self._dec_overflow()
119 def _do_get(self):
120 use_overflow = self._max_overflow > -1
122 try:
123 wait = use_overflow and self._overflow >= self._max_overflow
124 return self._pool.get(wait, self._timeout)
125 except sqla_queue.Empty:
126 # don't do things inside of "except Empty", because when we say
127 # we timed out or can't connect and raise, Python 3 tells
128 # people the real error is queue.Empty which it isn't.
129 pass
130 if use_overflow and self._overflow >= self._max_overflow:
131 if not wait:
132 return self._do_get()
133 else:
134 raise exc.TimeoutError(
135 "QueuePool limit of size %d overflow %d reached, "
136 "connection timed out, timeout %0.2f"
137 % (self.size(), self.overflow(), self._timeout),
138 code="3o7r",
139 )
141 if self._inc_overflow():
142 try:
143 return self._create_connection()
144 except:
145 with util.safe_reraise():
146 self._dec_overflow()
147 else:
148 return self._do_get()
150 def _inc_overflow(self):
151 if self._max_overflow == -1:
152 self._overflow += 1
153 return True
154 with self._overflow_lock:
155 if self._overflow < self._max_overflow:
156 self._overflow += 1
157 return True
158 else:
159 return False
161 def _dec_overflow(self):
162 if self._max_overflow == -1:
163 self._overflow -= 1
164 return True
165 with self._overflow_lock:
166 self._overflow -= 1
167 return True
169 def recreate(self):
170 self.logger.info("Pool recreating")
171 return self.__class__(
172 self._creator,
173 pool_size=self._pool.maxsize,
174 max_overflow=self._max_overflow,
175 pre_ping=self._pre_ping,
176 use_lifo=self._pool.use_lifo,
177 timeout=self._timeout,
178 recycle=self._recycle,
179 echo=self.echo,
180 logging_name=self._orig_logging_name,
181 reset_on_return=self._reset_on_return,
182 _dispatch=self.dispatch,
183 dialect=self._dialect,
184 )
186 def dispose(self):
187 while True:
188 try:
189 conn = self._pool.get(False)
190 conn.close()
191 except sqla_queue.Empty:
192 break
194 self._overflow = 0 - self.size()
195 self.logger.info("Pool disposed. %s", self.status())
197 def status(self):
198 return (
199 "Pool size: %d Connections in pool: %d "
200 "Current Overflow: %d Current Checked out "
201 "connections: %d"
202 % (
203 self.size(),
204 self.checkedin(),
205 self.overflow(),
206 self.checkedout(),
207 )
208 )
210 def size(self):
211 return self._pool.maxsize
213 def timeout(self):
214 return self._timeout
216 def checkedin(self):
217 return self._pool.qsize()
219 def overflow(self):
220 return self._overflow
222 def checkedout(self):
223 return self._pool.maxsize - self._pool.qsize() + self._overflow
226class AsyncAdaptedQueuePool(QueuePool):
227 _is_asyncio = True
228 _queue_class = sqla_queue.AsyncAdaptedQueue
229 _dialect = _AsyncConnDialect()
232class FallbackAsyncAdaptedQueuePool(AsyncAdaptedQueuePool):
233 _queue_class = sqla_queue.FallbackAsyncAdaptedQueue
236class NullPool(Pool):
238 """A Pool which does not pool connections.
240 Instead it literally opens and closes the underlying DB-API connection
241 per each connection open/close.
243 Reconnect-related functions such as ``recycle`` and connection
244 invalidation are not supported by this Pool implementation, since
245 no connections are held persistently.
247 """
249 def status(self):
250 return "NullPool"
252 def _do_return_conn(self, conn):
253 conn.close()
255 def _do_get(self):
256 return self._create_connection()
258 def recreate(self):
259 self.logger.info("Pool recreating")
261 return self.__class__(
262 self._creator,
263 recycle=self._recycle,
264 echo=self.echo,
265 logging_name=self._orig_logging_name,
266 reset_on_return=self._reset_on_return,
267 pre_ping=self._pre_ping,
268 _dispatch=self.dispatch,
269 dialect=self._dialect,
270 )
272 def dispose(self):
273 pass
276class SingletonThreadPool(Pool):
278 """A Pool that maintains one connection per thread.
280 Maintains one connection per each thread, never moving a connection to a
281 thread other than the one which it was created in.
283 .. warning:: the :class:`.SingletonThreadPool` will call ``.close()``
284 on arbitrary connections that exist beyond the size setting of
285 ``pool_size``, e.g. if more unique **thread identities**
286 than what ``pool_size`` states are used. This cleanup is
287 non-deterministic and not sensitive to whether or not the connections
288 linked to those thread identities are currently in use.
290 :class:`.SingletonThreadPool` may be improved in a future release,
291 however in its current status it is generally used only for test
292 scenarios using a SQLite ``:memory:`` database and is not recommended
293 for production use.
296 Options are the same as those of :class:`_pool.Pool`, as well as:
298 :param pool_size: The number of threads in which to maintain connections
299 at once. Defaults to five.
301 :class:`.SingletonThreadPool` is used by the SQLite dialect
302 automatically when a memory-based database is used.
303 See :ref:`sqlite_toplevel`.
305 """
307 _is_asyncio = False
309 def __init__(self, creator, pool_size=5, **kw):
310 Pool.__init__(self, creator, **kw)
311 self._conn = threading.local()
312 self._fairy = threading.local()
313 self._all_conns = set()
314 self.size = pool_size
316 def recreate(self):
317 self.logger.info("Pool recreating")
318 return self.__class__(
319 self._creator,
320 pool_size=self.size,
321 recycle=self._recycle,
322 echo=self.echo,
323 pre_ping=self._pre_ping,
324 logging_name=self._orig_logging_name,
325 reset_on_return=self._reset_on_return,
326 _dispatch=self.dispatch,
327 dialect=self._dialect,
328 )
330 def dispose(self):
331 """Dispose of this pool."""
333 for conn in self._all_conns:
334 try:
335 conn.close()
336 except Exception:
337 # pysqlite won't even let you close a conn from a thread
338 # that didn't create it
339 pass
341 self._all_conns.clear()
343 def _cleanup(self):
344 while len(self._all_conns) >= self.size:
345 c = self._all_conns.pop()
346 c.close()
348 def status(self):
349 return "SingletonThreadPool id:%d size: %d" % (
350 id(self),
351 len(self._all_conns),
352 )
354 def _do_return_conn(self, conn):
355 pass
357 def _do_get(self):
358 try:
359 c = self._conn.current()
360 if c:
361 return c
362 except AttributeError:
363 pass
364 c = self._create_connection()
365 self._conn.current = weakref.ref(c)
366 if len(self._all_conns) >= self.size:
367 self._cleanup()
368 self._all_conns.add(c)
369 return c
371 def connect(self):
372 # vendored from Pool to include the now removed use_threadlocal
373 # behavior
374 try:
375 rec = self._fairy.current()
376 except AttributeError:
377 pass
378 else:
379 if rec is not None:
380 return rec._checkout_existing()
382 return _ConnectionFairy._checkout(self, self._fairy)
384 def _return_conn(self, record):
385 try:
386 del self._fairy.current
387 except AttributeError:
388 pass
389 self._do_return_conn(record)
392class StaticPool(Pool):
394 """A Pool of exactly one connection, used for all requests.
396 Reconnect-related functions such as ``recycle`` and connection
397 invalidation (which is also used to support auto-reconnect) are only
398 partially supported right now and may not yield good results.
401 """
403 @util.memoized_property
404 def connection(self):
405 return _ConnectionRecord(self)
407 def status(self):
408 return "StaticPool"
410 def dispose(self):
411 if (
412 "connection" in self.__dict__
413 and self.connection.dbapi_connection is not None
414 ):
415 self.connection.close()
416 del self.__dict__["connection"]
418 def recreate(self):
419 self.logger.info("Pool recreating")
420 return self.__class__(
421 creator=self._creator,
422 recycle=self._recycle,
423 reset_on_return=self._reset_on_return,
424 pre_ping=self._pre_ping,
425 echo=self.echo,
426 logging_name=self._orig_logging_name,
427 _dispatch=self.dispatch,
428 dialect=self._dialect,
429 )
431 def _transfer_from(self, other_static_pool):
432 # used by the test suite to make a new engine / pool without
433 # losing the state of an existing SQLite :memory: connection
434 self._invoke_creator = (
435 lambda crec: other_static_pool.connection.dbapi_connection
436 )
438 def _create_connection(self):
439 raise NotImplementedError()
441 def _do_return_conn(self, conn):
442 pass
444 def _do_get(self):
445 rec = self.connection
446 if rec._is_hard_or_soft_invalidated():
447 del self.__dict__["connection"]
448 rec = self.connection
450 return rec
453class AssertionPool(Pool):
455 """A :class:`_pool.Pool` that allows at most one checked out connection at
456 any given time.
458 This will raise an exception if more than one connection is checked out
459 at a time. Useful for debugging code that is using more connections
460 than desired.
462 """
464 def __init__(self, *args, **kw):
465 self._conn = None
466 self._checked_out = False
467 self._store_traceback = kw.pop("store_traceback", True)
468 self._checkout_traceback = None
469 Pool.__init__(self, *args, **kw)
471 def status(self):
472 return "AssertionPool"
474 def _do_return_conn(self, conn):
475 if not self._checked_out:
476 raise AssertionError("connection is not checked out")
477 self._checked_out = False
478 assert conn is self._conn
480 def dispose(self):
481 self._checked_out = False
482 if self._conn:
483 self._conn.close()
485 def recreate(self):
486 self.logger.info("Pool recreating")
487 return self.__class__(
488 self._creator,
489 echo=self.echo,
490 pre_ping=self._pre_ping,
491 recycle=self._recycle,
492 reset_on_return=self._reset_on_return,
493 logging_name=self._orig_logging_name,
494 _dispatch=self.dispatch,
495 dialect=self._dialect,
496 )
498 def _do_get(self):
499 if self._checked_out:
500 if self._checkout_traceback:
501 suffix = " at:\n%s" % "".join(
502 chop_traceback(self._checkout_traceback)
503 )
504 else:
505 suffix = ""
506 raise AssertionError("connection is already checked out" + suffix)
508 if not self._conn:
509 self._conn = self._create_connection()
511 self._checked_out = True
512 if self._store_traceback:
513 self._checkout_traceback = traceback.format_stack()
514 return self._conn