Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/pool/base.py: 57%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# pool/base.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
9"""Base constructs for connection pools."""
11from __future__ import annotations
13from collections import deque
14import dataclasses
15from enum import Enum
16import threading
17import time
18import typing
19from typing import Any
20from typing import Callable
21from typing import cast
22from typing import Deque
23from typing import Dict
24from typing import List
25from typing import Literal
26from typing import Optional
27from typing import Protocol
28from typing import Tuple
29from typing import TYPE_CHECKING
30from typing import Union
31import weakref
33from .. import event
34from .. import exc
35from .. import log
36from .. import util
37from ..util.typing import Self
39if TYPE_CHECKING:
40 from ..engine.interfaces import DBAPIConnection
41 from ..engine.interfaces import DBAPICursor
42 from ..engine.interfaces import Dialect
43 from ..event import _DispatchCommon
44 from ..event import _ListenerFnType
45 from ..event import dispatcher
46 from ..sql._typing import _InfoType
49@dataclasses.dataclass(frozen=True)
50class PoolResetState:
51 """describes the state of a DBAPI connection as it is being passed to
52 the :meth:`.PoolEvents.reset` connection pool event.
54 .. versionadded:: 2.0.0b3
56 """
58 __slots__ = ("transaction_was_reset", "terminate_only", "asyncio_safe")
60 transaction_was_reset: bool
61 """Indicates if the transaction on the DBAPI connection was already
62 essentially "reset" back by the :class:`.Connection` object.
64 This boolean is True if the :class:`.Connection` had transactional
65 state present upon it, which was then not closed using the
66 :meth:`.Connection.rollback` or :meth:`.Connection.commit` method;
67 instead, the transaction was closed inline within the
68 :meth:`.Connection.close` method so is guaranteed to remain non-present
69 when this event is reached.
71 """
73 terminate_only: bool
74 """indicates if the connection is to be immediately terminated and
75 not checked in to the pool.
77 This occurs for connections that were invalidated, as well as asyncio
78 connections that were not cleanly handled by the calling code that
79 are instead being garbage collected. In the latter case,
80 operations can't be safely run on asyncio connections within garbage
81 collection as there is not necessarily an event loop present.
83 """
85 asyncio_safe: bool
86 """Indicates if the reset operation is occurring within a scope where
87 an enclosing event loop is expected to be present for asyncio applications.
89 Will be False in the case that the connection is being garbage collected.
91 """
94class ResetStyle(Enum):
95 """Describe options for "reset on return" behaviors."""
97 reset_rollback = 0
98 reset_commit = 1
99 reset_none = 2
102_ResetStyleArgType = Union[
103 ResetStyle,
104 Literal[True, None, False, "commit", "rollback"],
105]
106reset_rollback, reset_commit, reset_none = list(ResetStyle)
109class _ConnDialect:
110 """partial implementation of :class:`.Dialect`
111 which provides DBAPI connection methods.
113 When a :class:`_pool.Pool` is combined with an :class:`_engine.Engine`,
114 the :class:`_engine.Engine` replaces this with its own
115 :class:`.Dialect`.
117 """
119 is_async = False
120 has_terminate = False
122 def do_rollback(self, dbapi_connection: PoolProxiedConnection) -> None:
123 dbapi_connection.rollback()
125 def do_commit(self, dbapi_connection: PoolProxiedConnection) -> None:
126 dbapi_connection.commit()
128 def do_terminate(self, dbapi_connection: DBAPIConnection) -> None:
129 dbapi_connection.close()
131 def do_close(self, dbapi_connection: DBAPIConnection) -> None:
132 dbapi_connection.close()
134 def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
135 raise NotImplementedError(
136 "The ping feature requires that a dialect is "
137 "passed to the connection pool."
138 )
140 def get_driver_connection(self, connection: DBAPIConnection) -> Any:
141 return connection
144class _AsyncConnDialect(_ConnDialect):
145 is_async = True
148class _CreatorFnType(Protocol):
149 def __call__(self) -> DBAPIConnection: ...
152class _CreatorWRecFnType(Protocol):
153 def __call__(self, rec: ConnectionPoolEntry) -> DBAPIConnection: ...
156class Pool(log.Identified, event.EventTarget):
157 """Abstract base class for connection pools."""
159 dispatch: dispatcher[Pool]
160 echo: log._EchoFlagType
162 _orig_logging_name: Optional[str]
163 _dialect: Union[_ConnDialect, Dialect] = _ConnDialect()
164 _creator_arg: Union[_CreatorFnType, _CreatorWRecFnType]
165 _invoke_creator: _CreatorWRecFnType
166 _invalidate_time: float
168 def __init__(
169 self,
170 creator: Union[_CreatorFnType, _CreatorWRecFnType],
171 recycle: int = -1,
172 echo: log._EchoFlagType = None,
173 logging_name: Optional[str] = None,
174 reset_on_return: _ResetStyleArgType = True,
175 events: Optional[List[Tuple[_ListenerFnType, str]]] = None,
176 dialect: Optional[Union[_ConnDialect, Dialect]] = None,
177 pre_ping: bool = False,
178 _dispatch: Optional[_DispatchCommon[Pool]] = None,
179 ):
180 """
181 Construct a Pool.
183 :param creator: a callable function that returns a DB-API
184 connection object. The function will be called with
185 parameters.
187 :param recycle: If set to a value other than -1, number of
188 seconds between connection recycling, which means upon
189 checkout, if this timeout is surpassed the connection will be
190 closed and replaced with a newly opened connection. Defaults to -1.
192 :param logging_name: String identifier which will be used within
193 the "name" field of logging records generated within the
194 "sqlalchemy.pool" logger. Defaults to a hexstring of the object's
195 id.
197 :param echo: if True, the connection pool will log
198 informational output such as when connections are invalidated
199 as well as when connections are recycled to the default log handler,
200 which defaults to ``sys.stdout`` for output.. If set to the string
201 ``"debug"``, the logging will include pool checkouts and checkins.
203 The :paramref:`_pool.Pool.echo` parameter can also be set from the
204 :func:`_sa.create_engine` call by using the
205 :paramref:`_sa.create_engine.echo_pool` parameter.
207 .. seealso::
209 :ref:`dbengine_logging` - further detail on how to configure
210 logging.
212 :param reset_on_return: Determine steps to take on
213 connections as they are returned to the pool, which were
214 not otherwise handled by a :class:`_engine.Connection`.
215 Available from :func:`_sa.create_engine` via the
216 :paramref:`_sa.create_engine.pool_reset_on_return` parameter.
218 :paramref:`_pool.Pool.reset_on_return` can have any of these values:
220 * ``"rollback"`` - call rollback() on the connection,
221 to release locks and transaction resources.
222 This is the default value. The vast majority
223 of use cases should leave this value set.
224 * ``"commit"`` - call commit() on the connection,
225 to release locks and transaction resources.
226 A commit here may be desirable for databases that
227 cache query plans if a commit is emitted,
228 such as Microsoft SQL Server. However, this
229 value is more dangerous than 'rollback' because
230 any data changes present on the transaction
231 are committed unconditionally.
232 * ``None`` - don't do anything on the connection.
233 This setting may be appropriate if the database / DBAPI
234 works in pure "autocommit" mode at all times, or if
235 a custom reset handler is established using the
236 :meth:`.PoolEvents.reset` event handler.
238 * ``True`` - same as 'rollback', this is here for
239 backwards compatibility.
240 * ``False`` - same as None, this is here for
241 backwards compatibility.
243 For further customization of reset on return, the
244 :meth:`.PoolEvents.reset` event hook may be used which can perform
245 any connection activity desired on reset.
247 .. seealso::
249 :ref:`pool_reset_on_return`
251 :meth:`.PoolEvents.reset`
253 :param events: a list of 2-tuples, each of the form
254 ``(callable, target)`` which will be passed to :func:`.event.listen`
255 upon construction. Provided here so that event listeners
256 can be assigned via :func:`_sa.create_engine` before dialect-level
257 listeners are applied.
259 :param dialect: a :class:`.Dialect` that will handle the job
260 of calling rollback(), close(), or commit() on DBAPI connections.
261 If omitted, a built-in "stub" dialect is used. Applications that
262 make use of :func:`_sa.create_engine` should not use this parameter
263 as it is handled by the engine creation strategy.
265 :param pre_ping: if True, the pool will emit a "ping" (typically
266 "SELECT 1", but is dialect-specific) on the connection
267 upon checkout, to test if the connection is alive or not. If not,
268 the connection is transparently re-connected and upon success, all
269 other pooled connections established prior to that timestamp are
270 invalidated. Requires that a dialect is passed as well to
271 interpret the disconnection error.
273 """
274 if logging_name:
275 self.logging_name = self._orig_logging_name = logging_name
276 else:
277 self._orig_logging_name = None
279 log.instance_logger(self, echoflag=echo)
280 self._creator = creator
281 self._recycle = recycle
282 self._invalidate_time = 0
283 self._pre_ping = pre_ping
284 self._reset_on_return = util.parse_user_argument_for_enum(
285 reset_on_return,
286 {
287 ResetStyle.reset_rollback: ["rollback", True],
288 ResetStyle.reset_none: ["none", None, False],
289 ResetStyle.reset_commit: ["commit"],
290 },
291 "reset_on_return",
292 )
294 self.echo = echo
296 if _dispatch:
297 self.dispatch._update(_dispatch, only_propagate=False)
298 if dialect:
299 self._dialect = dialect
300 if events:
301 for fn, target in events:
302 event.listen(self, target, fn)
304 @util.hybridproperty
305 def _is_asyncio(self) -> bool:
306 return self._dialect.is_async
308 @property
309 def _creator(self) -> Union[_CreatorFnType, _CreatorWRecFnType]:
310 return self._creator_arg
312 @_creator.setter
313 def _creator(
314 self, creator: Union[_CreatorFnType, _CreatorWRecFnType]
315 ) -> None:
316 self._creator_arg = creator
318 # mypy seems to get super confused assigning functions to
319 # attributes
320 self._invoke_creator = self._should_wrap_creator(creator)
322 @_creator.deleter
323 def _creator(self) -> None:
324 # needed for mock testing
325 del self._creator_arg
326 del self._invoke_creator
328 def _should_wrap_creator(
329 self, creator: Union[_CreatorFnType, _CreatorWRecFnType]
330 ) -> _CreatorWRecFnType:
331 """Detect if creator accepts a single argument, or is sent
332 as a legacy style no-arg function.
334 """
336 try:
337 argspec = util.get_callable_argspec(self._creator, no_self=True)
338 except TypeError:
339 creator_fn = cast(_CreatorFnType, creator)
340 return lambda rec: creator_fn()
342 if argspec.defaults is not None:
343 defaulted = len(argspec.defaults)
344 else:
345 defaulted = 0
346 positionals = len(argspec[0]) - defaulted
348 # look for the exact arg signature that DefaultStrategy
349 # sends us
350 if (argspec[0], argspec[3]) == (["connection_record"], (None,)):
351 return cast(_CreatorWRecFnType, creator)
352 # or just a single positional
353 elif positionals == 1:
354 return cast(_CreatorWRecFnType, creator)
355 # all other cases, just wrap and assume legacy "creator" callable
356 # thing
357 else:
358 creator_fn = cast(_CreatorFnType, creator)
359 return lambda rec: creator_fn()
361 def _close_connection(
362 self, connection: DBAPIConnection, *, terminate: bool = False
363 ) -> None:
364 self.logger.debug(
365 "%s connection %r",
366 "Hard-closing" if terminate else "Closing",
367 connection,
368 )
369 try:
370 if terminate:
371 self._dialect.do_terminate(connection)
372 else:
373 self._dialect.do_close(connection)
374 except BaseException as e:
375 self.logger.error(
376 f"Exception {'terminating' if terminate else 'closing'} "
377 f"connection %r",
378 connection,
379 exc_info=True,
380 )
381 if not isinstance(e, Exception):
382 raise
384 def _create_connection(self) -> ConnectionPoolEntry:
385 """Called by subclasses to create a new ConnectionRecord."""
387 return _ConnectionRecord(self)
389 def _invalidate(
390 self,
391 connection: PoolProxiedConnection,
392 exception: Optional[BaseException] = None,
393 _checkin: bool = True,
394 ) -> None:
395 """Mark all connections established within the generation
396 of the given connection as invalidated.
398 If this pool's last invalidate time is before when the given
399 connection was created, update the timestamp til now. Otherwise,
400 no action is performed.
402 Connections with a start time prior to this pool's invalidation
403 time will be recycled upon next checkout.
404 """
405 rec = getattr(connection, "_connection_record", None)
406 if not rec or self._invalidate_time < rec.starttime:
407 self._invalidate_time = time.time()
408 if _checkin and getattr(connection, "is_valid", False):
409 connection.invalidate(exception)
411 def recreate(self) -> Pool:
412 """Return a new :class:`_pool.Pool`, of the same class as this one
413 and configured with identical creation arguments.
415 This method is used in conjunction with :meth:`dispose`
416 to close out an entire :class:`_pool.Pool` and create a new one in
417 its place.
419 """
421 raise NotImplementedError()
423 def dispose(self) -> None:
424 """Dispose of this pool.
426 This method leaves the possibility of checked-out connections
427 remaining open, as it only affects connections that are
428 idle in the pool.
430 .. seealso::
432 :meth:`Pool.recreate`
434 """
436 raise NotImplementedError()
438 def connect(self) -> PoolProxiedConnection:
439 """Return a DBAPI connection from the pool.
441 The connection is instrumented such that when its
442 ``close()`` method is called, the connection will be returned to
443 the pool.
445 """
446 return _ConnectionFairy._checkout(self)
448 def _return_conn(self, record: ConnectionPoolEntry) -> None:
449 """Given a _ConnectionRecord, return it to the :class:`_pool.Pool`.
451 This method is called when an instrumented DBAPI connection
452 has its ``close()`` method called.
454 """
455 self._do_return_conn(record)
457 def _do_get(self) -> ConnectionPoolEntry:
458 """Implementation for :meth:`get`, supplied by subclasses."""
460 raise NotImplementedError()
462 def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
463 """Implementation for :meth:`return_conn`, supplied by subclasses."""
465 raise NotImplementedError()
467 def status(self) -> str:
468 """Returns a brief description of the state of this pool."""
469 raise NotImplementedError()
472class ManagesConnection:
473 """Common base for the two connection-management interfaces
474 :class:`.PoolProxiedConnection` and :class:`.ConnectionPoolEntry`.
476 These two objects are typically exposed in the public facing API
477 via the connection pool event hooks, documented at :class:`.PoolEvents`.
479 .. versionadded:: 2.0
481 """
483 __slots__ = ()
485 dbapi_connection: Optional[DBAPIConnection]
486 """A reference to the actual DBAPI connection being tracked.
488 This is a :pep:`249`-compliant object that for traditional sync-style
489 dialects is provided by the third-party
490 DBAPI implementation in use. For asyncio dialects, the implementation
491 is typically an adapter object provided by the SQLAlchemy dialect
492 itself; the underlying asyncio object is available via the
493 :attr:`.ManagesConnection.driver_connection` attribute.
495 SQLAlchemy's interface for the DBAPI connection is based on the
496 :class:`.DBAPIConnection` protocol object
498 .. seealso::
500 :attr:`.ManagesConnection.driver_connection`
502 :ref:`faq_dbapi_connection`
504 """
506 driver_connection: Optional[Any]
507 """The "driver level" connection object as used by the Python
508 DBAPI or database driver.
510 For traditional :pep:`249` DBAPI implementations, this object will
511 be the same object as that of
512 :attr:`.ManagesConnection.dbapi_connection`. For an asyncio database
513 driver, this will be the ultimate "connection" object used by that
514 driver, such as the ``asyncpg.Connection`` object which will not have
515 standard pep-249 methods.
517 .. versionadded:: 1.4.24
519 .. seealso::
521 :attr:`.ManagesConnection.dbapi_connection`
523 :ref:`faq_dbapi_connection`
525 """
527 @util.ro_memoized_property
528 def info(self) -> _InfoType:
529 """Info dictionary associated with the underlying DBAPI connection
530 referred to by this :class:`.ManagesConnection` instance, allowing
531 user-defined data to be associated with the connection.
533 The data in this dictionary is persistent for the lifespan
534 of the DBAPI connection itself, including across pool checkins
535 and checkouts. When the connection is invalidated
536 and replaced with a new one, this dictionary is cleared.
538 For a :class:`.PoolProxiedConnection` instance that's not associated
539 with a :class:`.ConnectionPoolEntry`, such as if it were detached, the
540 attribute returns a dictionary that is local to that
541 :class:`.ConnectionPoolEntry`. Therefore the
542 :attr:`.ManagesConnection.info` attribute will always provide a Python
543 dictionary.
545 .. seealso::
547 :attr:`.ManagesConnection.record_info`
550 """
551 raise NotImplementedError()
553 @util.ro_memoized_property
554 def record_info(self) -> Optional[_InfoType]:
555 """Persistent info dictionary associated with this
556 :class:`.ManagesConnection`.
558 Unlike the :attr:`.ManagesConnection.info` dictionary, the lifespan
559 of this dictionary is that of the :class:`.ConnectionPoolEntry`
560 which owns it; therefore this dictionary will persist across
561 reconnects and connection invalidation for a particular entry
562 in the connection pool.
564 For a :class:`.PoolProxiedConnection` instance that's not associated
565 with a :class:`.ConnectionPoolEntry`, such as if it were detached, the
566 attribute returns None. Contrast to the :attr:`.ManagesConnection.info`
567 dictionary which is never None.
570 .. seealso::
572 :attr:`.ManagesConnection.info`
574 """
575 raise NotImplementedError()
577 def invalidate(
578 self, e: Optional[BaseException] = None, soft: bool = False
579 ) -> None:
580 """Mark the managed connection as invalidated.
582 :param e: an exception object indicating a reason for the invalidation.
584 :param soft: if True, the connection isn't closed; instead, this
585 connection will be recycled on next checkout.
587 .. seealso::
589 :ref:`pool_connection_invalidation`
592 """
593 raise NotImplementedError()
596class ConnectionPoolEntry(ManagesConnection):
597 """Interface for the object that maintains an individual database
598 connection on behalf of a :class:`_pool.Pool` instance.
600 The :class:`.ConnectionPoolEntry` object represents the long term
601 maintenance of a particular connection for a pool, including expiring or
602 invalidating that connection to have it replaced with a new one, which will
603 continue to be maintained by that same :class:`.ConnectionPoolEntry`
604 instance. Compared to :class:`.PoolProxiedConnection`, which is the
605 short-term, per-checkout connection manager, this object lasts for the
606 lifespan of a particular "slot" within a connection pool.
608 The :class:`.ConnectionPoolEntry` object is mostly visible to public-facing
609 API code when it is delivered to connection pool event hooks, such as
610 :meth:`_events.PoolEvents.connect` and :meth:`_events.PoolEvents.checkout`.
612 .. versionadded:: 2.0 :class:`.ConnectionPoolEntry` provides the public
613 facing interface for the :class:`._ConnectionRecord` internal class.
615 """
617 __slots__ = ()
619 @property
620 def in_use(self) -> bool:
621 """Return True the connection is currently checked out"""
623 raise NotImplementedError()
625 def close(self) -> None:
626 """Close the DBAPI connection managed by this connection pool entry."""
627 raise NotImplementedError()
630class _ConnectionRecord(ConnectionPoolEntry):
631 """Maintains a position in a connection pool which references a pooled
632 connection.
634 This is an internal object used by the :class:`_pool.Pool` implementation
635 to provide context management to a DBAPI connection maintained by
636 that :class:`_pool.Pool`. The public facing interface for this class
637 is described by the :class:`.ConnectionPoolEntry` class. See that
638 class for public API details.
640 .. seealso::
642 :class:`.ConnectionPoolEntry`
644 :class:`.PoolProxiedConnection`
646 """
648 __slots__ = (
649 "__pool",
650 "fairy_ref",
651 "finalize_callback",
652 "fresh",
653 "starttime",
654 "dbapi_connection",
655 "__weakref__",
656 "__dict__",
657 )
659 finalize_callback: Deque[Callable[[DBAPIConnection], None]]
660 fresh: bool
661 fairy_ref: Optional[weakref.ref[_ConnectionFairy]]
662 starttime: float
664 def __init__(self, pool: Pool, connect: bool = True):
665 self.fresh = False
666 self.fairy_ref = None
667 self.starttime = 0
668 self.dbapi_connection = None
670 self.__pool = pool
671 if connect:
672 self.__connect()
673 self.finalize_callback = deque()
675 dbapi_connection: Optional[DBAPIConnection]
677 @property
678 def driver_connection(self) -> Optional[Any]: # type: ignore[override] # mypy#4125 # noqa: E501
679 if self.dbapi_connection is None:
680 return None
681 else:
682 return self.__pool._dialect.get_driver_connection(
683 self.dbapi_connection
684 )
686 @property
687 @util.deprecated(
688 "2.0",
689 "The _ConnectionRecord.connection attribute is deprecated; "
690 "please use 'driver_connection'",
691 )
692 def connection(self) -> Optional[DBAPIConnection]:
693 return self.dbapi_connection
695 _soft_invalidate_time: float = 0
697 @util.ro_memoized_property
698 def info(self) -> _InfoType:
699 return {}
701 @util.ro_memoized_property
702 def record_info(self) -> Optional[_InfoType]:
703 return {}
705 @classmethod
706 def checkout(cls, pool: Pool) -> _ConnectionFairy:
707 if TYPE_CHECKING:
708 rec = cast(_ConnectionRecord, pool._do_get())
709 else:
710 rec = pool._do_get()
712 try:
713 dbapi_connection = rec.get_connection()
714 except BaseException as err:
715 with util.safe_reraise():
716 rec._checkin_failed(err, _fairy_was_created=False)
718 # not reached, for code linters only
719 raise
721 echo = pool._should_log_debug()
722 fairy = _ConnectionFairy(pool, dbapi_connection, rec, echo)
724 rec.fairy_ref = ref = weakref.ref(
725 fairy,
726 lambda ref: (
727 _finalize_fairy(
728 None, rec, pool, ref, echo, transaction_was_reset=False
729 )
730 if _finalize_fairy is not None
731 else None
732 ),
733 )
734 _strong_ref_connection_records[ref] = rec
735 if echo:
736 pool.logger.debug(
737 "Connection %r checked out from pool", dbapi_connection
738 )
739 return fairy
741 def _checkin_failed(
742 self, err: BaseException, _fairy_was_created: bool = True
743 ) -> None:
744 self.invalidate(e=err)
745 self.checkin(
746 _fairy_was_created=_fairy_was_created,
747 )
749 def checkin(self, _fairy_was_created: bool = True) -> None:
750 if self.fairy_ref is None and _fairy_was_created:
751 # _fairy_was_created is False for the initial get connection phase;
752 # meaning there was no _ConnectionFairy and we must unconditionally
753 # do a checkin.
754 #
755 # otherwise, if fairy_was_created==True, if fairy_ref is None here
756 # that means we were checked in already, so this looks like
757 # a double checkin.
758 util.warn("Double checkin attempted on %s" % self)
759 return
760 self.fairy_ref = None
761 connection = self.dbapi_connection
762 pool = self.__pool
763 while self.finalize_callback:
764 finalizer = self.finalize_callback.pop()
765 if connection is not None:
766 finalizer(connection)
767 if pool.dispatch.checkin:
768 pool.dispatch.checkin(connection, self)
770 pool._return_conn(self)
772 @property
773 def in_use(self) -> bool:
774 return self.fairy_ref is not None
776 @property
777 def last_connect_time(self) -> float:
778 return self.starttime
780 def close(self) -> None:
781 if self.dbapi_connection is not None:
782 self.__close()
784 def invalidate(
785 self, e: Optional[BaseException] = None, soft: bool = False
786 ) -> None:
787 # already invalidated
788 if self.dbapi_connection is None:
789 return
790 if soft:
791 self.__pool.dispatch.soft_invalidate(
792 self.dbapi_connection, self, e
793 )
794 else:
795 self.__pool.dispatch.invalidate(self.dbapi_connection, self, e)
796 if e is not None:
797 self.__pool.logger.info(
798 "%sInvalidate connection %r (reason: %s:%s)",
799 "Soft " if soft else "",
800 self.dbapi_connection,
801 e.__class__.__name__,
802 e,
803 )
804 else:
805 self.__pool.logger.info(
806 "%sInvalidate connection %r",
807 "Soft " if soft else "",
808 self.dbapi_connection,
809 )
811 if soft:
812 self._soft_invalidate_time = time.time()
813 else:
814 self.__close(terminate=True)
815 self.dbapi_connection = None
817 def get_connection(self) -> DBAPIConnection:
818 recycle = False
820 # NOTE: the various comparisons here are assuming that measurable time
821 # passes between these state changes. however, time.time() is not
822 # guaranteed to have sub-second precision. comparisons of
823 # "invalidation time" to "starttime" should perhaps use >= so that the
824 # state change can take place assuming no measurable time has passed,
825 # however this does not guarantee correct behavior here as if time
826 # continues to not pass, it will try to reconnect repeatedly until
827 # these timestamps diverge, so in that sense using > is safer. Per
828 # https://stackoverflow.com/a/1938096/34549, Windows time.time() may be
829 # within 16 milliseconds accuracy, so unit tests for connection
830 # invalidation need a sleep of at least this long between initial start
831 # time and invalidation for the logic below to work reliably.
833 if self.dbapi_connection is None:
834 self.info.clear()
835 self.__connect()
836 elif (
837 self.__pool._recycle > -1
838 and time.time() - self.starttime > self.__pool._recycle
839 ):
840 self.__pool.logger.info(
841 "Connection %r exceeded timeout; recycling",
842 self.dbapi_connection,
843 )
844 recycle = True
845 elif self.__pool._invalidate_time > self.starttime:
846 self.__pool.logger.info(
847 "Connection %r invalidated due to pool invalidation; "
848 + "recycling",
849 self.dbapi_connection,
850 )
851 recycle = True
852 elif self._soft_invalidate_time > self.starttime:
853 self.__pool.logger.info(
854 "Connection %r invalidated due to local soft invalidation; "
855 + "recycling",
856 self.dbapi_connection,
857 )
858 recycle = True
860 if recycle:
861 self.__close(terminate=True)
862 self.info.clear()
864 self.__connect()
866 assert self.dbapi_connection is not None
867 return self.dbapi_connection
869 def _is_hard_or_soft_invalidated(self) -> bool:
870 return (
871 self.dbapi_connection is None
872 or self.__pool._invalidate_time > self.starttime
873 or (self._soft_invalidate_time > self.starttime)
874 )
876 def __close(self, *, terminate: bool = False) -> None:
877 self.finalize_callback.clear()
878 if self.__pool.dispatch.close:
879 self.__pool.dispatch.close(self.dbapi_connection, self)
880 assert self.dbapi_connection is not None
881 self.__pool._close_connection(
882 self.dbapi_connection, terminate=terminate
883 )
884 self.dbapi_connection = None
886 def __connect(self) -> None:
887 pool = self.__pool
889 # ensure any existing connection is removed, so that if
890 # creator fails, this attribute stays None
891 self.dbapi_connection = None
892 try:
893 self.starttime = time.time()
894 self.dbapi_connection = connection = pool._invoke_creator(self)
895 pool.logger.debug("Created new connection %r", connection)
896 self.fresh = True
897 except BaseException as e:
898 with util.safe_reraise():
899 pool.logger.debug("Error on connect(): %s", e)
900 else:
901 # in SQLAlchemy 1.4 the first_connect event is not used by
902 # the engine, so this will usually not be set
903 if pool.dispatch.first_connect:
904 pool.dispatch.first_connect.for_modify(
905 pool.dispatch
906 ).exec_once_unless_exception(self.dbapi_connection, self)
908 # init of the dialect now takes place within the connect
909 # event, so ensure a mutex is used on the first run
910 pool.dispatch.connect.for_modify(
911 pool.dispatch
912 )._exec_w_sync_on_first_run(self.dbapi_connection, self)
915def _finalize_fairy(
916 dbapi_connection: Optional[DBAPIConnection],
917 connection_record: Optional[_ConnectionRecord],
918 pool: Pool,
919 ref: Optional[
920 weakref.ref[_ConnectionFairy]
921 ], # this is None when called directly, not by the gc
922 echo: Optional[log._EchoFlagType],
923 transaction_was_reset: bool = False,
924 fairy: Optional[_ConnectionFairy] = None,
925) -> None:
926 """Cleanup for a :class:`._ConnectionFairy` whether or not it's already
927 been garbage collected.
929 When using an async dialect no IO can happen here (without using
930 a dedicated thread), since this is called outside the greenlet
931 context and with an already running loop. In this case function
932 will only log a message and raise a warning.
933 """
935 is_gc_cleanup = ref is not None
937 if is_gc_cleanup:
938 assert ref is not None
939 _strong_ref_connection_records.pop(ref, None)
940 assert connection_record is not None
941 if connection_record.fairy_ref is not ref:
942 return
943 assert dbapi_connection is None
944 dbapi_connection = connection_record.dbapi_connection
946 elif fairy:
947 _strong_ref_connection_records.pop(weakref.ref(fairy), None)
949 # null pool is not _is_asyncio but can be used also with async dialects
950 dont_restore_gced = pool._dialect.is_async
952 if dont_restore_gced:
953 detach = connection_record is None or is_gc_cleanup
954 can_manipulate_connection = not is_gc_cleanup
955 can_close_or_terminate_connection = (
956 not pool._dialect.is_async or pool._dialect.has_terminate
957 )
958 requires_terminate_for_close = (
959 pool._dialect.is_async and pool._dialect.has_terminate
960 )
962 else:
963 detach = connection_record is None
964 can_manipulate_connection = can_close_or_terminate_connection = True
965 requires_terminate_for_close = False
967 if dbapi_connection is not None:
968 if connection_record and echo:
969 pool.logger.debug(
970 "Connection %r being returned to pool", dbapi_connection
971 )
973 try:
974 if not fairy:
975 assert connection_record is not None
976 fairy = _ConnectionFairy(
977 pool,
978 dbapi_connection,
979 connection_record,
980 echo,
981 )
982 assert fairy.dbapi_connection is dbapi_connection
984 fairy._reset(
985 pool,
986 transaction_was_reset=transaction_was_reset,
987 terminate_only=detach,
988 asyncio_safe=can_manipulate_connection,
989 )
991 if detach:
992 if connection_record:
993 fairy._pool = pool
994 fairy.detach()
996 if can_close_or_terminate_connection:
997 if pool.dispatch.close_detached:
998 pool.dispatch.close_detached(dbapi_connection)
1000 pool._close_connection(
1001 dbapi_connection,
1002 terminate=requires_terminate_for_close,
1003 )
1005 except BaseException as e:
1006 pool.logger.error(
1007 "Exception during reset or similar", exc_info=True
1008 )
1009 if connection_record:
1010 connection_record.invalidate(e=e)
1011 if not isinstance(e, Exception):
1012 raise
1013 finally:
1014 if detach and is_gc_cleanup and dont_restore_gced:
1015 message = (
1016 "The garbage collector is trying to clean up "
1017 f"non-checked-in connection {dbapi_connection!r}, "
1018 f"""which will be {
1019 'dropped, as it cannot be safely terminated'
1020 if not can_close_or_terminate_connection
1021 else 'terminated'
1022 }. """
1023 "Please ensure that SQLAlchemy pooled connections are "
1024 "returned to "
1025 "the pool explicitly, either by calling ``close()`` "
1026 "or by using appropriate context managers to manage "
1027 "their lifecycle."
1028 )
1029 pool.logger.error(message)
1030 util.warn(message)
1032 if connection_record and connection_record.fairy_ref is not None:
1033 connection_record.checkin()
1035 # give gc some help. See
1036 # test/engine/test_pool.py::PoolEventsTest::test_checkin_event_gc[True]
1037 # which actually started failing when pytest warnings plugin was
1038 # turned on, due to util.warn() above
1039 if fairy is not None:
1040 fairy.dbapi_connection = None # type: ignore
1041 fairy._connection_record = None
1042 del dbapi_connection
1043 del connection_record
1044 del fairy
1047# a dictionary of the _ConnectionFairy weakrefs to _ConnectionRecord, so that
1048# GC under pypy will call ConnectionFairy finalizers. linked directly to the
1049# weakref that will empty itself when collected so that it should not create
1050# any unmanaged memory references.
1051_strong_ref_connection_records: Dict[
1052 weakref.ref[_ConnectionFairy], _ConnectionRecord
1053] = {}
1056class PoolProxiedConnection(ManagesConnection):
1057 """A connection-like adapter for a :pep:`249` DBAPI connection, which
1058 includes additional methods specific to the :class:`.Pool` implementation.
1060 :class:`.PoolProxiedConnection` is the public-facing interface for the
1061 internal :class:`._ConnectionFairy` implementation object; users familiar
1062 with :class:`._ConnectionFairy` can consider this object to be equivalent.
1064 .. versionadded:: 2.0 :class:`.PoolProxiedConnection` provides the public-
1065 facing interface for the :class:`._ConnectionFairy` internal class.
1067 """
1069 __slots__ = ()
1071 if typing.TYPE_CHECKING:
1073 def commit(self) -> None: ...
1075 def cursor(self, *args: Any, **kwargs: Any) -> DBAPICursor: ...
1077 def rollback(self) -> None: ...
1079 def __getattr__(self, key: str) -> Any: ...
1081 @property
1082 def is_valid(self) -> bool:
1083 """Return True if this :class:`.PoolProxiedConnection` still refers
1084 to an active DBAPI connection."""
1086 raise NotImplementedError()
1088 @property
1089 def is_detached(self) -> bool:
1090 """Return True if this :class:`.PoolProxiedConnection` is detached
1091 from its pool."""
1093 raise NotImplementedError()
1095 def detach(self) -> None:
1096 """Separate this connection from its Pool.
1098 This means that the connection will no longer be returned to the
1099 pool when closed, and will instead be literally closed. The
1100 associated :class:`.ConnectionPoolEntry` is de-associated from this
1101 DBAPI connection.
1103 Note that any overall connection limiting constraints imposed by a
1104 Pool implementation may be violated after a detach, as the detached
1105 connection is removed from the pool's knowledge and control.
1107 """
1109 raise NotImplementedError()
1111 def close(self) -> None:
1112 """Release this connection back to the pool.
1114 The :meth:`.PoolProxiedConnection.close` method shadows the
1115 :pep:`249` ``.close()`` method, altering its behavior to instead
1116 :term:`release` the proxied connection back to the connection pool.
1118 Upon release to the pool, whether the connection stays "opened" and
1119 pooled in the Python process, versus actually closed out and removed
1120 from the Python process, is based on the pool implementation in use and
1121 its configuration and current state.
1123 """
1124 raise NotImplementedError()
1126 def __enter__(self) -> Self:
1127 return self
1129 def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
1130 self.close()
1131 return None
1134class _AdhocProxiedConnection(PoolProxiedConnection):
1135 """provides the :class:`.PoolProxiedConnection` interface for cases where
1136 the DBAPI connection is not actually proxied.
1138 This is used by the engine internals to pass a consistent
1139 :class:`.PoolProxiedConnection` object to consuming dialects in response to
1140 pool events that may not always have the :class:`._ConnectionFairy`
1141 available.
1143 """
1145 __slots__ = ("dbapi_connection", "_connection_record", "_is_valid")
1147 dbapi_connection: DBAPIConnection
1148 _connection_record: ConnectionPoolEntry
1150 def __init__(
1151 self,
1152 dbapi_connection: DBAPIConnection,
1153 connection_record: ConnectionPoolEntry,
1154 ):
1155 self.dbapi_connection = dbapi_connection
1156 self._connection_record = connection_record
1157 self._is_valid = True
1159 @property
1160 def driver_connection(self) -> Any: # type: ignore[override] # mypy#4125
1161 return self._connection_record.driver_connection
1163 @property
1164 def connection(self) -> DBAPIConnection:
1165 return self.dbapi_connection
1167 @property
1168 def is_valid(self) -> bool:
1169 """Implement is_valid state attribute.
1171 for the adhoc proxied connection it's assumed the connection is valid
1172 as there is no "invalidate" routine.
1174 """
1175 return self._is_valid
1177 def invalidate(
1178 self, e: Optional[BaseException] = None, soft: bool = False
1179 ) -> None:
1180 self._is_valid = False
1182 @util.ro_non_memoized_property
1183 def record_info(self) -> Optional[_InfoType]:
1184 return self._connection_record.record_info
1186 def cursor(self, *args: Any, **kwargs: Any) -> DBAPICursor:
1187 return self.dbapi_connection.cursor(*args, **kwargs)
1189 def __getattr__(self, key: Any) -> Any:
1190 return getattr(self.dbapi_connection, key)
1193class _ConnectionFairy(PoolProxiedConnection):
1194 """Proxies a DBAPI connection and provides return-on-dereference
1195 support.
1197 This is an internal object used by the :class:`_pool.Pool` implementation
1198 to provide context management to a DBAPI connection delivered by
1199 that :class:`_pool.Pool`. The public facing interface for this class
1200 is described by the :class:`.PoolProxiedConnection` class. See that
1201 class for public API details.
1203 The name "fairy" is inspired by the fact that the
1204 :class:`._ConnectionFairy` object's lifespan is transitory, as it lasts
1205 only for the length of a specific DBAPI connection being checked out from
1206 the pool, and additionally that as a transparent proxy, it is mostly
1207 invisible.
1209 .. seealso::
1211 :class:`.PoolProxiedConnection`
1213 :class:`.ConnectionPoolEntry`
1216 """
1218 __slots__ = (
1219 "dbapi_connection",
1220 "_connection_record",
1221 "_echo",
1222 "_pool",
1223 "_counter",
1224 "__weakref__",
1225 "__dict__",
1226 )
1228 pool: Pool
1229 dbapi_connection: DBAPIConnection
1230 _echo: log._EchoFlagType
1232 def __init__(
1233 self,
1234 pool: Pool,
1235 dbapi_connection: DBAPIConnection,
1236 connection_record: _ConnectionRecord,
1237 echo: log._EchoFlagType,
1238 ):
1239 self._pool = pool
1240 self._counter = 0
1241 self.dbapi_connection = dbapi_connection
1242 self._connection_record = connection_record
1243 self._echo = echo
1245 _connection_record: Optional[_ConnectionRecord]
1247 @property
1248 def driver_connection(self) -> Optional[Any]: # type: ignore[override] # mypy#4125 # noqa: E501
1249 if self._connection_record is None:
1250 return None
1251 return self._connection_record.driver_connection
1253 @property
1254 @util.deprecated(
1255 "2.0",
1256 "The _ConnectionFairy.connection attribute is deprecated; "
1257 "please use 'driver_connection'",
1258 )
1259 def connection(self) -> DBAPIConnection:
1260 return self.dbapi_connection
1262 @classmethod
1263 def _checkout(
1264 cls,
1265 pool: Pool,
1266 threadconns: Optional[threading.local] = None,
1267 fairy: Optional[_ConnectionFairy] = None,
1268 ) -> _ConnectionFairy:
1269 if not fairy:
1270 fairy = _ConnectionRecord.checkout(pool)
1272 if threadconns is not None:
1273 threadconns.current = weakref.ref(fairy)
1275 assert (
1276 fairy._connection_record is not None
1277 ), "can't 'checkout' a detached connection fairy"
1278 assert (
1279 fairy.dbapi_connection is not None
1280 ), "can't 'checkout' an invalidated connection fairy"
1282 fairy._counter += 1
1283 if (
1284 not pool.dispatch.checkout and not pool._pre_ping
1285 ) or fairy._counter != 1:
1286 return fairy
1288 # Pool listeners can trigger a reconnection on checkout, as well
1289 # as the pre-pinger.
1290 # there are three attempts made here, but note that if the database
1291 # is not accessible from a connection standpoint, those won't proceed
1292 # here.
1294 attempts = 2
1296 while attempts > 0:
1297 connection_is_fresh = fairy._connection_record.fresh
1298 fairy._connection_record.fresh = False
1299 try:
1300 if pool._pre_ping:
1301 if not connection_is_fresh:
1302 if fairy._echo:
1303 pool.logger.debug(
1304 "Pool pre-ping on connection %s",
1305 fairy.dbapi_connection,
1306 )
1307 result = pool._dialect._do_ping_w_event(
1308 fairy.dbapi_connection
1309 )
1310 if not result:
1311 if fairy._echo:
1312 pool.logger.debug(
1313 "Pool pre-ping on connection %s failed, "
1314 "will invalidate pool",
1315 fairy.dbapi_connection,
1316 )
1317 raise exc.InvalidatePoolError()
1318 elif fairy._echo:
1319 pool.logger.debug(
1320 "Connection %s is fresh, skipping pre-ping",
1321 fairy.dbapi_connection,
1322 )
1324 pool.dispatch.checkout(
1325 fairy.dbapi_connection, fairy._connection_record, fairy
1326 )
1327 return fairy
1328 except exc.DisconnectionError as e:
1329 if e.invalidate_pool:
1330 pool.logger.info(
1331 "Disconnection detected on checkout, "
1332 "invalidating all pooled connections prior to "
1333 "current timestamp (reason: %r)",
1334 e,
1335 )
1336 fairy._connection_record.invalidate(e)
1337 pool._invalidate(fairy, e, _checkin=False)
1338 else:
1339 pool.logger.info(
1340 "Disconnection detected on checkout, "
1341 "invalidating individual connection %s (reason: %r)",
1342 fairy.dbapi_connection,
1343 e,
1344 )
1345 fairy._connection_record.invalidate(e)
1346 try:
1347 fairy.dbapi_connection = (
1348 fairy._connection_record.get_connection()
1349 )
1350 except BaseException as err:
1351 with util.safe_reraise():
1352 fairy._connection_record._checkin_failed(
1353 err,
1354 _fairy_was_created=True,
1355 )
1357 # prevent _ConnectionFairy from being carried
1358 # in the stack trace. Do this after the
1359 # connection record has been checked in, so that
1360 # if the del triggers a finalize fairy, it won't
1361 # try to checkin a second time.
1362 del fairy
1364 # never called, this is for code linters
1365 raise
1367 attempts -= 1
1368 except BaseException as be_outer:
1369 with util.safe_reraise():
1370 rec = fairy._connection_record
1371 if rec is not None:
1372 rec._checkin_failed(
1373 be_outer,
1374 _fairy_was_created=True,
1375 )
1377 # prevent _ConnectionFairy from being carried
1378 # in the stack trace, see above
1379 del fairy
1381 # never called, this is for code linters
1382 raise
1384 pool.logger.info("Reconnection attempts exhausted on checkout")
1385 fairy.invalidate()
1386 raise exc.InvalidRequestError("This connection is closed")
1388 def _checkout_existing(self) -> _ConnectionFairy:
1389 return _ConnectionFairy._checkout(self._pool, fairy=self)
1391 def _checkin(self, transaction_was_reset: bool = False) -> None:
1392 _finalize_fairy(
1393 self.dbapi_connection,
1394 self._connection_record,
1395 self._pool,
1396 None,
1397 self._echo,
1398 transaction_was_reset=transaction_was_reset,
1399 fairy=self,
1400 )
1402 def _close(self) -> None:
1403 self._checkin()
1405 def _reset(
1406 self,
1407 pool: Pool,
1408 transaction_was_reset: bool,
1409 terminate_only: bool,
1410 asyncio_safe: bool,
1411 ) -> None:
1412 if pool.dispatch.reset:
1413 pool.dispatch.reset(
1414 self.dbapi_connection,
1415 self._connection_record,
1416 PoolResetState(
1417 transaction_was_reset=transaction_was_reset,
1418 terminate_only=terminate_only,
1419 asyncio_safe=asyncio_safe,
1420 ),
1421 )
1423 if not asyncio_safe:
1424 return
1426 if pool._reset_on_return is reset_rollback:
1427 if transaction_was_reset:
1428 if self._echo:
1429 pool.logger.debug(
1430 "Connection %s reset, transaction already reset",
1431 self.dbapi_connection,
1432 )
1433 else:
1434 if self._echo:
1435 pool.logger.debug(
1436 "Connection %s rollback-on-return",
1437 self.dbapi_connection,
1438 )
1439 pool._dialect.do_rollback(self)
1440 elif pool._reset_on_return is reset_commit:
1441 if self._echo:
1442 pool.logger.debug(
1443 "Connection %s commit-on-return",
1444 self.dbapi_connection,
1445 )
1446 pool._dialect.do_commit(self)
1448 @property
1449 def _logger(self) -> log._IdentifiedLoggerType:
1450 return self._pool.logger
1452 @property
1453 def is_valid(self) -> bool:
1454 return self.dbapi_connection is not None
1456 @property
1457 def is_detached(self) -> bool:
1458 return self._connection_record is None
1460 @util.ro_memoized_property
1461 def info(self) -> _InfoType:
1462 if self._connection_record is None:
1463 return {}
1464 else:
1465 return self._connection_record.info
1467 @util.ro_non_memoized_property
1468 def record_info(self) -> Optional[_InfoType]:
1469 if self._connection_record is None:
1470 return None
1471 else:
1472 return self._connection_record.record_info
1474 def invalidate(
1475 self, e: Optional[BaseException] = None, soft: bool = False
1476 ) -> None:
1477 if self.dbapi_connection is None:
1478 util.warn("Can't invalidate an already-closed connection.")
1479 return
1480 if self._connection_record:
1481 self._connection_record.invalidate(e=e, soft=soft)
1482 if not soft:
1483 # prevent any rollback / reset actions etc. on
1484 # the connection
1485 self.dbapi_connection = None # type: ignore
1487 # finalize
1488 self._checkin()
1490 def cursor(self, *args: Any, **kwargs: Any) -> DBAPICursor:
1491 assert self.dbapi_connection is not None
1492 return self.dbapi_connection.cursor(*args, **kwargs)
1494 def __getattr__(self, key: str) -> Any:
1495 return getattr(self.dbapi_connection, key)
1497 def detach(self) -> None:
1498 if self._connection_record is not None:
1499 rec = self._connection_record
1500 rec.fairy_ref = None
1501 rec.dbapi_connection = None
1502 # TODO: should this be _return_conn?
1503 self._pool._do_return_conn(self._connection_record)
1505 # can't get the descriptor assignment to work here
1506 # in pylance. mypy is OK w/ it
1507 self.info = self.info.copy() # type: ignore
1509 self._connection_record = None
1511 if self._pool.dispatch.detach:
1512 self._pool.dispatch.detach(self.dbapi_connection, rec)
1514 def close(self) -> None:
1515 self._counter -= 1
1516 if self._counter == 0:
1517 self._checkin()
1519 def _close_special(self, transaction_reset: bool = False) -> None:
1520 self._counter -= 1
1521 if self._counter == 0:
1522 self._checkin(transaction_was_reset=transaction_reset)