1# sqlalchemy/pool.py
2# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: http://www.opensource.org/licenses/mit-license.php
7
8
9"""Pool implementation classes.
10
11"""
12
13import traceback
14import weakref
15
16from .base import _ConnectionFairy
17from .base import _ConnectionRecord
18from .base import Pool
19from .. import exc
20from .. import util
21from ..util import chop_traceback
22from ..util import queue as sqla_queue
23from ..util import threading
24
25
class QueuePool(Pool):

    """A :class:`_pool.Pool`
    that imposes a limit on the number of open connections.

    :class:`.QueuePool` is the default pooling implementation used for
    all :class:`_engine.Engine` objects, unless the SQLite dialect is in use.

    """

    def __init__(
        self,
        creator,
        pool_size=5,
        max_overflow=10,
        timeout=30,
        use_lifo=False,
        **kw
    ):
        r"""
        Construct a QueuePool.

        :param creator: a callable function that returns a DB-API
          connection object, same as that of :paramref:`_pool.Pool.creator`.

        :param pool_size: The size of the pool to be maintained,
          defaults to 5. This is the largest number of connections that
          will be kept persistently in the pool. Note that the pool
          begins with no connections; once this number of connections
          is requested, that number of connections will remain.
          ``pool_size`` can be set to 0 to indicate no size limit; to
          disable pooling, use a :class:`~sqlalchemy.pool.NullPool`
          instead.

        :param max_overflow: The maximum overflow size of the
          pool. When the number of checked-out connections reaches the
          size set in pool_size, additional connections will be
          returned up to this limit. When those additional connections
          are returned to the pool, they are disconnected and
          discarded. It follows then that the total number of
          simultaneous connections the pool will allow is pool_size +
          `max_overflow`, and the total number of "sleeping"
          connections the pool will allow is pool_size. `max_overflow`
          can be set to -1 to indicate no overflow limit; no limit
          will be placed on the total number of concurrent
          connections. Defaults to 10.

        :param timeout: The number of seconds to wait before giving up
          on returning a connection. Defaults to 30.

        :param use_lifo: use LIFO (last-in-first-out) when retrieving
          connections instead of FIFO (first-in-first-out). Using LIFO, a
          server-side timeout scheme can reduce the number of connections used
          during non-peak periods of use. When planning for server-side
          timeouts, ensure that a recycle or pre-ping strategy is in use to
          gracefully handle stale connections.

          .. versionadded:: 1.3

          .. seealso::

            :ref:`pool_use_lifo`

            :ref:`pool_disconnects`

        :param \**kw: Other keyword arguments including
          :paramref:`_pool.Pool.recycle`, :paramref:`_pool.Pool.echo`,
          :paramref:`_pool.Pool.reset_on_return` and others are passed to the
          :class:`_pool.Pool` constructor.

        """
        Pool.__init__(self, creator, **kw)
        self._pool = sqla_queue.Queue(pool_size, use_lifo=use_lifo)
        # The counter starts at ``-pool_size`` and is bumped once for every
        # connection that gets created, so it only becomes positive after
        # more than ``pool_size`` connections exist.
        self._overflow = 0 - pool_size
        self._max_overflow = max_overflow
        self._timeout = timeout
        self._overflow_lock = threading.Lock()

    def _do_return_conn(self, conn):
        """Hand ``conn`` back to the queue, or discard it if the queue is full.

        The put is non-blocking; when the queue reports ``Full`` the
        connection is closed and the overflow counter is decremented, even
        if ``close()`` itself raises.
        """
        try:
            self._pool.put(conn, False)
        except sqla_queue.Full:
            try:
                conn.close()
            finally:
                self._dec_overflow()

    def _do_get(self):
        """Return a connection from the queue, creating one if permitted.

        :raises sqlalchemy.exc.TimeoutError: when the overflow limit is
          reached and no connection became available while waiting on the
          queue.
        """
        use_overflow = self._max_overflow > -1

        try:
            # Only block on the queue when no further connection may be
            # created; otherwise fall through and create one right away.
            wait = use_overflow and self._overflow >= self._max_overflow
            return self._pool.get(wait, self._timeout)
        except sqla_queue.Empty:
            # don't do things inside of "except Empty", because when we say
            # we timed out or can't connect and raise, Python 3 tells
            # people the real error is queue.Empty which it isn't.
            pass
        if use_overflow and self._overflow >= self._max_overflow:
            if not wait:
                # The limit was reached after ``wait`` was computed, so we
                # never actually waited; retry, this time blocking.
                return self._do_get()
            else:
                # ``%0.2f`` rather than ``%d`` so that a fractional timeout
                # (e.g. 0.5) is not reported as "timeout 0".
                raise exc.TimeoutError(
                    "QueuePool limit of size %d overflow %d reached, "
                    "connection timed out, timeout %0.2f"
                    % (self.size(), self.overflow(), self._timeout),
                    code="3o7r",
                )

        if self._inc_overflow():
            try:
                return self._create_connection()
            except:
                # Deliberately bare: the slot reserved by _inc_overflow()
                # must be released for any failure before the error
                # propagates via safe_reraise().
                with util.safe_reraise():
                    self._dec_overflow()
        else:
            # Lost the race for the last slot; start over.
            return self._do_get()

    def _inc_overflow(self):
        """Reserve a slot for a new connection.

        Returns True if the counter was incremented, False if the overflow
        limit has already been reached. With ``max_overflow == -1`` the
        counter is incremented unconditionally and without taking the lock.
        """
        if self._max_overflow == -1:
            self._overflow += 1
            return True
        with self._overflow_lock:
            if self._overflow < self._max_overflow:
                self._overflow += 1
                return True
            else:
                return False

    def _dec_overflow(self):
        """Release a slot previously reserved by ``_inc_overflow()``.

        Always returns True. The lock is skipped when ``max_overflow == -1``,
        mirroring ``_inc_overflow()``.
        """
        if self._max_overflow == -1:
            self._overflow -= 1
            return True
        with self._overflow_lock:
            self._overflow -= 1
            return True

    def recreate(self):
        """Return a new pool of the same class built from this pool's
        configuration; no connections are carried over."""
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            pool_size=self._pool.maxsize,
            max_overflow=self._max_overflow,
            pre_ping=self._pre_ping,
            use_lifo=self._pool.use_lifo,
            timeout=self._timeout,
            recycle=self._recycle,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            use_threadlocal=self._use_threadlocal,
            reset_on_return=self._reset_on_return,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def dispose(self):
        """Close every connection currently sitting in the queue and reset
        the overflow counter to its initial value."""
        while True:
            try:
                conn = self._pool.get(False)
                conn.close()
            except sqla_queue.Empty:
                break

        self._overflow = 0 - self.size()
        self.logger.info("Pool disposed. %s", self.status())

    def status(self):
        """Return a one-line, human-readable summary of the pool counters."""
        return (
            "Pool size: %d Connections in pool: %d "
            "Current Overflow: %d Current Checked out "
            "connections: %d"
            % (
                self.size(),
                self.checkedin(),
                self.overflow(),
                self.checkedout(),
            )
        )

    def size(self):
        """Return the maximum size of the underlying queue."""
        return self._pool.maxsize

    def timeout(self):
        """Return the configured checkout timeout."""
        return self._timeout

    def checkedin(self):
        """Return the number of connections currently held in the queue."""
        return self._pool.qsize()

    def overflow(self):
        """Return the raw overflow counter (negative until more than
        ``size()`` connections have been created)."""
        return self._overflow

    def checkedout(self):
        """Return the number of connections currently checked out, derived
        from the queue size, queue occupancy and overflow counter."""
        return self._pool.maxsize - self._pool.qsize() + self._overflow
219
220
class NullPool(Pool):

    """A Pool which performs no pooling whatsoever.

    Each checkout opens a fresh underlying DB-API connection and each
    check-in closes it again.

    Because no connection is ever held persistently, reconnect-related
    features such as ``recycle`` and connection invalidation are not
    supported by this Pool implementation.

    """

    def _do_get(self):
        # Nothing is cached, so every checkout builds a new connection.
        return self._create_connection()

    def _do_return_conn(self, conn):
        # Nothing is kept either: a returned connection is closed at once.
        conn.close()

    def status(self):
        return "NullPool"

    def dispose(self):
        # No connections are held, so there is nothing to release.
        pass

    def recreate(self):
        self.logger.info("Pool recreating")

        # Collect this pool's configuration, then build a sibling pool of
        # the same class from it.
        options = dict(
            recycle=self._recycle,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            use_threadlocal=self._use_threadlocal,
            reset_on_return=self._reset_on_return,
            pre_ping=self._pre_ping,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )
        pool_cls = self.__class__
        return pool_cls(self._creator, **options)
260
261
class SingletonThreadPool(Pool):

    """A Pool that maintains one connection per thread.

    Maintains one connection per each thread, never moving a connection to a
    thread other than the one which it was created in.

    .. warning::  the :class:`.SingletonThreadPool` will call ``.close()``
       on arbitrary connections that exist beyond the size setting of
       ``pool_size``, e.g. if more unique **thread identities**
       than what ``pool_size`` states are used.   This cleanup is
       non-deterministic and not sensitive to whether or not the connections
       linked to those thread identities are currently in use.

       :class:`.SingletonThreadPool` may be improved in a future release,
       however in its current status it is generally used only for test
       scenarios using a SQLite ``:memory:`` database and is not recommended
       for production use.


    Options are the same as those of :class:`_pool.Pool`, as well as:

    :param pool_size: The number of threads in which to maintain connections
        at once.  Defaults to five.

    :class:`.SingletonThreadPool` is used by the SQLite dialect
    automatically when a memory-based database is used.
    See :ref:`sqlite_toplevel`.

    """

    def __init__(self, creator, pool_size=5, **kw):
        Pool.__init__(self, creator, **kw)
        # Per-thread slot holding a weak reference to this thread's
        # connection (assigned in _do_get()).
        self._conn = threading.local()
        # Per-thread slot passed to _ConnectionFairy._checkout() by
        # connect() and cleared again in _return_conn().
        self._fairy = threading.local()
        # Strong references to every connection created by this pool.
        self._all_conns = set()
        self.size = pool_size

    def recreate(self):
        """Return a new pool of the same class built from this pool's
        configuration; no connections are carried over."""
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            pool_size=self.size,
            recycle=self._recycle,
            echo=self.echo,
            pre_ping=self._pre_ping,
            logging_name=self._orig_logging_name,
            use_threadlocal=self._use_threadlocal,
            reset_on_return=self._reset_on_return,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def dispose(self):
        """Dispose of this pool."""

        for conn in self._all_conns:
            try:
                conn.close()
            except Exception:
                # pysqlite won't even let you close a conn from a thread
                # that didn't create it
                pass

        self._all_conns.clear()

    def _cleanup(self):
        """Close arbitrary tracked connections until fewer than ``size``
        remain.

        The emptiness check keeps ``set.pop()`` from raising ``KeyError``
        when ``size`` is zero or negative; for ``size >= 1`` the loop
        behaves exactly as a plain ``len(...) >= size`` test would.
        """
        while self._all_conns and len(self._all_conns) >= self.size:
            c = self._all_conns.pop()
            c.close()

    def status(self):
        """Return a short description including the number of tracked
        connections."""
        return "SingletonThreadPool id:%d size: %d" % (
            id(self),
            len(self._all_conns),
        )

    def _do_return_conn(self, conn):
        # The connection stays associated with its thread; nothing to do.
        pass

    def _do_get(self):
        """Return the calling thread's connection, creating it on first use."""
        try:
            # ``current`` is a weakref; calling it yields the connection or
            # None once the referent is gone. AttributeError means this
            # thread has not stored one yet.
            c = self._conn.current()
            if c:
                return c
        except AttributeError:
            pass
        c = self._create_connection()
        self._conn.current = weakref.ref(c)
        # Make room before tracking the new connection.
        if len(self._all_conns) >= self.size:
            self._cleanup()
        self._all_conns.add(c)
        return c

    def connect(self):
        """Check out a connection, reusing this thread's existing checkout
        when one is recorded in ``self._fairy``."""
        # vendored from Pool to include use_threadlocal behavior
        try:
            rec = self._fairy.current()
        except AttributeError:
            pass
        else:
            if rec is not None:
                return rec._checkout_existing()

        return _ConnectionFairy._checkout(self, self._fairy)

    def _return_conn(self, record):
        """Forget this thread's recorded checkout, then return ``record``
        via ``_do_return_conn()``."""
        try:
            del self._fairy.current
        except AttributeError:
            pass
        self._do_return_conn(record)
374
375
class StaticPool(Pool):

    """A Pool of exactly one connection, used for all requests.

    Reconnect-related functions such as ``recycle`` and connection
    invalidation (which is also used to support auto-reconnect) are not
    currently supported by this Pool implementation but may be implemented
    in a future release.

    """

    @util.memoized_property
    def _conn(self):
        # Created on first access by calling the creator.
        return self._creator()

    @util.memoized_property
    def connection(self):
        # The single connection record handed out by every checkout.
        return _ConnectionRecord(self)

    def status(self):
        return "StaticPool"

    def dispose(self):
        """Close the underlying connection if one was created.

        Safe to call repeatedly: after the first call ``_conn`` is left in
        the instance ``__dict__`` holding ``None``, so a membership test
        alone would go on to call ``None.close()``. The stored value is
        checked instead.
        """
        conn = self.__dict__.get("_conn")
        if conn is not None:
            conn.close()
            self._conn = None

    def recreate(self):
        """Return a new pool of the same class built from this pool's
        configuration."""
        self.logger.info("Pool recreating")
        return self.__class__(
            creator=self._creator,
            recycle=self._recycle,
            use_threadlocal=self._use_threadlocal,
            reset_on_return=self._reset_on_return,
            pre_ping=self._pre_ping,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def _create_connection(self):
        # Always hand back the one memoized connection.
        return self._conn

    def _do_return_conn(self, conn):
        # The single connection is never released on check-in.
        pass

    def _do_get(self):
        return self.connection
425
426
class AssertionPool(Pool):

    """A :class:`_pool.Pool` that allows at most one checked out connection at
    any given time.

    This will raise an exception if more than one connection is checked out
    at a time.  Useful for debugging code that is using more connections
    than desired.

    """

    def __init__(self, *args, **kw):
        """Construct an AssertionPool.

        :param store_traceback: keyword-only option, defaults to True. When
          true, the call stack of each checkout is recorded so that it can
          be included in the error raised by a conflicting checkout.

        All other arguments are passed to the :class:`_pool.Pool`
        constructor.
        """
        self._conn = None
        self._checked_out = False
        self._store_traceback = kw.pop("store_traceback", True)
        self._checkout_traceback = None
        Pool.__init__(self, *args, **kw)

    def status(self):
        return "AssertionPool"

    def _do_return_conn(self, conn):
        """Mark the connection as returned.

        :raises AssertionError: if no connection is currently checked out,
          or if ``conn`` is not the connection this pool handed out.
        """
        if not self._checked_out:
            raise AssertionError("connection is not checked out")
        self._checked_out = False
        assert conn is self._conn

    def dispose(self):
        """Clear the checked-out flag and close the held connection, if
        any."""
        self._checked_out = False
        if self._conn:
            self._conn.close()

    def recreate(self):
        """Return a new pool of the same class built from this pool's
        configuration, including the ``store_traceback`` setting."""
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            echo=self.echo,
            pre_ping=self._pre_ping,
            recycle=self._recycle,
            reset_on_return=self._reset_on_return,
            logging_name=self._orig_logging_name,
            # Forward the traceback option; otherwise a pool created with
            # store_traceback=False would silently revert to the default.
            store_traceback=self._store_traceback,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def _do_get(self):
        """Check out the single connection, creating it on first use.

        :raises AssertionError: if the connection is already checked out;
          the message includes the recorded checkout stack when available.
        """
        if self._checked_out:
            if self._checkout_traceback:
                suffix = " at:\n%s" % "".join(
                    chop_traceback(self._checkout_traceback)
                )
            else:
                suffix = ""
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn