Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/pool/base.py: 58%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# pool/base.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
9"""Base constructs for connection pools."""
11from __future__ import annotations
13from collections import deque
14import dataclasses
15from enum import Enum
16import threading
17import time
18import typing
19from typing import Any
20from typing import Callable
21from typing import cast
22from typing import Deque
23from typing import Dict
24from typing import List
25from typing import Optional
26from typing import Tuple
27from typing import TYPE_CHECKING
28from typing import Union
29import weakref
31from .. import event
32from .. import exc
33from .. import log
34from .. import util
35from ..util.typing import Literal
36from ..util.typing import Protocol
37from ..util.typing import Self
39if TYPE_CHECKING:
40 from ..engine.interfaces import DBAPIConnection
41 from ..engine.interfaces import DBAPICursor
42 from ..engine.interfaces import Dialect
43 from ..event import _DispatchCommon
44 from ..event import _ListenerFnType
45 from ..event import dispatcher
46 from ..sql._typing import _InfoType
49@dataclasses.dataclass(frozen=True)
50class PoolResetState:
51 """describes the state of a DBAPI connection as it is being passed to
52 the :meth:`.PoolEvents.reset` connection pool event.
54 .. versionadded:: 2.0.0b3
56 """
58 __slots__ = ("transaction_was_reset", "terminate_only", "asyncio_safe")
60 transaction_was_reset: bool
61 """Indicates if the transaction on the DBAPI connection was already
62 essentially "reset" back by the :class:`.Connection` object.
64 This boolean is True if the :class:`.Connection` had transactional
65 state present upon it, which was then not closed using the
66 :meth:`.Connection.rollback` or :meth:`.Connection.commit` method;
67 instead, the transaction was closed inline within the
68 :meth:`.Connection.close` method so is guaranteed to remain non-present
69 when this event is reached.
71 """
73 terminate_only: bool
74 """indicates if the connection is to be immediately terminated and
75 not checked in to the pool.
77 This occurs for connections that were invalidated, as well as asyncio
78 connections that were not cleanly handled by the calling code that
79 are instead being garbage collected. In the latter case,
80 operations can't be safely run on asyncio connections within garbage
81 collection as there is not necessarily an event loop present.
83 """
85 asyncio_safe: bool
86 """Indicates if the reset operation is occurring within a scope where
87 an enclosing event loop is expected to be present for asyncio applications.
89 Will be False in the case that the connection is being garbage collected.
91 """
94class ResetStyle(Enum):
95 """Describe options for "reset on return" behaviors."""
97 reset_rollback = 0
98 reset_commit = 1
99 reset_none = 2
102_ResetStyleArgType = Union[
103 ResetStyle,
104 Literal[True, None, False, "commit", "rollback"],
105]
106reset_rollback, reset_commit, reset_none = list(ResetStyle)
109class _ConnDialect:
110 """partial implementation of :class:`.Dialect`
111 which provides DBAPI connection methods.
113 When a :class:`_pool.Pool` is combined with an :class:`_engine.Engine`,
114 the :class:`_engine.Engine` replaces this with its own
115 :class:`.Dialect`.
117 """
119 is_async = False
120 has_terminate = False
122 def do_rollback(self, dbapi_connection: PoolProxiedConnection) -> None:
123 dbapi_connection.rollback()
125 def do_commit(self, dbapi_connection: PoolProxiedConnection) -> None:
126 dbapi_connection.commit()
128 def do_terminate(self, dbapi_connection: DBAPIConnection) -> None:
129 dbapi_connection.close()
131 def do_close(self, dbapi_connection: DBAPIConnection) -> None:
132 dbapi_connection.close()
134 def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
135 raise NotImplementedError(
136 "The ping feature requires that a dialect is "
137 "passed to the connection pool."
138 )
140 def get_driver_connection(self, connection: DBAPIConnection) -> Any:
141 return connection
144class _AsyncConnDialect(_ConnDialect):
145 is_async = True
148class _CreatorFnType(Protocol):
149 def __call__(self) -> DBAPIConnection: ...
152class _CreatorWRecFnType(Protocol):
153 def __call__(self, rec: ConnectionPoolEntry) -> DBAPIConnection: ...
156class Pool(log.Identified, event.EventTarget):
157 """Abstract base class for connection pools."""
159 dispatch: dispatcher[Pool]
160 echo: log._EchoFlagType
162 _orig_logging_name: Optional[str]
163 _dialect: Union[_ConnDialect, Dialect] = _ConnDialect()
164 _creator_arg: Union[_CreatorFnType, _CreatorWRecFnType]
165 _invoke_creator: _CreatorWRecFnType
166 _invalidate_time: float
168 def __init__(
169 self,
170 creator: Union[_CreatorFnType, _CreatorWRecFnType],
171 recycle: int = -1,
172 echo: log._EchoFlagType = None,
173 logging_name: Optional[str] = None,
174 reset_on_return: _ResetStyleArgType = True,
175 events: Optional[List[Tuple[_ListenerFnType, str]]] = None,
176 dialect: Optional[Union[_ConnDialect, Dialect]] = None,
177 pre_ping: bool = False,
178 _dispatch: Optional[_DispatchCommon[Pool]] = None,
179 ):
180 """
181 Construct a Pool.
183 :param creator: a callable function that returns a DB-API
184 connection object. The function will be called with
185 parameters.
187 :param recycle: If set to a value other than -1, number of
188 seconds between connection recycling, which means upon
189 checkout, if this timeout is surpassed the connection will be
190 closed and replaced with a newly opened connection. Defaults to -1.
192 :param logging_name: String identifier which will be used within
193 the "name" field of logging records generated within the
194 "sqlalchemy.pool" logger. Defaults to a hexstring of the object's
195 id.
197 :param echo: if True, the connection pool will log
198 informational output such as when connections are invalidated
199 as well as when connections are recycled to the default log handler,
200 which defaults to ``sys.stdout`` for output.. If set to the string
201 ``"debug"``, the logging will include pool checkouts and checkins.
203 The :paramref:`_pool.Pool.echo` parameter can also be set from the
204 :func:`_sa.create_engine` call by using the
205 :paramref:`_sa.create_engine.echo_pool` parameter.
207 .. seealso::
209 :ref:`dbengine_logging` - further detail on how to configure
210 logging.
212 :param reset_on_return: Determine steps to take on
213 connections as they are returned to the pool, which were
214 not otherwise handled by a :class:`_engine.Connection`.
215 Available from :func:`_sa.create_engine` via the
216 :paramref:`_sa.create_engine.pool_reset_on_return` parameter.
218 :paramref:`_pool.Pool.reset_on_return` can have any of these values:
220 * ``"rollback"`` - call rollback() on the connection,
221 to release locks and transaction resources.
222 This is the default value. The vast majority
223 of use cases should leave this value set.
224 * ``"commit"`` - call commit() on the connection,
225 to release locks and transaction resources.
226 A commit here may be desirable for databases that
227 cache query plans if a commit is emitted,
228 such as Microsoft SQL Server. However, this
229 value is more dangerous than 'rollback' because
230 any data changes present on the transaction
231 are committed unconditionally.
232 * ``None`` - don't do anything on the connection.
233 This setting may be appropriate if the database / DBAPI
234 works in pure "autocommit" mode at all times, or if
235 a custom reset handler is established using the
236 :meth:`.PoolEvents.reset` event handler.
238 * ``True`` - same as 'rollback', this is here for
239 backwards compatibility.
240 * ``False`` - same as None, this is here for
241 backwards compatibility.
243 For further customization of reset on return, the
244 :meth:`.PoolEvents.reset` event hook may be used which can perform
245 any connection activity desired on reset.
247 .. seealso::
249 :ref:`pool_reset_on_return`
251 :meth:`.PoolEvents.reset`
253 :param events: a list of 2-tuples, each of the form
254 ``(callable, target)`` which will be passed to :func:`.event.listen`
255 upon construction. Provided here so that event listeners
256 can be assigned via :func:`_sa.create_engine` before dialect-level
257 listeners are applied.
259 :param dialect: a :class:`.Dialect` that will handle the job
260 of calling rollback(), close(), or commit() on DBAPI connections.
261 If omitted, a built-in "stub" dialect is used. Applications that
262 make use of :func:`_sa.create_engine` should not use this parameter
263 as it is handled by the engine creation strategy.
265 :param pre_ping: if True, the pool will emit a "ping" (typically
266 "SELECT 1", but is dialect-specific) on the connection
267 upon checkout, to test if the connection is alive or not. If not,
268 the connection is transparently re-connected and upon success, all
269 other pooled connections established prior to that timestamp are
270 invalidated. Requires that a dialect is passed as well to
271 interpret the disconnection error.
273 .. versionadded:: 1.2
275 """
276 if logging_name:
277 self.logging_name = self._orig_logging_name = logging_name
278 else:
279 self._orig_logging_name = None
281 log.instance_logger(self, echoflag=echo)
282 self._creator = creator
283 self._recycle = recycle
284 self._invalidate_time = 0
285 self._pre_ping = pre_ping
286 self._reset_on_return = util.parse_user_argument_for_enum(
287 reset_on_return,
288 {
289 ResetStyle.reset_rollback: ["rollback", True],
290 ResetStyle.reset_none: ["none", None, False],
291 ResetStyle.reset_commit: ["commit"],
292 },
293 "reset_on_return",
294 )
296 self.echo = echo
298 if _dispatch:
299 self.dispatch._update(_dispatch, only_propagate=False)
300 if dialect:
301 self._dialect = dialect
302 if events:
303 for fn, target in events:
304 event.listen(self, target, fn)
306 @util.hybridproperty
307 def _is_asyncio(self) -> bool:
308 return self._dialect.is_async
310 @property
311 def _creator(self) -> Union[_CreatorFnType, _CreatorWRecFnType]:
312 return self._creator_arg
314 @_creator.setter
315 def _creator(
316 self, creator: Union[_CreatorFnType, _CreatorWRecFnType]
317 ) -> None:
318 self._creator_arg = creator
320 # mypy seems to get super confused assigning functions to
321 # attributes
322 self._invoke_creator = self._should_wrap_creator(creator)
324 @_creator.deleter
325 def _creator(self) -> None:
326 # needed for mock testing
327 del self._creator_arg
328 del self._invoke_creator
330 def _should_wrap_creator(
331 self, creator: Union[_CreatorFnType, _CreatorWRecFnType]
332 ) -> _CreatorWRecFnType:
333 """Detect if creator accepts a single argument, or is sent
334 as a legacy style no-arg function.
336 """
338 try:
339 argspec = util.get_callable_argspec(self._creator, no_self=True)
340 except TypeError:
341 creator_fn = cast(_CreatorFnType, creator)
342 return lambda rec: creator_fn()
344 if argspec.defaults is not None:
345 defaulted = len(argspec.defaults)
346 else:
347 defaulted = 0
348 positionals = len(argspec[0]) - defaulted
350 # look for the exact arg signature that DefaultStrategy
351 # sends us
352 if (argspec[0], argspec[3]) == (["connection_record"], (None,)):
353 return cast(_CreatorWRecFnType, creator)
354 # or just a single positional
355 elif positionals == 1:
356 return cast(_CreatorWRecFnType, creator)
357 # all other cases, just wrap and assume legacy "creator" callable
358 # thing
359 else:
360 creator_fn = cast(_CreatorFnType, creator)
361 return lambda rec: creator_fn()
363 def _close_connection(
364 self, connection: DBAPIConnection, *, terminate: bool = False
365 ) -> None:
366 self.logger.debug(
367 "%s connection %r",
368 "Hard-closing" if terminate else "Closing",
369 connection,
370 )
371 try:
372 if terminate:
373 self._dialect.do_terminate(connection)
374 else:
375 self._dialect.do_close(connection)
376 except BaseException as e:
377 self.logger.error(
378 f"Exception {'terminating' if terminate else 'closing'} "
379 f"connection %r",
380 connection,
381 exc_info=True,
382 )
383 if not isinstance(e, Exception):
384 raise
386 def _create_connection(self) -> ConnectionPoolEntry:
387 """Called by subclasses to create a new ConnectionRecord."""
389 return _ConnectionRecord(self)
391 def _invalidate(
392 self,
393 connection: PoolProxiedConnection,
394 exception: Optional[BaseException] = None,
395 _checkin: bool = True,
396 ) -> None:
397 """Mark all connections established within the generation
398 of the given connection as invalidated.
400 If this pool's last invalidate time is before when the given
401 connection was created, update the timestamp til now. Otherwise,
402 no action is performed.
404 Connections with a start time prior to this pool's invalidation
405 time will be recycled upon next checkout.
406 """
407 rec = getattr(connection, "_connection_record", None)
408 if not rec or self._invalidate_time < rec.starttime:
409 self._invalidate_time = time.time()
410 if _checkin and getattr(connection, "is_valid", False):
411 connection.invalidate(exception)
413 def recreate(self) -> Pool:
414 """Return a new :class:`_pool.Pool`, of the same class as this one
415 and configured with identical creation arguments.
417 This method is used in conjunction with :meth:`dispose`
418 to close out an entire :class:`_pool.Pool` and create a new one in
419 its place.
421 """
423 raise NotImplementedError()
425 def dispose(self) -> None:
426 """Dispose of this pool.
428 This method leaves the possibility of checked-out connections
429 remaining open, as it only affects connections that are
430 idle in the pool.
432 .. seealso::
434 :meth:`Pool.recreate`
436 """
438 raise NotImplementedError()
440 def connect(self) -> PoolProxiedConnection:
441 """Return a DBAPI connection from the pool.
443 The connection is instrumented such that when its
444 ``close()`` method is called, the connection will be returned to
445 the pool.
447 """
448 return _ConnectionFairy._checkout(self)
450 def _return_conn(self, record: ConnectionPoolEntry) -> None:
451 """Given a _ConnectionRecord, return it to the :class:`_pool.Pool`.
453 This method is called when an instrumented DBAPI connection
454 has its ``close()`` method called.
456 """
457 self._do_return_conn(record)
459 def _do_get(self) -> ConnectionPoolEntry:
460 """Implementation for :meth:`get`, supplied by subclasses."""
462 raise NotImplementedError()
464 def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
465 """Implementation for :meth:`return_conn`, supplied by subclasses."""
467 raise NotImplementedError()
469 def status(self) -> str:
470 """Returns a brief description of the state of this pool."""
471 raise NotImplementedError()
474class ManagesConnection:
475 """Common base for the two connection-management interfaces
476 :class:`.PoolProxiedConnection` and :class:`.ConnectionPoolEntry`.
478 These two objects are typically exposed in the public facing API
479 via the connection pool event hooks, documented at :class:`.PoolEvents`.
481 .. versionadded:: 2.0
483 """
485 __slots__ = ()
487 dbapi_connection: Optional[DBAPIConnection]
488 """A reference to the actual DBAPI connection being tracked.
490 This is a :pep:`249`-compliant object that for traditional sync-style
491 dialects is provided by the third-party
492 DBAPI implementation in use. For asyncio dialects, the implementation
493 is typically an adapter object provided by the SQLAlchemy dialect
494 itself; the underlying asyncio object is available via the
495 :attr:`.ManagesConnection.driver_connection` attribute.
497 SQLAlchemy's interface for the DBAPI connection is based on the
498 :class:`.DBAPIConnection` protocol object
500 .. seealso::
502 :attr:`.ManagesConnection.driver_connection`
504 :ref:`faq_dbapi_connection`
506 """
508 driver_connection: Optional[Any]
509 """The "driver level" connection object as used by the Python
510 DBAPI or database driver.
512 For traditional :pep:`249` DBAPI implementations, this object will
513 be the same object as that of
514 :attr:`.ManagesConnection.dbapi_connection`. For an asyncio database
515 driver, this will be the ultimate "connection" object used by that
516 driver, such as the ``asyncpg.Connection`` object which will not have
517 standard pep-249 methods.
519 .. versionadded:: 1.4.24
521 .. seealso::
523 :attr:`.ManagesConnection.dbapi_connection`
525 :ref:`faq_dbapi_connection`
527 """
529 @util.ro_memoized_property
530 def info(self) -> _InfoType:
531 """Info dictionary associated with the underlying DBAPI connection
532 referred to by this :class:`.ManagesConnection` instance, allowing
533 user-defined data to be associated with the connection.
535 The data in this dictionary is persistent for the lifespan
536 of the DBAPI connection itself, including across pool checkins
537 and checkouts. When the connection is invalidated
538 and replaced with a new one, this dictionary is cleared.
540 For a :class:`.PoolProxiedConnection` instance that's not associated
541 with a :class:`.ConnectionPoolEntry`, such as if it were detached, the
542 attribute returns a dictionary that is local to that
543 :class:`.ConnectionPoolEntry`. Therefore the
544 :attr:`.ManagesConnection.info` attribute will always provide a Python
545 dictionary.
547 .. seealso::
549 :attr:`.ManagesConnection.record_info`
552 """
553 raise NotImplementedError()
555 @util.ro_memoized_property
556 def record_info(self) -> Optional[_InfoType]:
557 """Persistent info dictionary associated with this
558 :class:`.ManagesConnection`.
560 Unlike the :attr:`.ManagesConnection.info` dictionary, the lifespan
561 of this dictionary is that of the :class:`.ConnectionPoolEntry`
562 which owns it; therefore this dictionary will persist across
563 reconnects and connection invalidation for a particular entry
564 in the connection pool.
566 For a :class:`.PoolProxiedConnection` instance that's not associated
567 with a :class:`.ConnectionPoolEntry`, such as if it were detached, the
568 attribute returns None. Contrast to the :attr:`.ManagesConnection.info`
569 dictionary which is never None.
572 .. seealso::
574 :attr:`.ManagesConnection.info`
576 """
577 raise NotImplementedError()
579 def invalidate(
580 self, e: Optional[BaseException] = None, soft: bool = False
581 ) -> None:
582 """Mark the managed connection as invalidated.
584 :param e: an exception object indicating a reason for the invalidation.
586 :param soft: if True, the connection isn't closed; instead, this
587 connection will be recycled on next checkout.
589 .. seealso::
591 :ref:`pool_connection_invalidation`
594 """
595 raise NotImplementedError()
598class ConnectionPoolEntry(ManagesConnection):
599 """Interface for the object that maintains an individual database
600 connection on behalf of a :class:`_pool.Pool` instance.
602 The :class:`.ConnectionPoolEntry` object represents the long term
603 maintenance of a particular connection for a pool, including expiring or
604 invalidating that connection to have it replaced with a new one, which will
605 continue to be maintained by that same :class:`.ConnectionPoolEntry`
606 instance. Compared to :class:`.PoolProxiedConnection`, which is the
607 short-term, per-checkout connection manager, this object lasts for the
608 lifespan of a particular "slot" within a connection pool.
610 The :class:`.ConnectionPoolEntry` object is mostly visible to public-facing
611 API code when it is delivered to connection pool event hooks, such as
612 :meth:`_events.PoolEvents.connect` and :meth:`_events.PoolEvents.checkout`.
614 .. versionadded:: 2.0 :class:`.ConnectionPoolEntry` provides the public
615 facing interface for the :class:`._ConnectionRecord` internal class.
617 """
619 __slots__ = ()
621 @property
622 def in_use(self) -> bool:
623 """Return True the connection is currently checked out"""
625 raise NotImplementedError()
627 def close(self) -> None:
628 """Close the DBAPI connection managed by this connection pool entry."""
629 raise NotImplementedError()
632class _ConnectionRecord(ConnectionPoolEntry):
633 """Maintains a position in a connection pool which references a pooled
634 connection.
636 This is an internal object used by the :class:`_pool.Pool` implementation
637 to provide context management to a DBAPI connection maintained by
638 that :class:`_pool.Pool`. The public facing interface for this class
639 is described by the :class:`.ConnectionPoolEntry` class. See that
640 class for public API details.
642 .. seealso::
644 :class:`.ConnectionPoolEntry`
646 :class:`.PoolProxiedConnection`
648 """
650 __slots__ = (
651 "__pool",
652 "fairy_ref",
653 "finalize_callback",
654 "fresh",
655 "starttime",
656 "dbapi_connection",
657 "__weakref__",
658 "__dict__",
659 )
661 finalize_callback: Deque[Callable[[DBAPIConnection], None]]
662 fresh: bool
663 fairy_ref: Optional[weakref.ref[_ConnectionFairy]]
664 starttime: float
666 def __init__(self, pool: Pool, connect: bool = True):
667 self.fresh = False
668 self.fairy_ref = None
669 self.starttime = 0
670 self.dbapi_connection = None
672 self.__pool = pool
673 if connect:
674 self.__connect()
675 self.finalize_callback = deque()
677 dbapi_connection: Optional[DBAPIConnection]
679 @property
680 def driver_connection(self) -> Optional[Any]: # type: ignore[override] # mypy#4125 # noqa: E501
681 if self.dbapi_connection is None:
682 return None
683 else:
684 return self.__pool._dialect.get_driver_connection(
685 self.dbapi_connection
686 )
688 @property
689 @util.deprecated(
690 "2.0",
691 "The _ConnectionRecord.connection attribute is deprecated; "
692 "please use 'driver_connection'",
693 )
694 def connection(self) -> Optional[DBAPIConnection]:
695 return self.dbapi_connection
697 _soft_invalidate_time: float = 0
699 @util.ro_memoized_property
700 def info(self) -> _InfoType:
701 return {}
703 @util.ro_memoized_property
704 def record_info(self) -> Optional[_InfoType]:
705 return {}
707 @classmethod
708 def checkout(cls, pool: Pool) -> _ConnectionFairy:
709 if TYPE_CHECKING:
710 rec = cast(_ConnectionRecord, pool._do_get())
711 else:
712 rec = pool._do_get()
714 try:
715 dbapi_connection = rec.get_connection()
716 except BaseException as err:
717 with util.safe_reraise():
718 rec._checkin_failed(err, _fairy_was_created=False)
720 # not reached, for code linters only
721 raise
723 echo = pool._should_log_debug()
724 fairy = _ConnectionFairy(pool, dbapi_connection, rec, echo)
726 rec.fairy_ref = ref = weakref.ref(
727 fairy,
728 lambda ref: (
729 _finalize_fairy(
730 None, rec, pool, ref, echo, transaction_was_reset=False
731 )
732 if _finalize_fairy is not None
733 else None
734 ),
735 )
736 _strong_ref_connection_records[ref] = rec
737 if echo:
738 pool.logger.debug(
739 "Connection %r checked out from pool", dbapi_connection
740 )
741 return fairy
743 def _checkin_failed(
744 self, err: BaseException, _fairy_was_created: bool = True
745 ) -> None:
746 self.invalidate(e=err)
747 self.checkin(
748 _fairy_was_created=_fairy_was_created,
749 )
751 def checkin(self, _fairy_was_created: bool = True) -> None:
752 if self.fairy_ref is None and _fairy_was_created:
753 # _fairy_was_created is False for the initial get connection phase;
754 # meaning there was no _ConnectionFairy and we must unconditionally
755 # do a checkin.
756 #
757 # otherwise, if fairy_was_created==True, if fairy_ref is None here
758 # that means we were checked in already, so this looks like
759 # a double checkin.
760 util.warn("Double checkin attempted on %s" % self)
761 return
762 self.fairy_ref = None
763 connection = self.dbapi_connection
764 pool = self.__pool
765 while self.finalize_callback:
766 finalizer = self.finalize_callback.pop()
767 if connection is not None:
768 finalizer(connection)
769 if pool.dispatch.checkin:
770 pool.dispatch.checkin(connection, self)
772 pool._return_conn(self)
774 @property
775 def in_use(self) -> bool:
776 return self.fairy_ref is not None
778 @property
779 def last_connect_time(self) -> float:
780 return self.starttime
782 def close(self) -> None:
783 if self.dbapi_connection is not None:
784 self.__close()
786 def invalidate(
787 self, e: Optional[BaseException] = None, soft: bool = False
788 ) -> None:
789 # already invalidated
790 if self.dbapi_connection is None:
791 return
792 if soft:
793 self.__pool.dispatch.soft_invalidate(
794 self.dbapi_connection, self, e
795 )
796 else:
797 self.__pool.dispatch.invalidate(self.dbapi_connection, self, e)
798 if e is not None:
799 self.__pool.logger.info(
800 "%sInvalidate connection %r (reason: %s:%s)",
801 "Soft " if soft else "",
802 self.dbapi_connection,
803 e.__class__.__name__,
804 e,
805 )
806 else:
807 self.__pool.logger.info(
808 "%sInvalidate connection %r",
809 "Soft " if soft else "",
810 self.dbapi_connection,
811 )
813 if soft:
814 self._soft_invalidate_time = time.time()
815 else:
816 self.__close(terminate=True)
817 self.dbapi_connection = None
819 def get_connection(self) -> DBAPIConnection:
820 recycle = False
822 # NOTE: the various comparisons here are assuming that measurable time
823 # passes between these state changes. however, time.time() is not
824 # guaranteed to have sub-second precision. comparisons of
825 # "invalidation time" to "starttime" should perhaps use >= so that the
826 # state change can take place assuming no measurable time has passed,
827 # however this does not guarantee correct behavior here as if time
828 # continues to not pass, it will try to reconnect repeatedly until
829 # these timestamps diverge, so in that sense using > is safer. Per
830 # https://stackoverflow.com/a/1938096/34549, Windows time.time() may be
831 # within 16 milliseconds accuracy, so unit tests for connection
832 # invalidation need a sleep of at least this long between initial start
833 # time and invalidation for the logic below to work reliably.
835 if self.dbapi_connection is None:
836 self.info.clear()
837 self.__connect()
838 elif (
839 self.__pool._recycle > -1
840 and time.time() - self.starttime > self.__pool._recycle
841 ):
842 self.__pool.logger.info(
843 "Connection %r exceeded timeout; recycling",
844 self.dbapi_connection,
845 )
846 recycle = True
847 elif self.__pool._invalidate_time > self.starttime:
848 self.__pool.logger.info(
849 "Connection %r invalidated due to pool invalidation; "
850 + "recycling",
851 self.dbapi_connection,
852 )
853 recycle = True
854 elif self._soft_invalidate_time > self.starttime:
855 self.__pool.logger.info(
856 "Connection %r invalidated due to local soft invalidation; "
857 + "recycling",
858 self.dbapi_connection,
859 )
860 recycle = True
862 if recycle:
863 self.__close(terminate=True)
864 self.info.clear()
866 self.__connect()
868 assert self.dbapi_connection is not None
869 return self.dbapi_connection
871 def _is_hard_or_soft_invalidated(self) -> bool:
872 return (
873 self.dbapi_connection is None
874 or self.__pool._invalidate_time > self.starttime
875 or (self._soft_invalidate_time > self.starttime)
876 )
878 def __close(self, *, terminate: bool = False) -> None:
879 self.finalize_callback.clear()
880 if self.__pool.dispatch.close:
881 self.__pool.dispatch.close(self.dbapi_connection, self)
882 assert self.dbapi_connection is not None
883 self.__pool._close_connection(
884 self.dbapi_connection, terminate=terminate
885 )
886 self.dbapi_connection = None
888 def __connect(self) -> None:
889 pool = self.__pool
891 # ensure any existing connection is removed, so that if
892 # creator fails, this attribute stays None
893 self.dbapi_connection = None
894 try:
895 self.starttime = time.time()
896 self.dbapi_connection = connection = pool._invoke_creator(self)
897 pool.logger.debug("Created new connection %r", connection)
898 self.fresh = True
899 except BaseException as e:
900 with util.safe_reraise():
901 pool.logger.debug("Error on connect(): %s", e)
902 else:
903 # in SQLAlchemy 1.4 the first_connect event is not used by
904 # the engine, so this will usually not be set
905 if pool.dispatch.first_connect:
906 pool.dispatch.first_connect.for_modify(
907 pool.dispatch
908 ).exec_once_unless_exception(self.dbapi_connection, self)
910 # init of the dialect now takes place within the connect
911 # event, so ensure a mutex is used on the first run
912 pool.dispatch.connect.for_modify(
913 pool.dispatch
914 )._exec_w_sync_on_first_run(self.dbapi_connection, self)
917def _finalize_fairy(
918 dbapi_connection: Optional[DBAPIConnection],
919 connection_record: Optional[_ConnectionRecord],
920 pool: Pool,
921 ref: Optional[
922 weakref.ref[_ConnectionFairy]
923 ], # this is None when called directly, not by the gc
924 echo: Optional[log._EchoFlagType],
925 transaction_was_reset: bool = False,
926 fairy: Optional[_ConnectionFairy] = None,
927) -> None:
928 """Cleanup for a :class:`._ConnectionFairy` whether or not it's already
929 been garbage collected.
931 When using an async dialect no IO can happen here (without using
932 a dedicated thread), since this is called outside the greenlet
933 context and with an already running loop. In this case function
934 will only log a message and raise a warning.
935 """
937 is_gc_cleanup = ref is not None
939 if is_gc_cleanup:
940 assert ref is not None
941 _strong_ref_connection_records.pop(ref, None)
942 assert connection_record is not None
943 if connection_record.fairy_ref is not ref:
944 return
945 assert dbapi_connection is None
946 dbapi_connection = connection_record.dbapi_connection
948 elif fairy:
949 _strong_ref_connection_records.pop(weakref.ref(fairy), None)
951 # null pool is not _is_asyncio but can be used also with async dialects
952 dont_restore_gced = pool._dialect.is_async
954 if dont_restore_gced:
955 detach = connection_record is None or is_gc_cleanup
956 can_manipulate_connection = not is_gc_cleanup
957 can_close_or_terminate_connection = (
958 not pool._dialect.is_async or pool._dialect.has_terminate
959 )
960 requires_terminate_for_close = (
961 pool._dialect.is_async and pool._dialect.has_terminate
962 )
964 else:
965 detach = connection_record is None
966 can_manipulate_connection = can_close_or_terminate_connection = True
967 requires_terminate_for_close = False
969 if dbapi_connection is not None:
970 if connection_record and echo:
971 pool.logger.debug(
972 "Connection %r being returned to pool", dbapi_connection
973 )
975 try:
976 if not fairy:
977 assert connection_record is not None
978 fairy = _ConnectionFairy(
979 pool,
980 dbapi_connection,
981 connection_record,
982 echo,
983 )
984 assert fairy.dbapi_connection is dbapi_connection
986 fairy._reset(
987 pool,
988 transaction_was_reset=transaction_was_reset,
989 terminate_only=detach,
990 asyncio_safe=can_manipulate_connection,
991 )
993 if detach:
994 if connection_record:
995 fairy._pool = pool
996 fairy.detach()
998 if can_close_or_terminate_connection:
999 if pool.dispatch.close_detached:
1000 pool.dispatch.close_detached(dbapi_connection)
1002 pool._close_connection(
1003 dbapi_connection,
1004 terminate=requires_terminate_for_close,
1005 )
1007 except BaseException as e:
1008 pool.logger.error(
1009 "Exception during reset or similar", exc_info=True
1010 )
1011 if connection_record:
1012 connection_record.invalidate(e=e)
1013 if not isinstance(e, Exception):
1014 raise
1015 finally:
1016 if detach and is_gc_cleanup and dont_restore_gced:
1017 message = (
1018 "The garbage collector is trying to clean up "
1019 f"non-checked-in connection {dbapi_connection!r}, "
1020 f"""which will be {
1021 'dropped, as it cannot be safely terminated'
1022 if not can_close_or_terminate_connection
1023 else 'terminated'
1024 }. """
1025 "Please ensure that SQLAlchemy pooled connections are "
1026 "returned to "
1027 "the pool explicitly, either by calling ``close()`` "
1028 "or by using appropriate context managers to manage "
1029 "their lifecycle."
1030 )
1031 pool.logger.error(message)
1032 util.warn(message)
1034 if connection_record and connection_record.fairy_ref is not None:
1035 connection_record.checkin()
1037 # give gc some help. See
1038 # test/engine/test_pool.py::PoolEventsTest::test_checkin_event_gc[True]
1039 # which actually started failing when pytest warnings plugin was
1040 # turned on, due to util.warn() above
1041 if fairy is not None:
1042 fairy.dbapi_connection = None # type: ignore
1043 fairy._connection_record = None
1044 del dbapi_connection
1045 del connection_record
1046 del fairy
1049# a dictionary of the _ConnectionFairy weakrefs to _ConnectionRecord, so that
1050# GC under pypy will call ConnectionFairy finalizers. linked directly to the
1051# weakref that will empty itself when collected so that it should not create
1052# any unmanaged memory references.
1053_strong_ref_connection_records: Dict[
1054 weakref.ref[_ConnectionFairy], _ConnectionRecord
1055] = {}
1058class PoolProxiedConnection(ManagesConnection):
1059 """A connection-like adapter for a :pep:`249` DBAPI connection, which
1060 includes additional methods specific to the :class:`.Pool` implementation.
1062 :class:`.PoolProxiedConnection` is the public-facing interface for the
1063 internal :class:`._ConnectionFairy` implementation object; users familiar
1064 with :class:`._ConnectionFairy` can consider this object to be equivalent.
1066 .. versionadded:: 2.0 :class:`.PoolProxiedConnection` provides the public-
1067 facing interface for the :class:`._ConnectionFairy` internal class.
1069 """
1071 __slots__ = ()
1073 if typing.TYPE_CHECKING:
1075 def commit(self) -> None: ...
1077 def cursor(self, *args: Any, **kwargs: Any) -> DBAPICursor: ...
1079 def rollback(self) -> None: ...
1081 def __getattr__(self, key: str) -> Any: ...
1083 @property
1084 def is_valid(self) -> bool:
1085 """Return True if this :class:`.PoolProxiedConnection` still refers
1086 to an active DBAPI connection."""
1088 raise NotImplementedError()
1090 @property
1091 def is_detached(self) -> bool:
1092 """Return True if this :class:`.PoolProxiedConnection` is detached
1093 from its pool."""
1095 raise NotImplementedError()
1097 def detach(self) -> None:
1098 """Separate this connection from its Pool.
1100 This means that the connection will no longer be returned to the
1101 pool when closed, and will instead be literally closed. The
1102 associated :class:`.ConnectionPoolEntry` is de-associated from this
1103 DBAPI connection.
1105 Note that any overall connection limiting constraints imposed by a
1106 Pool implementation may be violated after a detach, as the detached
1107 connection is removed from the pool's knowledge and control.
1109 """
1111 raise NotImplementedError()
1113 def close(self) -> None:
1114 """Release this connection back to the pool.
1116 The :meth:`.PoolProxiedConnection.close` method shadows the
1117 :pep:`249` ``.close()`` method, altering its behavior to instead
1118 :term:`release` the proxied connection back to the connection pool.
1120 Upon release to the pool, whether the connection stays "opened" and
1121 pooled in the Python process, versus actually closed out and removed
1122 from the Python process, is based on the pool implementation in use and
1123 its configuration and current state.
1125 """
1126 raise NotImplementedError()
1128 def __enter__(self) -> Self:
1129 return self
1131 def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
1132 self.close()
1133 return None
1136class _AdhocProxiedConnection(PoolProxiedConnection):
1137 """provides the :class:`.PoolProxiedConnection` interface for cases where
1138 the DBAPI connection is not actually proxied.
1140 This is used by the engine internals to pass a consistent
1141 :class:`.PoolProxiedConnection` object to consuming dialects in response to
1142 pool events that may not always have the :class:`._ConnectionFairy`
1143 available.
1145 """
1147 __slots__ = ("dbapi_connection", "_connection_record", "_is_valid")
1149 dbapi_connection: DBAPIConnection
1150 _connection_record: ConnectionPoolEntry
1152 def __init__(
1153 self,
1154 dbapi_connection: DBAPIConnection,
1155 connection_record: ConnectionPoolEntry,
1156 ):
1157 self.dbapi_connection = dbapi_connection
1158 self._connection_record = connection_record
1159 self._is_valid = True
1161 @property
1162 def driver_connection(self) -> Any: # type: ignore[override] # mypy#4125
1163 return self._connection_record.driver_connection
1165 @property
1166 def connection(self) -> DBAPIConnection:
1167 return self.dbapi_connection
1169 @property
1170 def is_valid(self) -> bool:
1171 """Implement is_valid state attribute.
1173 for the adhoc proxied connection it's assumed the connection is valid
1174 as there is no "invalidate" routine.
1176 """
1177 return self._is_valid
1179 def invalidate(
1180 self, e: Optional[BaseException] = None, soft: bool = False
1181 ) -> None:
1182 self._is_valid = False
1184 @util.ro_non_memoized_property
1185 def record_info(self) -> Optional[_InfoType]:
1186 return self._connection_record.record_info
1188 def cursor(self, *args: Any, **kwargs: Any) -> DBAPICursor:
1189 return self.dbapi_connection.cursor(*args, **kwargs)
1191 def __getattr__(self, key: Any) -> Any:
1192 return getattr(self.dbapi_connection, key)
1195class _ConnectionFairy(PoolProxiedConnection):
1196 """Proxies a DBAPI connection and provides return-on-dereference
1197 support.
1199 This is an internal object used by the :class:`_pool.Pool` implementation
1200 to provide context management to a DBAPI connection delivered by
1201 that :class:`_pool.Pool`. The public facing interface for this class
1202 is described by the :class:`.PoolProxiedConnection` class. See that
1203 class for public API details.
1205 The name "fairy" is inspired by the fact that the
1206 :class:`._ConnectionFairy` object's lifespan is transitory, as it lasts
1207 only for the length of a specific DBAPI connection being checked out from
1208 the pool, and additionally that as a transparent proxy, it is mostly
1209 invisible.
1211 .. seealso::
1213 :class:`.PoolProxiedConnection`
1215 :class:`.ConnectionPoolEntry`
1218 """
1220 __slots__ = (
1221 "dbapi_connection",
1222 "_connection_record",
1223 "_echo",
1224 "_pool",
1225 "_counter",
1226 "__weakref__",
1227 "__dict__",
1228 )
1230 pool: Pool
1231 dbapi_connection: DBAPIConnection
1232 _echo: log._EchoFlagType
1234 def __init__(
1235 self,
1236 pool: Pool,
1237 dbapi_connection: DBAPIConnection,
1238 connection_record: _ConnectionRecord,
1239 echo: log._EchoFlagType,
1240 ):
1241 self._pool = pool
1242 self._counter = 0
1243 self.dbapi_connection = dbapi_connection
1244 self._connection_record = connection_record
1245 self._echo = echo
1247 _connection_record: Optional[_ConnectionRecord]
1249 @property
1250 def driver_connection(self) -> Optional[Any]: # type: ignore[override] # mypy#4125 # noqa: E501
1251 if self._connection_record is None:
1252 return None
1253 return self._connection_record.driver_connection
1255 @property
1256 @util.deprecated(
1257 "2.0",
1258 "The _ConnectionFairy.connection attribute is deprecated; "
1259 "please use 'driver_connection'",
1260 )
1261 def connection(self) -> DBAPIConnection:
1262 return self.dbapi_connection
1264 @classmethod
1265 def _checkout(
1266 cls,
1267 pool: Pool,
1268 threadconns: Optional[threading.local] = None,
1269 fairy: Optional[_ConnectionFairy] = None,
1270 ) -> _ConnectionFairy:
1271 if not fairy:
1272 fairy = _ConnectionRecord.checkout(pool)
1274 if threadconns is not None:
1275 threadconns.current = weakref.ref(fairy)
1277 assert (
1278 fairy._connection_record is not None
1279 ), "can't 'checkout' a detached connection fairy"
1280 assert (
1281 fairy.dbapi_connection is not None
1282 ), "can't 'checkout' an invalidated connection fairy"
1284 fairy._counter += 1
1285 if (
1286 not pool.dispatch.checkout and not pool._pre_ping
1287 ) or fairy._counter != 1:
1288 return fairy
1290 # Pool listeners can trigger a reconnection on checkout, as well
1291 # as the pre-pinger.
1292 # there are three attempts made here, but note that if the database
1293 # is not accessible from a connection standpoint, those won't proceed
1294 # here.
1296 attempts = 2
1298 while attempts > 0:
1299 connection_is_fresh = fairy._connection_record.fresh
1300 fairy._connection_record.fresh = False
1301 try:
1302 if pool._pre_ping:
1303 if not connection_is_fresh:
1304 if fairy._echo:
1305 pool.logger.debug(
1306 "Pool pre-ping on connection %s",
1307 fairy.dbapi_connection,
1308 )
1309 result = pool._dialect._do_ping_w_event(
1310 fairy.dbapi_connection
1311 )
1312 if not result:
1313 if fairy._echo:
1314 pool.logger.debug(
1315 "Pool pre-ping on connection %s failed, "
1316 "will invalidate pool",
1317 fairy.dbapi_connection,
1318 )
1319 raise exc.InvalidatePoolError()
1320 elif fairy._echo:
1321 pool.logger.debug(
1322 "Connection %s is fresh, skipping pre-ping",
1323 fairy.dbapi_connection,
1324 )
1326 pool.dispatch.checkout(
1327 fairy.dbapi_connection, fairy._connection_record, fairy
1328 )
1329 return fairy
1330 except exc.DisconnectionError as e:
1331 if e.invalidate_pool:
1332 pool.logger.info(
1333 "Disconnection detected on checkout, "
1334 "invalidating all pooled connections prior to "
1335 "current timestamp (reason: %r)",
1336 e,
1337 )
1338 fairy._connection_record.invalidate(e)
1339 pool._invalidate(fairy, e, _checkin=False)
1340 else:
1341 pool.logger.info(
1342 "Disconnection detected on checkout, "
1343 "invalidating individual connection %s (reason: %r)",
1344 fairy.dbapi_connection,
1345 e,
1346 )
1347 fairy._connection_record.invalidate(e)
1348 try:
1349 fairy.dbapi_connection = (
1350 fairy._connection_record.get_connection()
1351 )
1352 except BaseException as err:
1353 with util.safe_reraise():
1354 fairy._connection_record._checkin_failed(
1355 err,
1356 _fairy_was_created=True,
1357 )
1359 # prevent _ConnectionFairy from being carried
1360 # in the stack trace. Do this after the
1361 # connection record has been checked in, so that
1362 # if the del triggers a finalize fairy, it won't
1363 # try to checkin a second time.
1364 del fairy
1366 # never called, this is for code linters
1367 raise
1369 attempts -= 1
1370 except BaseException as be_outer:
1371 with util.safe_reraise():
1372 rec = fairy._connection_record
1373 if rec is not None:
1374 rec._checkin_failed(
1375 be_outer,
1376 _fairy_was_created=True,
1377 )
1379 # prevent _ConnectionFairy from being carried
1380 # in the stack trace, see above
1381 del fairy
1383 # never called, this is for code linters
1384 raise
1386 pool.logger.info("Reconnection attempts exhausted on checkout")
1387 fairy.invalidate()
1388 raise exc.InvalidRequestError("This connection is closed")
1390 def _checkout_existing(self) -> _ConnectionFairy:
1391 return _ConnectionFairy._checkout(self._pool, fairy=self)
1393 def _checkin(self, transaction_was_reset: bool = False) -> None:
1394 _finalize_fairy(
1395 self.dbapi_connection,
1396 self._connection_record,
1397 self._pool,
1398 None,
1399 self._echo,
1400 transaction_was_reset=transaction_was_reset,
1401 fairy=self,
1402 )
1404 def _close(self) -> None:
1405 self._checkin()
1407 def _reset(
1408 self,
1409 pool: Pool,
1410 transaction_was_reset: bool,
1411 terminate_only: bool,
1412 asyncio_safe: bool,
1413 ) -> None:
1414 if pool.dispatch.reset:
1415 pool.dispatch.reset(
1416 self.dbapi_connection,
1417 self._connection_record,
1418 PoolResetState(
1419 transaction_was_reset=transaction_was_reset,
1420 terminate_only=terminate_only,
1421 asyncio_safe=asyncio_safe,
1422 ),
1423 )
1425 if not asyncio_safe:
1426 return
1428 if pool._reset_on_return is reset_rollback:
1429 if transaction_was_reset:
1430 if self._echo:
1431 pool.logger.debug(
1432 "Connection %s reset, transaction already reset",
1433 self.dbapi_connection,
1434 )
1435 else:
1436 if self._echo:
1437 pool.logger.debug(
1438 "Connection %s rollback-on-return",
1439 self.dbapi_connection,
1440 )
1441 pool._dialect.do_rollback(self)
1442 elif pool._reset_on_return is reset_commit:
1443 if self._echo:
1444 pool.logger.debug(
1445 "Connection %s commit-on-return",
1446 self.dbapi_connection,
1447 )
1448 pool._dialect.do_commit(self)
1450 @property
1451 def _logger(self) -> log._IdentifiedLoggerType:
1452 return self._pool.logger
1454 @property
1455 def is_valid(self) -> bool:
1456 return self.dbapi_connection is not None
1458 @property
1459 def is_detached(self) -> bool:
1460 return self._connection_record is None
1462 @util.ro_memoized_property
1463 def info(self) -> _InfoType:
1464 if self._connection_record is None:
1465 return {}
1466 else:
1467 return self._connection_record.info
1469 @util.ro_non_memoized_property
1470 def record_info(self) -> Optional[_InfoType]:
1471 if self._connection_record is None:
1472 return None
1473 else:
1474 return self._connection_record.record_info
1476 def invalidate(
1477 self, e: Optional[BaseException] = None, soft: bool = False
1478 ) -> None:
1479 if self.dbapi_connection is None:
1480 util.warn("Can't invalidate an already-closed connection.")
1481 return
1482 if self._connection_record:
1483 self._connection_record.invalidate(e=e, soft=soft)
1484 if not soft:
1485 # prevent any rollback / reset actions etc. on
1486 # the connection
1487 self.dbapi_connection = None # type: ignore
1489 # finalize
1490 self._checkin()
1492 def cursor(self, *args: Any, **kwargs: Any) -> DBAPICursor:
1493 assert self.dbapi_connection is not None
1494 return self.dbapi_connection.cursor(*args, **kwargs)
1496 def __getattr__(self, key: str) -> Any:
1497 return getattr(self.dbapi_connection, key)
1499 def detach(self) -> None:
1500 if self._connection_record is not None:
1501 rec = self._connection_record
1502 rec.fairy_ref = None
1503 rec.dbapi_connection = None
1504 # TODO: should this be _return_conn?
1505 self._pool._do_return_conn(self._connection_record)
1507 # can't get the descriptor assignment to work here
1508 # in pylance. mypy is OK w/ it
1509 self.info = self.info.copy() # type: ignore
1511 self._connection_record = None
1513 if self._pool.dispatch.detach:
1514 self._pool.dispatch.detach(self.dbapi_connection, rec)
1516 def close(self) -> None:
1517 self._counter -= 1
1518 if self._counter == 0:
1519 self._checkin()
1521 def _close_special(self, transaction_reset: bool = False) -> None:
1522 self._counter -= 1
1523 if self._counter == 0:
1524 self._checkin(transaction_was_reset=transaction_reset)