Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/pool/base.py: 58%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# pool/base.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
9"""Base constructs for connection pools."""
11from __future__ import annotations
13from collections import deque
14import dataclasses
15from enum import Enum
16import threading
17import time
18import typing
19from typing import Any
20from typing import Callable
21from typing import cast
22from typing import Deque
23from typing import Dict
24from typing import List
25from typing import Optional
26from typing import Tuple
27from typing import TYPE_CHECKING
28from typing import Union
29import weakref
31from .. import event
32from .. import exc
33from .. import log
34from .. import util
35from ..util.typing import Literal
36from ..util.typing import Protocol
38if TYPE_CHECKING:
39 from ..engine.interfaces import DBAPIConnection
40 from ..engine.interfaces import DBAPICursor
41 from ..engine.interfaces import Dialect
42 from ..event import _DispatchCommon
43 from ..event import _ListenerFnType
44 from ..event import dispatcher
45 from ..sql._typing import _InfoType
48@dataclasses.dataclass(frozen=True)
49class PoolResetState:
50 """describes the state of a DBAPI connection as it is being passed to
51 the :meth:`.PoolEvents.reset` connection pool event.
53 .. versionadded:: 2.0.0b3
55 """
57 __slots__ = ("transaction_was_reset", "terminate_only", "asyncio_safe")
59 transaction_was_reset: bool
60 """Indicates if the transaction on the DBAPI connection was already
61 essentially "reset" back by the :class:`.Connection` object.
63 This boolean is True if the :class:`.Connection` had transactional
64 state present upon it, which was then not closed using the
65 :meth:`.Connection.rollback` or :meth:`.Connection.commit` method;
66 instead, the transaction was closed inline within the
67 :meth:`.Connection.close` method so is guaranteed to remain non-present
68 when this event is reached.
70 """
72 terminate_only: bool
73 """indicates if the connection is to be immediately terminated and
74 not checked in to the pool.
76 This occurs for connections that were invalidated, as well as asyncio
77 connections that were not cleanly handled by the calling code that
78 are instead being garbage collected. In the latter case,
79 operations can't be safely run on asyncio connections within garbage
80 collection as there is not necessarily an event loop present.
82 """
84 asyncio_safe: bool
85 """Indicates if the reset operation is occurring within a scope where
86 an enclosing event loop is expected to be present for asyncio applications.
88 Will be False in the case that the connection is being garbage collected.
90 """
93class ResetStyle(Enum):
94 """Describe options for "reset on return" behaviors."""
96 reset_rollback = 0
97 reset_commit = 1
98 reset_none = 2
101_ResetStyleArgType = Union[
102 ResetStyle,
103 Literal[True, None, False, "commit", "rollback"],
104]
105reset_rollback, reset_commit, reset_none = list(ResetStyle)
108class _ConnDialect:
109 """partial implementation of :class:`.Dialect`
110 which provides DBAPI connection methods.
112 When a :class:`_pool.Pool` is combined with an :class:`_engine.Engine`,
113 the :class:`_engine.Engine` replaces this with its own
114 :class:`.Dialect`.
116 """
118 is_async = False
119 has_terminate = False
121 def do_rollback(self, dbapi_connection: PoolProxiedConnection) -> None:
122 dbapi_connection.rollback()
124 def do_commit(self, dbapi_connection: PoolProxiedConnection) -> None:
125 dbapi_connection.commit()
127 def do_terminate(self, dbapi_connection: DBAPIConnection) -> None:
128 dbapi_connection.close()
130 def do_close(self, dbapi_connection: DBAPIConnection) -> None:
131 dbapi_connection.close()
133 def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
134 raise NotImplementedError(
135 "The ping feature requires that a dialect is "
136 "passed to the connection pool."
137 )
139 def get_driver_connection(self, connection: DBAPIConnection) -> Any:
140 return connection
143class _AsyncConnDialect(_ConnDialect):
144 is_async = True
147class _CreatorFnType(Protocol):
148 def __call__(self) -> DBAPIConnection: ...
151class _CreatorWRecFnType(Protocol):
152 def __call__(self, rec: ConnectionPoolEntry) -> DBAPIConnection: ...
155class Pool(log.Identified, event.EventTarget):
156 """Abstract base class for connection pools."""
158 dispatch: dispatcher[Pool]
159 echo: log._EchoFlagType
161 _orig_logging_name: Optional[str]
162 _dialect: Union[_ConnDialect, Dialect] = _ConnDialect()
163 _creator_arg: Union[_CreatorFnType, _CreatorWRecFnType]
164 _invoke_creator: _CreatorWRecFnType
165 _invalidate_time: float
167 def __init__(
168 self,
169 creator: Union[_CreatorFnType, _CreatorWRecFnType],
170 recycle: int = -1,
171 echo: log._EchoFlagType = None,
172 logging_name: Optional[str] = None,
173 reset_on_return: _ResetStyleArgType = True,
174 events: Optional[List[Tuple[_ListenerFnType, str]]] = None,
175 dialect: Optional[Union[_ConnDialect, Dialect]] = None,
176 pre_ping: bool = False,
177 _dispatch: Optional[_DispatchCommon[Pool]] = None,
178 ):
179 """
180 Construct a Pool.
182 :param creator: a callable function that returns a DB-API
183 connection object. The function will be called with
184 parameters.
186 :param recycle: If set to a value other than -1, number of
187 seconds between connection recycling, which means upon
188 checkout, if this timeout is surpassed the connection will be
189 closed and replaced with a newly opened connection. Defaults to -1.
191 :param logging_name: String identifier which will be used within
192 the "name" field of logging records generated within the
193 "sqlalchemy.pool" logger. Defaults to a hexstring of the object's
194 id.
196 :param echo: if True, the connection pool will log
197 informational output such as when connections are invalidated
198 as well as when connections are recycled to the default log handler,
199 which defaults to ``sys.stdout`` for output.. If set to the string
200 ``"debug"``, the logging will include pool checkouts and checkins.
202 The :paramref:`_pool.Pool.echo` parameter can also be set from the
203 :func:`_sa.create_engine` call by using the
204 :paramref:`_sa.create_engine.echo_pool` parameter.
206 .. seealso::
208 :ref:`dbengine_logging` - further detail on how to configure
209 logging.
211 :param reset_on_return: Determine steps to take on
212 connections as they are returned to the pool, which were
213 not otherwise handled by a :class:`_engine.Connection`.
214 Available from :func:`_sa.create_engine` via the
215 :paramref:`_sa.create_engine.pool_reset_on_return` parameter.
217 :paramref:`_pool.Pool.reset_on_return` can have any of these values:
219 * ``"rollback"`` - call rollback() on the connection,
220 to release locks and transaction resources.
221 This is the default value. The vast majority
222 of use cases should leave this value set.
223 * ``"commit"`` - call commit() on the connection,
224 to release locks and transaction resources.
225 A commit here may be desirable for databases that
226 cache query plans if a commit is emitted,
227 such as Microsoft SQL Server. However, this
228 value is more dangerous than 'rollback' because
229 any data changes present on the transaction
230 are committed unconditionally.
231 * ``None`` - don't do anything on the connection.
232 This setting may be appropriate if the database / DBAPI
233 works in pure "autocommit" mode at all times, or if
234 a custom reset handler is established using the
235 :meth:`.PoolEvents.reset` event handler.
237 * ``True`` - same as 'rollback', this is here for
238 backwards compatibility.
239 * ``False`` - same as None, this is here for
240 backwards compatibility.
242 For further customization of reset on return, the
243 :meth:`.PoolEvents.reset` event hook may be used which can perform
244 any connection activity desired on reset.
246 .. seealso::
248 :ref:`pool_reset_on_return`
250 :meth:`.PoolEvents.reset`
252 :param events: a list of 2-tuples, each of the form
253 ``(callable, target)`` which will be passed to :func:`.event.listen`
254 upon construction. Provided here so that event listeners
255 can be assigned via :func:`_sa.create_engine` before dialect-level
256 listeners are applied.
258 :param dialect: a :class:`.Dialect` that will handle the job
259 of calling rollback(), close(), or commit() on DBAPI connections.
260 If omitted, a built-in "stub" dialect is used. Applications that
261 make use of :func:`_sa.create_engine` should not use this parameter
262 as it is handled by the engine creation strategy.
264 :param pre_ping: if True, the pool will emit a "ping" (typically
265 "SELECT 1", but is dialect-specific) on the connection
266 upon checkout, to test if the connection is alive or not. If not,
267 the connection is transparently re-connected and upon success, all
268 other pooled connections established prior to that timestamp are
269 invalidated. Requires that a dialect is passed as well to
270 interpret the disconnection error.
272 .. versionadded:: 1.2
274 """
275 if logging_name:
276 self.logging_name = self._orig_logging_name = logging_name
277 else:
278 self._orig_logging_name = None
280 log.instance_logger(self, echoflag=echo)
281 self._creator = creator
282 self._recycle = recycle
283 self._invalidate_time = 0
284 self._pre_ping = pre_ping
285 self._reset_on_return = util.parse_user_argument_for_enum(
286 reset_on_return,
287 {
288 ResetStyle.reset_rollback: ["rollback", True],
289 ResetStyle.reset_none: ["none", None, False],
290 ResetStyle.reset_commit: ["commit"],
291 },
292 "reset_on_return",
293 )
295 self.echo = echo
297 if _dispatch:
298 self.dispatch._update(_dispatch, only_propagate=False)
299 if dialect:
300 self._dialect = dialect
301 if events:
302 for fn, target in events:
303 event.listen(self, target, fn)
305 @util.hybridproperty
306 def _is_asyncio(self) -> bool:
307 return self._dialect.is_async
309 @property
310 def _creator(self) -> Union[_CreatorFnType, _CreatorWRecFnType]:
311 return self._creator_arg
313 @_creator.setter
314 def _creator(
315 self, creator: Union[_CreatorFnType, _CreatorWRecFnType]
316 ) -> None:
317 self._creator_arg = creator
319 # mypy seems to get super confused assigning functions to
320 # attributes
321 self._invoke_creator = self._should_wrap_creator(creator)
323 @_creator.deleter
324 def _creator(self) -> None:
325 # needed for mock testing
326 del self._creator_arg
327 del self._invoke_creator
329 def _should_wrap_creator(
330 self, creator: Union[_CreatorFnType, _CreatorWRecFnType]
331 ) -> _CreatorWRecFnType:
332 """Detect if creator accepts a single argument, or is sent
333 as a legacy style no-arg function.
335 """
337 try:
338 argspec = util.get_callable_argspec(self._creator, no_self=True)
339 except TypeError:
340 creator_fn = cast(_CreatorFnType, creator)
341 return lambda rec: creator_fn()
343 if argspec.defaults is not None:
344 defaulted = len(argspec.defaults)
345 else:
346 defaulted = 0
347 positionals = len(argspec[0]) - defaulted
349 # look for the exact arg signature that DefaultStrategy
350 # sends us
351 if (argspec[0], argspec[3]) == (["connection_record"], (None,)):
352 return cast(_CreatorWRecFnType, creator)
353 # or just a single positional
354 elif positionals == 1:
355 return cast(_CreatorWRecFnType, creator)
356 # all other cases, just wrap and assume legacy "creator" callable
357 # thing
358 else:
359 creator_fn = cast(_CreatorFnType, creator)
360 return lambda rec: creator_fn()
362 def _close_connection(
363 self, connection: DBAPIConnection, *, terminate: bool = False
364 ) -> None:
365 self.logger.debug(
366 "%s connection %r",
367 "Hard-closing" if terminate else "Closing",
368 connection,
369 )
370 try:
371 if terminate:
372 self._dialect.do_terminate(connection)
373 else:
374 self._dialect.do_close(connection)
375 except BaseException as e:
376 self.logger.error(
377 f"Exception {'terminating' if terminate else 'closing'} "
378 f"connection %r",
379 connection,
380 exc_info=True,
381 )
382 if not isinstance(e, Exception):
383 raise
385 def _create_connection(self) -> ConnectionPoolEntry:
386 """Called by subclasses to create a new ConnectionRecord."""
388 return _ConnectionRecord(self)
390 def _invalidate(
391 self,
392 connection: PoolProxiedConnection,
393 exception: Optional[BaseException] = None,
394 _checkin: bool = True,
395 ) -> None:
396 """Mark all connections established within the generation
397 of the given connection as invalidated.
399 If this pool's last invalidate time is before when the given
400 connection was created, update the timestamp til now. Otherwise,
401 no action is performed.
403 Connections with a start time prior to this pool's invalidation
404 time will be recycled upon next checkout.
405 """
406 rec = getattr(connection, "_connection_record", None)
407 if not rec or self._invalidate_time < rec.starttime:
408 self._invalidate_time = time.time()
409 if _checkin and getattr(connection, "is_valid", False):
410 connection.invalidate(exception)
412 def recreate(self) -> Pool:
413 """Return a new :class:`_pool.Pool`, of the same class as this one
414 and configured with identical creation arguments.
416 This method is used in conjunction with :meth:`dispose`
417 to close out an entire :class:`_pool.Pool` and create a new one in
418 its place.
420 """
422 raise NotImplementedError()
424 def dispose(self) -> None:
425 """Dispose of this pool.
427 This method leaves the possibility of checked-out connections
428 remaining open, as it only affects connections that are
429 idle in the pool.
431 .. seealso::
433 :meth:`Pool.recreate`
435 """
437 raise NotImplementedError()
439 def connect(self) -> PoolProxiedConnection:
440 """Return a DBAPI connection from the pool.
442 The connection is instrumented such that when its
443 ``close()`` method is called, the connection will be returned to
444 the pool.
446 """
447 return _ConnectionFairy._checkout(self)
449 def _return_conn(self, record: ConnectionPoolEntry) -> None:
450 """Given a _ConnectionRecord, return it to the :class:`_pool.Pool`.
452 This method is called when an instrumented DBAPI connection
453 has its ``close()`` method called.
455 """
456 self._do_return_conn(record)
458 def _do_get(self) -> ConnectionPoolEntry:
459 """Implementation for :meth:`get`, supplied by subclasses."""
461 raise NotImplementedError()
463 def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
464 """Implementation for :meth:`return_conn`, supplied by subclasses."""
466 raise NotImplementedError()
468 def status(self) -> str:
469 """Returns a brief description of the state of this pool."""
470 raise NotImplementedError()
473class ManagesConnection:
474 """Common base for the two connection-management interfaces
475 :class:`.PoolProxiedConnection` and :class:`.ConnectionPoolEntry`.
477 These two objects are typically exposed in the public facing API
478 via the connection pool event hooks, documented at :class:`.PoolEvents`.
480 .. versionadded:: 2.0
482 """
484 __slots__ = ()
486 dbapi_connection: Optional[DBAPIConnection]
487 """A reference to the actual DBAPI connection being tracked.
489 This is a :pep:`249`-compliant object that for traditional sync-style
490 dialects is provided by the third-party
491 DBAPI implementation in use. For asyncio dialects, the implementation
492 is typically an adapter object provided by the SQLAlchemy dialect
493 itself; the underlying asyncio object is available via the
494 :attr:`.ManagesConnection.driver_connection` attribute.
496 SQLAlchemy's interface for the DBAPI connection is based on the
497 :class:`.DBAPIConnection` protocol object
499 .. seealso::
501 :attr:`.ManagesConnection.driver_connection`
503 :ref:`faq_dbapi_connection`
505 """
507 driver_connection: Optional[Any]
508 """The "driver level" connection object as used by the Python
509 DBAPI or database driver.
511 For traditional :pep:`249` DBAPI implementations, this object will
512 be the same object as that of
513 :attr:`.ManagesConnection.dbapi_connection`. For an asyncio database
514 driver, this will be the ultimate "connection" object used by that
515 driver, such as the ``asyncpg.Connection`` object which will not have
516 standard pep-249 methods.
518 .. versionadded:: 1.4.24
520 .. seealso::
522 :attr:`.ManagesConnection.dbapi_connection`
524 :ref:`faq_dbapi_connection`
526 """
528 @util.ro_memoized_property
529 def info(self) -> _InfoType:
530 """Info dictionary associated with the underlying DBAPI connection
531 referred to by this :class:`.ManagesConnection` instance, allowing
532 user-defined data to be associated with the connection.
534 The data in this dictionary is persistent for the lifespan
535 of the DBAPI connection itself, including across pool checkins
536 and checkouts. When the connection is invalidated
537 and replaced with a new one, this dictionary is cleared.
539 For a :class:`.PoolProxiedConnection` instance that's not associated
540 with a :class:`.ConnectionPoolEntry`, such as if it were detached, the
541 attribute returns a dictionary that is local to that
542 :class:`.ConnectionPoolEntry`. Therefore the
543 :attr:`.ManagesConnection.info` attribute will always provide a Python
544 dictionary.
546 .. seealso::
548 :attr:`.ManagesConnection.record_info`
551 """
552 raise NotImplementedError()
554 @util.ro_memoized_property
555 def record_info(self) -> Optional[_InfoType]:
556 """Persistent info dictionary associated with this
557 :class:`.ManagesConnection`.
559 Unlike the :attr:`.ManagesConnection.info` dictionary, the lifespan
560 of this dictionary is that of the :class:`.ConnectionPoolEntry`
561 which owns it; therefore this dictionary will persist across
562 reconnects and connection invalidation for a particular entry
563 in the connection pool.
565 For a :class:`.PoolProxiedConnection` instance that's not associated
566 with a :class:`.ConnectionPoolEntry`, such as if it were detached, the
567 attribute returns None. Contrast to the :attr:`.ManagesConnection.info`
568 dictionary which is never None.
571 .. seealso::
573 :attr:`.ManagesConnection.info`
575 """
576 raise NotImplementedError()
578 def invalidate(
579 self, e: Optional[BaseException] = None, soft: bool = False
580 ) -> None:
581 """Mark the managed connection as invalidated.
583 :param e: an exception object indicating a reason for the invalidation.
585 :param soft: if True, the connection isn't closed; instead, this
586 connection will be recycled on next checkout.
588 .. seealso::
590 :ref:`pool_connection_invalidation`
593 """
594 raise NotImplementedError()
597class ConnectionPoolEntry(ManagesConnection):
598 """Interface for the object that maintains an individual database
599 connection on behalf of a :class:`_pool.Pool` instance.
601 The :class:`.ConnectionPoolEntry` object represents the long term
602 maintainance of a particular connection for a pool, including expiring or
603 invalidating that connection to have it replaced with a new one, which will
604 continue to be maintained by that same :class:`.ConnectionPoolEntry`
605 instance. Compared to :class:`.PoolProxiedConnection`, which is the
606 short-term, per-checkout connection manager, this object lasts for the
607 lifespan of a particular "slot" within a connection pool.
609 The :class:`.ConnectionPoolEntry` object is mostly visible to public-facing
610 API code when it is delivered to connection pool event hooks, such as
611 :meth:`_events.PoolEvents.connect` and :meth:`_events.PoolEvents.checkout`.
613 .. versionadded:: 2.0 :class:`.ConnectionPoolEntry` provides the public
614 facing interface for the :class:`._ConnectionRecord` internal class.
616 """
618 __slots__ = ()
620 @property
621 def in_use(self) -> bool:
622 """Return True the connection is currently checked out"""
624 raise NotImplementedError()
626 def close(self) -> None:
627 """Close the DBAPI connection managed by this connection pool entry."""
628 raise NotImplementedError()
631class _ConnectionRecord(ConnectionPoolEntry):
632 """Maintains a position in a connection pool which references a pooled
633 connection.
635 This is an internal object used by the :class:`_pool.Pool` implementation
636 to provide context management to a DBAPI connection maintained by
637 that :class:`_pool.Pool`. The public facing interface for this class
638 is described by the :class:`.ConnectionPoolEntry` class. See that
639 class for public API details.
641 .. seealso::
643 :class:`.ConnectionPoolEntry`
645 :class:`.PoolProxiedConnection`
647 """
649 __slots__ = (
650 "__pool",
651 "fairy_ref",
652 "finalize_callback",
653 "fresh",
654 "starttime",
655 "dbapi_connection",
656 "__weakref__",
657 "__dict__",
658 )
660 finalize_callback: Deque[Callable[[DBAPIConnection], None]]
661 fresh: bool
662 fairy_ref: Optional[weakref.ref[_ConnectionFairy]]
663 starttime: float
665 def __init__(self, pool: Pool, connect: bool = True):
666 self.fresh = False
667 self.fairy_ref = None
668 self.starttime = 0
669 self.dbapi_connection = None
671 self.__pool = pool
672 if connect:
673 self.__connect()
674 self.finalize_callback = deque()
676 dbapi_connection: Optional[DBAPIConnection]
678 @property
679 def driver_connection(self) -> Optional[Any]: # type: ignore[override] # mypy#4125 # noqa: E501
680 if self.dbapi_connection is None:
681 return None
682 else:
683 return self.__pool._dialect.get_driver_connection(
684 self.dbapi_connection
685 )
687 @property
688 @util.deprecated(
689 "2.0",
690 "The _ConnectionRecord.connection attribute is deprecated; "
691 "please use 'driver_connection'",
692 )
693 def connection(self) -> Optional[DBAPIConnection]:
694 return self.dbapi_connection
696 _soft_invalidate_time: float = 0
698 @util.ro_memoized_property
699 def info(self) -> _InfoType:
700 return {}
702 @util.ro_memoized_property
703 def record_info(self) -> Optional[_InfoType]:
704 return {}
706 @classmethod
707 def checkout(cls, pool: Pool) -> _ConnectionFairy:
708 if TYPE_CHECKING:
709 rec = cast(_ConnectionRecord, pool._do_get())
710 else:
711 rec = pool._do_get()
713 try:
714 dbapi_connection = rec.get_connection()
715 except BaseException as err:
716 with util.safe_reraise():
717 rec._checkin_failed(err, _fairy_was_created=False)
719 # not reached, for code linters only
720 raise
722 echo = pool._should_log_debug()
723 fairy = _ConnectionFairy(pool, dbapi_connection, rec, echo)
725 rec.fairy_ref = ref = weakref.ref(
726 fairy,
727 lambda ref: (
728 _finalize_fairy(
729 None, rec, pool, ref, echo, transaction_was_reset=False
730 )
731 if _finalize_fairy is not None
732 else None
733 ),
734 )
735 _strong_ref_connection_records[ref] = rec
736 if echo:
737 pool.logger.debug(
738 "Connection %r checked out from pool", dbapi_connection
739 )
740 return fairy
742 def _checkin_failed(
743 self, err: BaseException, _fairy_was_created: bool = True
744 ) -> None:
745 self.invalidate(e=err)
746 self.checkin(
747 _fairy_was_created=_fairy_was_created,
748 )
750 def checkin(self, _fairy_was_created: bool = True) -> None:
751 if self.fairy_ref is None and _fairy_was_created:
752 # _fairy_was_created is False for the initial get connection phase;
753 # meaning there was no _ConnectionFairy and we must unconditionally
754 # do a checkin.
755 #
756 # otherwise, if fairy_was_created==True, if fairy_ref is None here
757 # that means we were checked in already, so this looks like
758 # a double checkin.
759 util.warn("Double checkin attempted on %s" % self)
760 return
761 self.fairy_ref = None
762 connection = self.dbapi_connection
763 pool = self.__pool
764 while self.finalize_callback:
765 finalizer = self.finalize_callback.pop()
766 if connection is not None:
767 finalizer(connection)
768 if pool.dispatch.checkin:
769 pool.dispatch.checkin(connection, self)
771 pool._return_conn(self)
773 @property
774 def in_use(self) -> bool:
775 return self.fairy_ref is not None
777 @property
778 def last_connect_time(self) -> float:
779 return self.starttime
781 def close(self) -> None:
782 if self.dbapi_connection is not None:
783 self.__close()
785 def invalidate(
786 self, e: Optional[BaseException] = None, soft: bool = False
787 ) -> None:
788 # already invalidated
789 if self.dbapi_connection is None:
790 return
791 if soft:
792 self.__pool.dispatch.soft_invalidate(
793 self.dbapi_connection, self, e
794 )
795 else:
796 self.__pool.dispatch.invalidate(self.dbapi_connection, self, e)
797 if e is not None:
798 self.__pool.logger.info(
799 "%sInvalidate connection %r (reason: %s:%s)",
800 "Soft " if soft else "",
801 self.dbapi_connection,
802 e.__class__.__name__,
803 e,
804 )
805 else:
806 self.__pool.logger.info(
807 "%sInvalidate connection %r",
808 "Soft " if soft else "",
809 self.dbapi_connection,
810 )
812 if soft:
813 self._soft_invalidate_time = time.time()
814 else:
815 self.__close(terminate=True)
816 self.dbapi_connection = None
818 def get_connection(self) -> DBAPIConnection:
819 recycle = False
821 # NOTE: the various comparisons here are assuming that measurable time
822 # passes between these state changes. however, time.time() is not
823 # guaranteed to have sub-second precision. comparisons of
824 # "invalidation time" to "starttime" should perhaps use >= so that the
825 # state change can take place assuming no measurable time has passed,
826 # however this does not guarantee correct behavior here as if time
827 # continues to not pass, it will try to reconnect repeatedly until
828 # these timestamps diverge, so in that sense using > is safer. Per
829 # https://stackoverflow.com/a/1938096/34549, Windows time.time() may be
830 # within 16 milliseconds accuracy, so unit tests for connection
831 # invalidation need a sleep of at least this long between initial start
832 # time and invalidation for the logic below to work reliably.
834 if self.dbapi_connection is None:
835 self.info.clear()
836 self.__connect()
837 elif (
838 self.__pool._recycle > -1
839 and time.time() - self.starttime > self.__pool._recycle
840 ):
841 self.__pool.logger.info(
842 "Connection %r exceeded timeout; recycling",
843 self.dbapi_connection,
844 )
845 recycle = True
846 elif self.__pool._invalidate_time > self.starttime:
847 self.__pool.logger.info(
848 "Connection %r invalidated due to pool invalidation; "
849 + "recycling",
850 self.dbapi_connection,
851 )
852 recycle = True
853 elif self._soft_invalidate_time > self.starttime:
854 self.__pool.logger.info(
855 "Connection %r invalidated due to local soft invalidation; "
856 + "recycling",
857 self.dbapi_connection,
858 )
859 recycle = True
861 if recycle:
862 self.__close(terminate=True)
863 self.info.clear()
865 self.__connect()
867 assert self.dbapi_connection is not None
868 return self.dbapi_connection
870 def _is_hard_or_soft_invalidated(self) -> bool:
871 return (
872 self.dbapi_connection is None
873 or self.__pool._invalidate_time > self.starttime
874 or (self._soft_invalidate_time > self.starttime)
875 )
877 def __close(self, *, terminate: bool = False) -> None:
878 self.finalize_callback.clear()
879 if self.__pool.dispatch.close:
880 self.__pool.dispatch.close(self.dbapi_connection, self)
881 assert self.dbapi_connection is not None
882 self.__pool._close_connection(
883 self.dbapi_connection, terminate=terminate
884 )
885 self.dbapi_connection = None
887 def __connect(self) -> None:
888 pool = self.__pool
890 # ensure any existing connection is removed, so that if
891 # creator fails, this attribute stays None
892 self.dbapi_connection = None
893 try:
894 self.starttime = time.time()
895 self.dbapi_connection = connection = pool._invoke_creator(self)
896 pool.logger.debug("Created new connection %r", connection)
897 self.fresh = True
898 except BaseException as e:
899 with util.safe_reraise():
900 pool.logger.debug("Error on connect(): %s", e)
901 else:
902 # in SQLAlchemy 1.4 the first_connect event is not used by
903 # the engine, so this will usually not be set
904 if pool.dispatch.first_connect:
905 pool.dispatch.first_connect.for_modify(
906 pool.dispatch
907 ).exec_once_unless_exception(self.dbapi_connection, self)
909 # init of the dialect now takes place within the connect
910 # event, so ensure a mutex is used on the first run
911 pool.dispatch.connect.for_modify(
912 pool.dispatch
913 )._exec_w_sync_on_first_run(self.dbapi_connection, self)
916def _finalize_fairy(
917 dbapi_connection: Optional[DBAPIConnection],
918 connection_record: Optional[_ConnectionRecord],
919 pool: Pool,
920 ref: Optional[
921 weakref.ref[_ConnectionFairy]
922 ], # this is None when called directly, not by the gc
923 echo: Optional[log._EchoFlagType],
924 transaction_was_reset: bool = False,
925 fairy: Optional[_ConnectionFairy] = None,
926) -> None:
927 """Cleanup for a :class:`._ConnectionFairy` whether or not it's already
928 been garbage collected.
930 When using an async dialect no IO can happen here (without using
931 a dedicated thread), since this is called outside the greenlet
932 context and with an already running loop. In this case function
933 will only log a message and raise a warning.
934 """
936 is_gc_cleanup = ref is not None
938 if is_gc_cleanup:
939 assert ref is not None
940 _strong_ref_connection_records.pop(ref, None)
941 assert connection_record is not None
942 if connection_record.fairy_ref is not ref:
943 return
944 assert dbapi_connection is None
945 dbapi_connection = connection_record.dbapi_connection
947 elif fairy:
948 _strong_ref_connection_records.pop(weakref.ref(fairy), None)
950 # null pool is not _is_asyncio but can be used also with async dialects
951 dont_restore_gced = pool._dialect.is_async
953 if dont_restore_gced:
954 detach = connection_record is None or is_gc_cleanup
955 can_manipulate_connection = not is_gc_cleanup
956 can_close_or_terminate_connection = (
957 not pool._dialect.is_async or pool._dialect.has_terminate
958 )
959 requires_terminate_for_close = (
960 pool._dialect.is_async and pool._dialect.has_terminate
961 )
963 else:
964 detach = connection_record is None
965 can_manipulate_connection = can_close_or_terminate_connection = True
966 requires_terminate_for_close = False
968 if dbapi_connection is not None:
969 if connection_record and echo:
970 pool.logger.debug(
971 "Connection %r being returned to pool", dbapi_connection
972 )
974 try:
975 if not fairy:
976 assert connection_record is not None
977 fairy = _ConnectionFairy(
978 pool,
979 dbapi_connection,
980 connection_record,
981 echo,
982 )
983 assert fairy.dbapi_connection is dbapi_connection
985 fairy._reset(
986 pool,
987 transaction_was_reset=transaction_was_reset,
988 terminate_only=detach,
989 asyncio_safe=can_manipulate_connection,
990 )
992 if detach:
993 if connection_record:
994 fairy._pool = pool
995 fairy.detach()
997 if can_close_or_terminate_connection:
998 if pool.dispatch.close_detached:
999 pool.dispatch.close_detached(dbapi_connection)
1001 pool._close_connection(
1002 dbapi_connection,
1003 terminate=requires_terminate_for_close,
1004 )
1006 except BaseException as e:
1007 pool.logger.error(
1008 "Exception during reset or similar", exc_info=True
1009 )
1010 if connection_record:
1011 connection_record.invalidate(e=e)
1012 if not isinstance(e, Exception):
1013 raise
1014 finally:
1015 if detach and is_gc_cleanup and dont_restore_gced:
1016 message = (
1017 "The garbage collector is trying to clean up "
1018 f"non-checked-in connection {dbapi_connection!r}, "
1019 f"""which will be {
1020 'dropped, as it cannot be safely terminated'
1021 if not can_close_or_terminate_connection
1022 else 'terminated'
1023 }. """
1024 "Please ensure that SQLAlchemy pooled connections are "
1025 "returned to "
1026 "the pool explicitly, either by calling ``close()`` "
1027 "or by using appropriate context managers to manage "
1028 "their lifecycle."
1029 )
1030 pool.logger.error(message)
1031 util.warn(message)
1033 if connection_record and connection_record.fairy_ref is not None:
1034 connection_record.checkin()
1036 # give gc some help. See
1037 # test/engine/test_pool.py::PoolEventsTest::test_checkin_event_gc[True]
1038 # which actually started failing when pytest warnings plugin was
1039 # turned on, due to util.warn() above
1040 if fairy is not None:
1041 fairy.dbapi_connection = None # type: ignore
1042 fairy._connection_record = None
1043 del dbapi_connection
1044 del connection_record
1045 del fairy
1048# a dictionary of the _ConnectionFairy weakrefs to _ConnectionRecord, so that
1049# GC under pypy will call ConnectionFairy finalizers. linked directly to the
1050# weakref that will empty itself when collected so that it should not create
1051# any unmanaged memory references.
1052_strong_ref_connection_records: Dict[
1053 weakref.ref[_ConnectionFairy], _ConnectionRecord
1054] = {}
1057class PoolProxiedConnection(ManagesConnection):
1058 """A connection-like adapter for a :pep:`249` DBAPI connection, which
1059 includes additional methods specific to the :class:`.Pool` implementation.
1061 :class:`.PoolProxiedConnection` is the public-facing interface for the
1062 internal :class:`._ConnectionFairy` implementation object; users familiar
1063 with :class:`._ConnectionFairy` can consider this object to be equivalent.
1065 .. versionadded:: 2.0 :class:`.PoolProxiedConnection` provides the public-
1066 facing interface for the :class:`._ConnectionFairy` internal class.
1068 """
1070 __slots__ = ()
1072 if typing.TYPE_CHECKING:
1074 def commit(self) -> None: ...
1076 def cursor(self, *args: Any, **kwargs: Any) -> DBAPICursor: ...
1078 def rollback(self) -> None: ...
1080 def __getattr__(self, key: str) -> Any: ...
1082 @property
1083 def is_valid(self) -> bool:
1084 """Return True if this :class:`.PoolProxiedConnection` still refers
1085 to an active DBAPI connection."""
1087 raise NotImplementedError()
1089 @property
1090 def is_detached(self) -> bool:
1091 """Return True if this :class:`.PoolProxiedConnection` is detached
1092 from its pool."""
1094 raise NotImplementedError()
1096 def detach(self) -> None:
1097 """Separate this connection from its Pool.
1099 This means that the connection will no longer be returned to the
1100 pool when closed, and will instead be literally closed. The
1101 associated :class:`.ConnectionPoolEntry` is de-associated from this
1102 DBAPI connection.
1104 Note that any overall connection limiting constraints imposed by a
1105 Pool implementation may be violated after a detach, as the detached
1106 connection is removed from the pool's knowledge and control.
1108 """
1110 raise NotImplementedError()
1112 def close(self) -> None:
1113 """Release this connection back to the pool.
1115 The :meth:`.PoolProxiedConnection.close` method shadows the
1116 :pep:`249` ``.close()`` method, altering its behavior to instead
1117 :term:`release` the proxied connection back to the connection pool.
1119 Upon release to the pool, whether the connection stays "opened" and
1120 pooled in the Python process, versus actually closed out and removed
1121 from the Python process, is based on the pool implementation in use and
1122 its configuration and current state.
1124 """
1125 raise NotImplementedError()
1128class _AdhocProxiedConnection(PoolProxiedConnection):
1129 """provides the :class:`.PoolProxiedConnection` interface for cases where
1130 the DBAPI connection is not actually proxied.
1132 This is used by the engine internals to pass a consistent
1133 :class:`.PoolProxiedConnection` object to consuming dialects in response to
1134 pool events that may not always have the :class:`._ConnectionFairy`
1135 available.
1137 """
1139 __slots__ = ("dbapi_connection", "_connection_record", "_is_valid")
1141 dbapi_connection: DBAPIConnection
1142 _connection_record: ConnectionPoolEntry
1144 def __init__(
1145 self,
1146 dbapi_connection: DBAPIConnection,
1147 connection_record: ConnectionPoolEntry,
1148 ):
1149 self.dbapi_connection = dbapi_connection
1150 self._connection_record = connection_record
1151 self._is_valid = True
1153 @property
1154 def driver_connection(self) -> Any: # type: ignore[override] # mypy#4125
1155 return self._connection_record.driver_connection
1157 @property
1158 def connection(self) -> DBAPIConnection:
1159 return self.dbapi_connection
1161 @property
1162 def is_valid(self) -> bool:
1163 """Implement is_valid state attribute.
1165 for the adhoc proxied connection it's assumed the connection is valid
1166 as there is no "invalidate" routine.
1168 """
1169 return self._is_valid
1171 def invalidate(
1172 self, e: Optional[BaseException] = None, soft: bool = False
1173 ) -> None:
1174 self._is_valid = False
1176 @util.ro_non_memoized_property
1177 def record_info(self) -> Optional[_InfoType]:
1178 return self._connection_record.record_info
1180 def cursor(self, *args: Any, **kwargs: Any) -> DBAPICursor:
1181 return self.dbapi_connection.cursor(*args, **kwargs)
1183 def __getattr__(self, key: Any) -> Any:
1184 return getattr(self.dbapi_connection, key)
1187class _ConnectionFairy(PoolProxiedConnection):
1188 """Proxies a DBAPI connection and provides return-on-dereference
1189 support.
1191 This is an internal object used by the :class:`_pool.Pool` implementation
1192 to provide context management to a DBAPI connection delivered by
1193 that :class:`_pool.Pool`. The public facing interface for this class
1194 is described by the :class:`.PoolProxiedConnection` class. See that
1195 class for public API details.
1197 The name "fairy" is inspired by the fact that the
1198 :class:`._ConnectionFairy` object's lifespan is transitory, as it lasts
1199 only for the length of a specific DBAPI connection being checked out from
1200 the pool, and additionally that as a transparent proxy, it is mostly
1201 invisible.
1203 .. seealso::
1205 :class:`.PoolProxiedConnection`
1207 :class:`.ConnectionPoolEntry`
1210 """
1212 __slots__ = (
1213 "dbapi_connection",
1214 "_connection_record",
1215 "_echo",
1216 "_pool",
1217 "_counter",
1218 "__weakref__",
1219 "__dict__",
1220 )
1222 pool: Pool
1223 dbapi_connection: DBAPIConnection
1224 _echo: log._EchoFlagType
1226 def __init__(
1227 self,
1228 pool: Pool,
1229 dbapi_connection: DBAPIConnection,
1230 connection_record: _ConnectionRecord,
1231 echo: log._EchoFlagType,
1232 ):
1233 self._pool = pool
1234 self._counter = 0
1235 self.dbapi_connection = dbapi_connection
1236 self._connection_record = connection_record
1237 self._echo = echo
1239 _connection_record: Optional[_ConnectionRecord]
1241 @property
1242 def driver_connection(self) -> Optional[Any]: # type: ignore[override] # mypy#4125 # noqa: E501
1243 if self._connection_record is None:
1244 return None
1245 return self._connection_record.driver_connection
1247 @property
1248 @util.deprecated(
1249 "2.0",
1250 "The _ConnectionFairy.connection attribute is deprecated; "
1251 "please use 'driver_connection'",
1252 )
1253 def connection(self) -> DBAPIConnection:
1254 return self.dbapi_connection
1256 @classmethod
1257 def _checkout(
1258 cls,
1259 pool: Pool,
1260 threadconns: Optional[threading.local] = None,
1261 fairy: Optional[_ConnectionFairy] = None,
1262 ) -> _ConnectionFairy:
1263 if not fairy:
1264 fairy = _ConnectionRecord.checkout(pool)
1266 if threadconns is not None:
1267 threadconns.current = weakref.ref(fairy)
1269 assert (
1270 fairy._connection_record is not None
1271 ), "can't 'checkout' a detached connection fairy"
1272 assert (
1273 fairy.dbapi_connection is not None
1274 ), "can't 'checkout' an invalidated connection fairy"
1276 fairy._counter += 1
1277 if (
1278 not pool.dispatch.checkout and not pool._pre_ping
1279 ) or fairy._counter != 1:
1280 return fairy
1282 # Pool listeners can trigger a reconnection on checkout, as well
1283 # as the pre-pinger.
1284 # there are three attempts made here, but note that if the database
1285 # is not accessible from a connection standpoint, those won't proceed
1286 # here.
1288 attempts = 2
1290 while attempts > 0:
1291 connection_is_fresh = fairy._connection_record.fresh
1292 fairy._connection_record.fresh = False
1293 try:
1294 if pool._pre_ping:
1295 if not connection_is_fresh:
1296 if fairy._echo:
1297 pool.logger.debug(
1298 "Pool pre-ping on connection %s",
1299 fairy.dbapi_connection,
1300 )
1301 result = pool._dialect._do_ping_w_event(
1302 fairy.dbapi_connection
1303 )
1304 if not result:
1305 if fairy._echo:
1306 pool.logger.debug(
1307 "Pool pre-ping on connection %s failed, "
1308 "will invalidate pool",
1309 fairy.dbapi_connection,
1310 )
1311 raise exc.InvalidatePoolError()
1312 elif fairy._echo:
1313 pool.logger.debug(
1314 "Connection %s is fresh, skipping pre-ping",
1315 fairy.dbapi_connection,
1316 )
1318 pool.dispatch.checkout(
1319 fairy.dbapi_connection, fairy._connection_record, fairy
1320 )
1321 return fairy
1322 except exc.DisconnectionError as e:
1323 if e.invalidate_pool:
1324 pool.logger.info(
1325 "Disconnection detected on checkout, "
1326 "invalidating all pooled connections prior to "
1327 "current timestamp (reason: %r)",
1328 e,
1329 )
1330 fairy._connection_record.invalidate(e)
1331 pool._invalidate(fairy, e, _checkin=False)
1332 else:
1333 pool.logger.info(
1334 "Disconnection detected on checkout, "
1335 "invalidating individual connection %s (reason: %r)",
1336 fairy.dbapi_connection,
1337 e,
1338 )
1339 fairy._connection_record.invalidate(e)
1340 try:
1341 fairy.dbapi_connection = (
1342 fairy._connection_record.get_connection()
1343 )
1344 except BaseException as err:
1345 with util.safe_reraise():
1346 fairy._connection_record._checkin_failed(
1347 err,
1348 _fairy_was_created=True,
1349 )
1351 # prevent _ConnectionFairy from being carried
1352 # in the stack trace. Do this after the
1353 # connection record has been checked in, so that
1354 # if the del triggers a finalize fairy, it won't
1355 # try to checkin a second time.
1356 del fairy
1358 # never called, this is for code linters
1359 raise
1361 attempts -= 1
1362 except BaseException as be_outer:
1363 with util.safe_reraise():
1364 rec = fairy._connection_record
1365 if rec is not None:
1366 rec._checkin_failed(
1367 be_outer,
1368 _fairy_was_created=True,
1369 )
1371 # prevent _ConnectionFairy from being carried
1372 # in the stack trace, see above
1373 del fairy
1375 # never called, this is for code linters
1376 raise
1378 pool.logger.info("Reconnection attempts exhausted on checkout")
1379 fairy.invalidate()
1380 raise exc.InvalidRequestError("This connection is closed")
1382 def _checkout_existing(self) -> _ConnectionFairy:
1383 return _ConnectionFairy._checkout(self._pool, fairy=self)
1385 def _checkin(self, transaction_was_reset: bool = False) -> None:
1386 _finalize_fairy(
1387 self.dbapi_connection,
1388 self._connection_record,
1389 self._pool,
1390 None,
1391 self._echo,
1392 transaction_was_reset=transaction_was_reset,
1393 fairy=self,
1394 )
1396 def _close(self) -> None:
1397 self._checkin()
1399 def _reset(
1400 self,
1401 pool: Pool,
1402 transaction_was_reset: bool,
1403 terminate_only: bool,
1404 asyncio_safe: bool,
1405 ) -> None:
1406 if pool.dispatch.reset:
1407 pool.dispatch.reset(
1408 self.dbapi_connection,
1409 self._connection_record,
1410 PoolResetState(
1411 transaction_was_reset=transaction_was_reset,
1412 terminate_only=terminate_only,
1413 asyncio_safe=asyncio_safe,
1414 ),
1415 )
1417 if not asyncio_safe:
1418 return
1420 if pool._reset_on_return is reset_rollback:
1421 if transaction_was_reset:
1422 if self._echo:
1423 pool.logger.debug(
1424 "Connection %s reset, transaction already reset",
1425 self.dbapi_connection,
1426 )
1427 else:
1428 if self._echo:
1429 pool.logger.debug(
1430 "Connection %s rollback-on-return",
1431 self.dbapi_connection,
1432 )
1433 pool._dialect.do_rollback(self)
1434 elif pool._reset_on_return is reset_commit:
1435 if self._echo:
1436 pool.logger.debug(
1437 "Connection %s commit-on-return",
1438 self.dbapi_connection,
1439 )
1440 pool._dialect.do_commit(self)
1442 @property
1443 def _logger(self) -> log._IdentifiedLoggerType:
1444 return self._pool.logger
1446 @property
1447 def is_valid(self) -> bool:
1448 return self.dbapi_connection is not None
1450 @property
1451 def is_detached(self) -> bool:
1452 return self._connection_record is None
1454 @util.ro_memoized_property
1455 def info(self) -> _InfoType:
1456 if self._connection_record is None:
1457 return {}
1458 else:
1459 return self._connection_record.info
1461 @util.ro_non_memoized_property
1462 def record_info(self) -> Optional[_InfoType]:
1463 if self._connection_record is None:
1464 return None
1465 else:
1466 return self._connection_record.record_info
1468 def invalidate(
1469 self, e: Optional[BaseException] = None, soft: bool = False
1470 ) -> None:
1471 if self.dbapi_connection is None:
1472 util.warn("Can't invalidate an already-closed connection.")
1473 return
1474 if self._connection_record:
1475 self._connection_record.invalidate(e=e, soft=soft)
1476 if not soft:
1477 # prevent any rollback / reset actions etc. on
1478 # the connection
1479 self.dbapi_connection = None # type: ignore
1481 # finalize
1482 self._checkin()
1484 def cursor(self, *args: Any, **kwargs: Any) -> DBAPICursor:
1485 assert self.dbapi_connection is not None
1486 return self.dbapi_connection.cursor(*args, **kwargs)
1488 def __getattr__(self, key: str) -> Any:
1489 return getattr(self.dbapi_connection, key)
1491 def detach(self) -> None:
1492 if self._connection_record is not None:
1493 rec = self._connection_record
1494 rec.fairy_ref = None
1495 rec.dbapi_connection = None
1496 # TODO: should this be _return_conn?
1497 self._pool._do_return_conn(self._connection_record)
1499 # can't get the descriptor assignment to work here
1500 # in pylance. mypy is OK w/ it
1501 self.info = self.info.copy() # type: ignore
1503 self._connection_record = None
1505 if self._pool.dispatch.detach:
1506 self._pool.dispatch.detach(self.dbapi_connection, rec)
1508 def close(self) -> None:
1509 self._counter -= 1
1510 if self._counter == 0:
1511 self._checkin()
1513 def _close_special(self, transaction_reset: bool = False) -> None:
1514 self._counter -= 1
1515 if self._counter == 0:
1516 self._checkin(transaction_was_reset=transaction_reset)