# sqlalchemy/pool/impl.py
# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php


"""Pool implementation classes.

"""

import traceback
import weakref

from .base import _ConnectionFairy
from .base import _ConnectionRecord
from .base import Pool
from .. import exc
from .. import util
from ..util import chop_traceback
from ..util import queue as sqla_queue
from ..util import threading


class QueuePool(Pool):

    """A :class:`_pool.Pool`
    that imposes a limit on the number of open connections.

    :class:`.QueuePool` is the default pooling implementation used for
    all :class:`_engine.Engine` objects, unless the SQLite dialect is in use.

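    Typically the pool is configured through :func:`_sa.create_engine`
    rather than constructed directly; a minimal sketch, using an
    illustrative database URL, might look like::

        from sqlalchemy import create_engine

        engine = create_engine(
            "postgresql://scott:tiger@localhost/test",
            pool_size=5,
            max_overflow=10,
            pool_timeout=30,
        )
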
    """

    def __init__(
        self,
        creator,
        pool_size=5,
        max_overflow=10,
        timeout=30,
        use_lifo=False,
        **kw
    ):
        r"""
        Construct a QueuePool.

        :param creator: a callable function that returns a DB-API
          connection object, same as that of :paramref:`_pool.Pool.creator`.

        :param pool_size: The size of the pool to be maintained,
          defaults to 5. This is the largest number of connections that
          will be kept persistently in the pool. Note that the pool
          begins with no connections; once this number of connections
          is requested, that number of connections will remain.
          ``pool_size`` can be set to 0 to indicate no size limit; to
          disable pooling, use a :class:`~sqlalchemy.pool.NullPool`
          instead.

        :param max_overflow: The maximum overflow size of the
          pool. When the number of checked-out connections reaches the
          size set in ``pool_size``, additional connections will be
          returned up to this limit. When those additional connections
          are returned to the pool, they are disconnected and
          discarded. It follows then that the total number of
          simultaneous connections the pool will allow is ``pool_size`` +
          ``max_overflow``, and the total number of "sleeping"
          connections the pool will allow is ``pool_size``. ``max_overflow``
          can be set to -1 to indicate no overflow limit; no limit
          will be placed on the total number of concurrent
          connections. Defaults to 10.

        :param timeout: The number of seconds to wait before giving up
          on returning a connection. Defaults to 30.

        :param use_lifo: use LIFO (last-in-first-out) when retrieving
          connections instead of FIFO (first-in-first-out). Using LIFO, a
          server-side timeout scheme can reduce the number of connections used
          during non-peak periods of use.  When planning for server-side
          timeouts, ensure that a recycle or pre-ping strategy is in use to
          gracefully handle stale connections.

          .. versionadded:: 1.3

          .. seealso::

            :ref:`pool_use_lifo`

            :ref:`pool_disconnects`

        :param \**kw: Other keyword arguments including
          :paramref:`_pool.Pool.recycle`, :paramref:`_pool.Pool.echo`,
          :paramref:`_pool.Pool.reset_on_return` and others are passed to the
          :class:`_pool.Pool` constructor.

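        For example, a pool constructed directly against a DBAPI
        ``creator`` function (the ``getconn`` function and SQLite
        database file below are only illustrative) might look like::

            import sqlite3

            from sqlalchemy.pool import QueuePool

            def getconn():
                return sqlite3.connect("file.db")

            pool = QueuePool(getconn, pool_size=5, max_overflow=10, timeout=30)
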
        """
        Pool.__init__(self, creator, **kw)
        self._pool = sqla_queue.Queue(pool_size, use_lifo=use_lifo)
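        # _overflow starts at -pool_size; checkedout() reports connections
        # in use as maxsize - qsize() + _overflow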
        self._overflow = 0 - pool_size
        self._max_overflow = max_overflow
        self._timeout = timeout
        self._overflow_lock = threading.Lock()

    def _do_return_conn(self, conn):
        try:
            self._pool.put(conn, False)
        except sqla_queue.Full:
            try:
                conn.close()
            finally:
                self._dec_overflow()

    def _do_get(self):
        use_overflow = self._max_overflow > -1

        try:
            wait = use_overflow and self._overflow >= self._max_overflow
            return self._pool.get(wait, self._timeout)
        except sqla_queue.Empty:
            # don't do things inside of "except Empty", because when we say
            # we timed out or can't connect and raise, Python 3 tells
            # people the real error is queue.Empty which it isn't.
            pass
        if use_overflow and self._overflow >= self._max_overflow:
            if not wait:
                return self._do_get()
            else:
                raise exc.TimeoutError(
                    "QueuePool limit of size %d overflow %d reached, "
                    "connection timed out, timeout %d"
                    % (self.size(), self.overflow(), self._timeout),
                    code="3o7r",
                )

        if self._inc_overflow():
            try:
                return self._create_connection()
            except:
                with util.safe_reraise():
                    self._dec_overflow()
        else:
            return self._do_get()

    def _inc_overflow(self):
        if self._max_overflow == -1:
            self._overflow += 1
            return True
        with self._overflow_lock:
            if self._overflow < self._max_overflow:
                self._overflow += 1
                return True
            else:
                return False

    def _dec_overflow(self):
        if self._max_overflow == -1:
            self._overflow -= 1
            return True
        with self._overflow_lock:
            self._overflow -= 1
            return True

    def recreate(self):
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            pool_size=self._pool.maxsize,
            max_overflow=self._max_overflow,
            pre_ping=self._pre_ping,
            use_lifo=self._pool.use_lifo,
            timeout=self._timeout,
            recycle=self._recycle,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            use_threadlocal=self._use_threadlocal,
            reset_on_return=self._reset_on_return,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def dispose(self):
        while True:
            try:
                conn = self._pool.get(False)
                conn.close()
            except sqla_queue.Empty:
                break

        self._overflow = 0 - self.size()
        self.logger.info("Pool disposed. %s", self.status())

    def status(self):
        return (
            "Pool size: %d  Connections in pool: %d "
            "Current Overflow: %d Current Checked out "
            "connections: %d"
            % (
                self.size(),
                self.checkedin(),
                self.overflow(),
                self.checkedout(),
            )
        )

    def size(self):
        return self._pool.maxsize

    def timeout(self):
        return self._timeout

    def checkedin(self):
        return self._pool.qsize()

    def overflow(self):
        return self._overflow

    def checkedout(self):
        return self._pool.maxsize - self._pool.qsize() + self._overflow


class NullPool(Pool):

    """A Pool which does not pool connections.

    Instead it literally opens and closes the underlying DB-API connection
    for each connection open/close.

    Reconnect-related functions such as ``recycle`` and connection
    invalidation are not supported by this Pool implementation, since
    no connections are held persistently.

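    For example, pooling can be disabled on an :class:`_engine.Engine`
    by passing this class as ``poolclass`` (the database URL below is
    only illustrative)::

        from sqlalchemy import create_engine
        from sqlalchemy.pool import NullPool

        engine = create_engine(
            "postgresql://scott:tiger@localhost/test", poolclass=NullPool
        )
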
    """

    def status(self):
        return "NullPool"

    def _do_return_conn(self, conn):
        conn.close()

    def _do_get(self):
        return self._create_connection()

    def recreate(self):
        self.logger.info("Pool recreating")

        return self.__class__(
            self._creator,
            recycle=self._recycle,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            use_threadlocal=self._use_threadlocal,
            reset_on_return=self._reset_on_return,
            pre_ping=self._pre_ping,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def dispose(self):
        pass


class SingletonThreadPool(Pool):

    """A Pool that maintains one connection per thread.

    Maintains one connection per thread, never moving a connection to a
    thread other than the one in which it was created.

    .. warning::  the :class:`.SingletonThreadPool` will call ``.close()``
       on arbitrary connections that exist beyond the size setting of
       ``pool_size``, e.g. if more unique **thread identities**
       than the ``pool_size`` setting are used.  This cleanup is
       non-deterministic and not sensitive to whether or not the connections
       linked to those thread identities are currently in use.

       :class:`.SingletonThreadPool` may be improved in a future release,
       however in its current form it is generally used only for test
       scenarios using a SQLite ``:memory:`` database and is not recommended
       for production use.


    Options are the same as those of :class:`_pool.Pool`, as well as:

    :param pool_size: The number of threads in which to maintain connections
        at once.  Defaults to five.

    :class:`.SingletonThreadPool` is used by the SQLite dialect
    automatically when a memory-based database is used.
    See :ref:`sqlite_toplevel`.

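    As a minimal sketch, an explicit configuration for a SQLite
    ``:memory:`` database might look like::

        from sqlalchemy import create_engine
        from sqlalchemy.pool import SingletonThreadPool

        engine = create_engine(
            "sqlite://", poolclass=SingletonThreadPool, pool_size=5
        )
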
    """

    def __init__(self, creator, pool_size=5, **kw):
        Pool.__init__(self, creator, **kw)
        self._conn = threading.local()
        self._fairy = threading.local()
        self._all_conns = set()
        self.size = pool_size

    def recreate(self):
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            pool_size=self.size,
            recycle=self._recycle,
            echo=self.echo,
            pre_ping=self._pre_ping,
            logging_name=self._orig_logging_name,
            use_threadlocal=self._use_threadlocal,
            reset_on_return=self._reset_on_return,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def dispose(self):
        """Dispose of this pool."""

        for conn in self._all_conns:
            try:
                conn.close()
            except Exception:
                # pysqlite won't even let you close a conn from a thread
                # that didn't create it
                pass

        self._all_conns.clear()

    def _cleanup(self):
        while len(self._all_conns) >= self.size:
            c = self._all_conns.pop()
            c.close()

    def status(self):
        return "SingletonThreadPool id:%d size: %d" % (
            id(self),
            len(self._all_conns),
        )

    def _do_return_conn(self, conn):
        pass

    def _do_get(self):
        try:
            c = self._conn.current()
            if c:
                return c
        except AttributeError:
            pass
        c = self._create_connection()
        self._conn.current = weakref.ref(c)
        if len(self._all_conns) >= self.size:
            self._cleanup()
        self._all_conns.add(c)
        return c

    def connect(self):
        # vendored from Pool to include use_threadlocal behavior
        try:
            rec = self._fairy.current()
        except AttributeError:
            pass
        else:
            if rec is not None:
                return rec._checkout_existing()

        return _ConnectionFairy._checkout(self, self._fairy)

    def _return_conn(self, record):
        try:
            del self._fairy.current
        except AttributeError:
            pass
        self._do_return_conn(record)


class StaticPool(Pool):

    """A Pool of exactly one connection, used for all requests.

    Reconnect-related functions such as ``recycle`` and connection
    invalidation (which is also used to support auto-reconnect) are not
    currently supported by this Pool implementation but may be implemented
    in a future release.

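    As a minimal sketch, a :class:`.StaticPool` might be selected for an
    :class:`_engine.Engine` as follows (the SQLite URL is only
    illustrative)::

        from sqlalchemy import create_engine
        from sqlalchemy.pool import StaticPool

        engine = create_engine("sqlite://", poolclass=StaticPool)
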
    """

    @util.memoized_property
    def _conn(self):
        return self._creator()

    @util.memoized_property
    def connection(self):
        return _ConnectionRecord(self)

    def status(self):
        return "StaticPool"

    def dispose(self):
        if "_conn" in self.__dict__:
            self._conn.close()
            self._conn = None

    def recreate(self):
        self.logger.info("Pool recreating")
        return self.__class__(
            creator=self._creator,
            recycle=self._recycle,
            use_threadlocal=self._use_threadlocal,
            reset_on_return=self._reset_on_return,
            pre_ping=self._pre_ping,
            echo=self.echo,
            logging_name=self._orig_logging_name,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def _create_connection(self):
        return self._conn

    def _do_return_conn(self, conn):
        pass

    def _do_get(self):
        return self.connection


class AssertionPool(Pool):

    """A :class:`_pool.Pool` that allows at most one checked out connection at
    any given time.

    This will raise an exception if more than one connection is checked out
    at a time.  Useful for debugging code that is using more connections
    than desired.

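    As a minimal sketch, it might be enabled for an :class:`_engine.Engine`
    while debugging (the database URL below is only illustrative)::

        from sqlalchemy import create_engine
        from sqlalchemy.pool import AssertionPool

        engine = create_engine(
            "postgresql://scott:tiger@localhost/test", poolclass=AssertionPool
        )
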
    """

    def __init__(self, *args, **kw):
        self._conn = None
        self._checked_out = False
        self._store_traceback = kw.pop("store_traceback", True)
        self._checkout_traceback = None
        Pool.__init__(self, *args, **kw)

    def status(self):
        return "AssertionPool"

    def _do_return_conn(self, conn):
        if not self._checked_out:
            raise AssertionError("connection is not checked out")
        self._checked_out = False
        assert conn is self._conn

    def dispose(self):
        self._checked_out = False
        if self._conn:
            self._conn.close()

    def recreate(self):
        self.logger.info("Pool recreating")
        return self.__class__(
            self._creator,
            echo=self.echo,
            pre_ping=self._pre_ping,
            recycle=self._recycle,
            reset_on_return=self._reset_on_return,
            logging_name=self._orig_logging_name,
            _dispatch=self.dispatch,
            dialect=self._dialect,
        )

    def _do_get(self):
        if self._checked_out:
            if self._checkout_traceback:
                suffix = " at:\n%s" % "".join(
                    chop_traceback(self._checkout_traceback)
                )
            else:
                suffix = ""
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn