Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py: 57%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# pool/base.py
2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
9"""Base constructs for connection pools.
11"""
13from __future__ import annotations
15from collections import deque
16import dataclasses
17from enum import Enum
18import threading
19import time
20import typing
21from typing import Any
22from typing import Callable
23from typing import cast
24from typing import Deque
25from typing import Dict
26from typing import List
27from typing import Optional
28from typing import Protocol
29from typing import Tuple
30from typing import TYPE_CHECKING
31from typing import Union
32import weakref
34from .. import event
35from .. import exc
36from .. import log
37from .. import util
38from ..util.typing import Literal
40if TYPE_CHECKING:
41 from ..engine.interfaces import DBAPIConnection
42 from ..engine.interfaces import DBAPICursor
43 from ..engine.interfaces import Dialect
44 from ..event import _DispatchCommon
45 from ..event import _ListenerFnType
46 from ..event import dispatcher
47 from ..sql._typing import _InfoType
50@dataclasses.dataclass(frozen=True)
51class PoolResetState:
52 """describes the state of a DBAPI connection as it is being passed to
53 the :meth:`.PoolEvents.reset` connection pool event.
55 .. versionadded:: 2.0.0b3
57 """
59 __slots__ = ("transaction_was_reset", "terminate_only", "asyncio_safe")
61 transaction_was_reset: bool
62 """Indicates if the transaction on the DBAPI connection was already
63 essentially "reset" back by the :class:`.Connection` object.
65 This boolean is True if the :class:`.Connection` had transactional
66 state present upon it, which was then not closed using the
67 :meth:`.Connection.rollback` or :meth:`.Connection.commit` method;
68 instead, the transaction was closed inline within the
69 :meth:`.Connection.close` method so is guaranteed to remain non-present
70 when this event is reached.
72 """
74 terminate_only: bool
75 """indicates if the connection is to be immediately terminated and
76 not checked in to the pool.
78 This occurs for connections that were invalidated, as well as asyncio
79 connections that were not cleanly handled by the calling code that
80 are instead being garbage collected. In the latter case,
81 operations can't be safely run on asyncio connections within garbage
82 collection as there is not necessarily an event loop present.
84 """
86 asyncio_safe: bool
87 """Indicates if the reset operation is occurring within a scope where
88 an enclosing event loop is expected to be present for asyncio applications.
90 Will be False in the case that the connection is being garbage collected.
92 """
95class ResetStyle(Enum):
96 """Describe options for "reset on return" behaviors."""
98 reset_rollback = 0
99 reset_commit = 1
100 reset_none = 2
103_ResetStyleArgType = Union[
104 ResetStyle,
105 Literal[True, None, False, "commit", "rollback"],
106]
107reset_rollback, reset_commit, reset_none = list(ResetStyle)
110class _ConnDialect:
111 """partial implementation of :class:`.Dialect`
112 which provides DBAPI connection methods.
114 When a :class:`_pool.Pool` is combined with an :class:`_engine.Engine`,
115 the :class:`_engine.Engine` replaces this with its own
116 :class:`.Dialect`.
118 """
120 is_async = False
121 has_terminate = False
123 def do_rollback(self, dbapi_connection: PoolProxiedConnection) -> None:
124 dbapi_connection.rollback()
126 def do_commit(self, dbapi_connection: PoolProxiedConnection) -> None:
127 dbapi_connection.commit()
129 def do_terminate(self, dbapi_connection: DBAPIConnection) -> None:
130 dbapi_connection.close()
132 def do_close(self, dbapi_connection: DBAPIConnection) -> None:
133 dbapi_connection.close()
135 def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
136 raise NotImplementedError(
137 "The ping feature requires that a dialect is "
138 "passed to the connection pool."
139 )
141 def get_driver_connection(self, connection: DBAPIConnection) -> Any:
142 return connection
145class _AsyncConnDialect(_ConnDialect):
146 is_async = True
149class _CreatorFnType(Protocol):
150 def __call__(self) -> DBAPIConnection: ...
153class _CreatorWRecFnType(Protocol):
154 def __call__(self, rec: ConnectionPoolEntry) -> DBAPIConnection: ...
157class Pool(log.Identified, event.EventTarget):
158 """Abstract base class for connection pools."""
160 dispatch: dispatcher[Pool]
161 echo: log._EchoFlagType
163 _orig_logging_name: Optional[str]
164 _dialect: Union[_ConnDialect, Dialect] = _ConnDialect()
165 _creator_arg: Union[_CreatorFnType, _CreatorWRecFnType]
166 _invoke_creator: _CreatorWRecFnType
167 _invalidate_time: float
169 def __init__(
170 self,
171 creator: Union[_CreatorFnType, _CreatorWRecFnType],
172 recycle: int = -1,
173 echo: log._EchoFlagType = None,
174 logging_name: Optional[str] = None,
175 reset_on_return: _ResetStyleArgType = True,
176 events: Optional[List[Tuple[_ListenerFnType, str]]] = None,
177 dialect: Optional[Union[_ConnDialect, Dialect]] = None,
178 pre_ping: bool = False,
179 _dispatch: Optional[_DispatchCommon[Pool]] = None,
180 ):
181 """
182 Construct a Pool.
184 :param creator: a callable function that returns a DB-API
185 connection object. The function will be called with
186 parameters.
188 :param recycle: If set to a value other than -1, number of
189 seconds between connection recycling, which means upon
190 checkout, if this timeout is surpassed the connection will be
191 closed and replaced with a newly opened connection. Defaults to -1.
193 :param logging_name: String identifier which will be used within
194 the "name" field of logging records generated within the
195 "sqlalchemy.pool" logger. Defaults to a hexstring of the object's
196 id.
198 :param echo: if True, the connection pool will log
199 informational output such as when connections are invalidated
200 as well as when connections are recycled to the default log handler,
201 which defaults to ``sys.stdout`` for output.. If set to the string
202 ``"debug"``, the logging will include pool checkouts and checkins.
204 The :paramref:`_pool.Pool.echo` parameter can also be set from the
205 :func:`_sa.create_engine` call by using the
206 :paramref:`_sa.create_engine.echo_pool` parameter.
208 .. seealso::
210 :ref:`dbengine_logging` - further detail on how to configure
211 logging.
213 :param reset_on_return: Determine steps to take on
214 connections as they are returned to the pool, which were
215 not otherwise handled by a :class:`_engine.Connection`.
216 Available from :func:`_sa.create_engine` via the
217 :paramref:`_sa.create_engine.pool_reset_on_return` parameter.
219 :paramref:`_pool.Pool.reset_on_return` can have any of these values:
221 * ``"rollback"`` - call rollback() on the connection,
222 to release locks and transaction resources.
223 This is the default value. The vast majority
224 of use cases should leave this value set.
225 * ``"commit"`` - call commit() on the connection,
226 to release locks and transaction resources.
227 A commit here may be desirable for databases that
228 cache query plans if a commit is emitted,
229 such as Microsoft SQL Server. However, this
230 value is more dangerous than 'rollback' because
231 any data changes present on the transaction
232 are committed unconditionally.
233 * ``None`` - don't do anything on the connection.
234 This setting may be appropriate if the database / DBAPI
235 works in pure "autocommit" mode at all times, or if
236 a custom reset handler is established using the
237 :meth:`.PoolEvents.reset` event handler.
239 * ``True`` - same as 'rollback', this is here for
240 backwards compatibility.
241 * ``False`` - same as None, this is here for
242 backwards compatibility.
244 For further customization of reset on return, the
245 :meth:`.PoolEvents.reset` event hook may be used which can perform
246 any connection activity desired on reset.
248 .. seealso::
250 :ref:`pool_reset_on_return`
252 :meth:`.PoolEvents.reset`
254 :param events: a list of 2-tuples, each of the form
255 ``(callable, target)`` which will be passed to :func:`.event.listen`
256 upon construction. Provided here so that event listeners
257 can be assigned via :func:`_sa.create_engine` before dialect-level
258 listeners are applied.
260 :param dialect: a :class:`.Dialect` that will handle the job
261 of calling rollback(), close(), or commit() on DBAPI connections.
262 If omitted, a built-in "stub" dialect is used. Applications that
263 make use of :func:`_sa.create_engine` should not use this parameter
264 as it is handled by the engine creation strategy.
266 :param pre_ping: if True, the pool will emit a "ping" (typically
267 "SELECT 1", but is dialect-specific) on the connection
268 upon checkout, to test if the connection is alive or not. If not,
269 the connection is transparently re-connected and upon success, all
270 other pooled connections established prior to that timestamp are
271 invalidated. Requires that a dialect is passed as well to
272 interpret the disconnection error.
274 .. versionadded:: 1.2
276 """
277 if logging_name:
278 self.logging_name = self._orig_logging_name = logging_name
279 else:
280 self._orig_logging_name = None
282 log.instance_logger(self, echoflag=echo)
283 self._creator = creator
284 self._recycle = recycle
285 self._invalidate_time = 0
286 self._pre_ping = pre_ping
287 self._reset_on_return = util.parse_user_argument_for_enum(
288 reset_on_return,
289 {
290 ResetStyle.reset_rollback: ["rollback", True],
291 ResetStyle.reset_none: ["none", None, False],
292 ResetStyle.reset_commit: ["commit"],
293 },
294 "reset_on_return",
295 )
297 self.echo = echo
299 if _dispatch:
300 self.dispatch._update(_dispatch, only_propagate=False)
301 if dialect:
302 self._dialect = dialect
303 if events:
304 for fn, target in events:
305 event.listen(self, target, fn)
307 @util.hybridproperty
308 def _is_asyncio(self) -> bool:
309 return self._dialect.is_async
311 @property
312 def _creator(self) -> Union[_CreatorFnType, _CreatorWRecFnType]:
313 return self._creator_arg
315 @_creator.setter
316 def _creator(
317 self, creator: Union[_CreatorFnType, _CreatorWRecFnType]
318 ) -> None:
319 self._creator_arg = creator
321 # mypy seems to get super confused assigning functions to
322 # attributes
323 self._invoke_creator = self._should_wrap_creator(creator)
325 @_creator.deleter
326 def _creator(self) -> None:
327 # needed for mock testing
328 del self._creator_arg
329 del self._invoke_creator
331 def _should_wrap_creator(
332 self, creator: Union[_CreatorFnType, _CreatorWRecFnType]
333 ) -> _CreatorWRecFnType:
334 """Detect if creator accepts a single argument, or is sent
335 as a legacy style no-arg function.
337 """
339 try:
340 argspec = util.get_callable_argspec(self._creator, no_self=True)
341 except TypeError:
342 creator_fn = cast(_CreatorFnType, creator)
343 return lambda rec: creator_fn()
345 if argspec.defaults is not None:
346 defaulted = len(argspec.defaults)
347 else:
348 defaulted = 0
349 positionals = len(argspec[0]) - defaulted
351 # look for the exact arg signature that DefaultStrategy
352 # sends us
353 if (argspec[0], argspec[3]) == (["connection_record"], (None,)):
354 return cast(_CreatorWRecFnType, creator)
355 # or just a single positional
356 elif positionals == 1:
357 return cast(_CreatorWRecFnType, creator)
358 # all other cases, just wrap and assume legacy "creator" callable
359 # thing
360 else:
361 creator_fn = cast(_CreatorFnType, creator)
362 return lambda rec: creator_fn()
364 def _close_connection(
365 self, connection: DBAPIConnection, *, terminate: bool = False
366 ) -> None:
367 self.logger.debug(
368 "%s connection %r",
369 "Hard-closing" if terminate else "Closing",
370 connection,
371 )
372 try:
373 if terminate:
374 self._dialect.do_terminate(connection)
375 else:
376 self._dialect.do_close(connection)
377 except BaseException as e:
378 self.logger.error(
379 f"Exception {'terminating' if terminate else 'closing'} "
380 f"connection %r",
381 connection,
382 exc_info=True,
383 )
384 if not isinstance(e, Exception):
385 raise
387 def _create_connection(self) -> ConnectionPoolEntry:
388 """Called by subclasses to create a new ConnectionRecord."""
390 return _ConnectionRecord(self)
392 def _invalidate(
393 self,
394 connection: PoolProxiedConnection,
395 exception: Optional[BaseException] = None,
396 _checkin: bool = True,
397 ) -> None:
398 """Mark all connections established within the generation
399 of the given connection as invalidated.
401 If this pool's last invalidate time is before when the given
402 connection was created, update the timestamp til now. Otherwise,
403 no action is performed.
405 Connections with a start time prior to this pool's invalidation
406 time will be recycled upon next checkout.
407 """
408 rec = getattr(connection, "_connection_record", None)
409 if not rec or self._invalidate_time < rec.starttime:
410 self._invalidate_time = time.time()
411 if _checkin and getattr(connection, "is_valid", False):
412 connection.invalidate(exception)
414 def recreate(self) -> Pool:
415 """Return a new :class:`_pool.Pool`, of the same class as this one
416 and configured with identical creation arguments.
418 This method is used in conjunction with :meth:`dispose`
419 to close out an entire :class:`_pool.Pool` and create a new one in
420 its place.
422 """
424 raise NotImplementedError()
426 def dispose(self) -> None:
427 """Dispose of this pool.
429 This method leaves the possibility of checked-out connections
430 remaining open, as it only affects connections that are
431 idle in the pool.
433 .. seealso::
435 :meth:`Pool.recreate`
437 """
439 raise NotImplementedError()
441 def connect(self) -> PoolProxiedConnection:
442 """Return a DBAPI connection from the pool.
444 The connection is instrumented such that when its
445 ``close()`` method is called, the connection will be returned to
446 the pool.
448 """
449 return _ConnectionFairy._checkout(self)
451 def _return_conn(self, record: ConnectionPoolEntry) -> None:
452 """Given a _ConnectionRecord, return it to the :class:`_pool.Pool`.
454 This method is called when an instrumented DBAPI connection
455 has its ``close()`` method called.
457 """
458 self._do_return_conn(record)
460 def _do_get(self) -> ConnectionPoolEntry:
461 """Implementation for :meth:`get`, supplied by subclasses."""
463 raise NotImplementedError()
465 def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
466 """Implementation for :meth:`return_conn`, supplied by subclasses."""
468 raise NotImplementedError()
470 def status(self) -> str:
471 raise NotImplementedError()
474class ManagesConnection:
475 """Common base for the two connection-management interfaces
476 :class:`.PoolProxiedConnection` and :class:`.ConnectionPoolEntry`.
478 These two objects are typically exposed in the public facing API
479 via the connection pool event hooks, documented at :class:`.PoolEvents`.
481 .. versionadded:: 2.0
483 """
485 __slots__ = ()
487 dbapi_connection: Optional[DBAPIConnection]
488 """A reference to the actual DBAPI connection being tracked.
490 This is a :pep:`249`-compliant object that for traditional sync-style
491 dialects is provided by the third-party
492 DBAPI implementation in use. For asyncio dialects, the implementation
493 is typically an adapter object provided by the SQLAlchemy dialect
494 itself; the underlying asyncio object is available via the
495 :attr:`.ManagesConnection.driver_connection` attribute.
497 SQLAlchemy's interface for the DBAPI connection is based on the
498 :class:`.DBAPIConnection` protocol object
500 .. seealso::
502 :attr:`.ManagesConnection.driver_connection`
504 :ref:`faq_dbapi_connection`
506 """
508 driver_connection: Optional[Any]
509 """The "driver level" connection object as used by the Python
510 DBAPI or database driver.
512 For traditional :pep:`249` DBAPI implementations, this object will
513 be the same object as that of
514 :attr:`.ManagesConnection.dbapi_connection`. For an asyncio database
515 driver, this will be the ultimate "connection" object used by that
516 driver, such as the ``asyncpg.Connection`` object which will not have
517 standard pep-249 methods.
519 .. versionadded:: 1.4.24
521 .. seealso::
523 :attr:`.ManagesConnection.dbapi_connection`
525 :ref:`faq_dbapi_connection`
527 """
529 @util.ro_memoized_property
530 def info(self) -> _InfoType:
531 """Info dictionary associated with the underlying DBAPI connection
532 referred to by this :class:`.ManagesConnection` instance, allowing
533 user-defined data to be associated with the connection.
535 The data in this dictionary is persistent for the lifespan
536 of the DBAPI connection itself, including across pool checkins
537 and checkouts. When the connection is invalidated
538 and replaced with a new one, this dictionary is cleared.
540 For a :class:`.PoolProxiedConnection` instance that's not associated
541 with a :class:`.ConnectionPoolEntry`, such as if it were detached, the
542 attribute returns a dictionary that is local to that
543 :class:`.ConnectionPoolEntry`. Therefore the
544 :attr:`.ManagesConnection.info` attribute will always provide a Python
545 dictionary.
547 .. seealso::
549 :attr:`.ManagesConnection.record_info`
552 """
553 raise NotImplementedError()
555 @util.ro_memoized_property
556 def record_info(self) -> Optional[_InfoType]:
557 """Persistent info dictionary associated with this
558 :class:`.ManagesConnection`.
560 Unlike the :attr:`.ManagesConnection.info` dictionary, the lifespan
561 of this dictionary is that of the :class:`.ConnectionPoolEntry`
562 which owns it; therefore this dictionary will persist across
563 reconnects and connection invalidation for a particular entry
564 in the connection pool.
566 For a :class:`.PoolProxiedConnection` instance that's not associated
567 with a :class:`.ConnectionPoolEntry`, such as if it were detached, the
568 attribute returns None. Contrast to the :attr:`.ManagesConnection.info`
569 dictionary which is never None.
572 .. seealso::
574 :attr:`.ManagesConnection.info`
576 """
577 raise NotImplementedError()
579 def invalidate(
580 self, e: Optional[BaseException] = None, soft: bool = False
581 ) -> None:
582 """Mark the managed connection as invalidated.
584 :param e: an exception object indicating a reason for the invalidation.
586 :param soft: if True, the connection isn't closed; instead, this
587 connection will be recycled on next checkout.
589 .. seealso::
591 :ref:`pool_connection_invalidation`
594 """
595 raise NotImplementedError()
598class ConnectionPoolEntry(ManagesConnection):
599 """Interface for the object that maintains an individual database
600 connection on behalf of a :class:`_pool.Pool` instance.
602 The :class:`.ConnectionPoolEntry` object represents the long term
603 maintainance of a particular connection for a pool, including expiring or
604 invalidating that connection to have it replaced with a new one, which will
605 continue to be maintained by that same :class:`.ConnectionPoolEntry`
606 instance. Compared to :class:`.PoolProxiedConnection`, which is the
607 short-term, per-checkout connection manager, this object lasts for the
608 lifespan of a particular "slot" within a connection pool.
610 The :class:`.ConnectionPoolEntry` object is mostly visible to public-facing
611 API code when it is delivered to connection pool event hooks, such as
612 :meth:`_events.PoolEvents.connect` and :meth:`_events.PoolEvents.checkout`.
614 .. versionadded:: 2.0 :class:`.ConnectionPoolEntry` provides the public
615 facing interface for the :class:`._ConnectionRecord` internal class.
617 """
619 __slots__ = ()
621 @property
622 def in_use(self) -> bool:
623 """Return True the connection is currently checked out"""
625 raise NotImplementedError()
627 def close(self) -> None:
628 """Close the DBAPI connection managed by this connection pool entry."""
629 raise NotImplementedError()
632class _ConnectionRecord(ConnectionPoolEntry):
633 """Maintains a position in a connection pool which references a pooled
634 connection.
636 This is an internal object used by the :class:`_pool.Pool` implementation
637 to provide context management to a DBAPI connection maintained by
638 that :class:`_pool.Pool`. The public facing interface for this class
639 is described by the :class:`.ConnectionPoolEntry` class. See that
640 class for public API details.
642 .. seealso::
644 :class:`.ConnectionPoolEntry`
646 :class:`.PoolProxiedConnection`
648 """
650 __slots__ = (
651 "__pool",
652 "fairy_ref",
653 "finalize_callback",
654 "fresh",
655 "starttime",
656 "dbapi_connection",
657 "__weakref__",
658 "__dict__",
659 )
661 finalize_callback: Deque[Callable[[DBAPIConnection], None]]
662 fresh: bool
663 fairy_ref: Optional[weakref.ref[_ConnectionFairy]]
664 starttime: float
666 def __init__(self, pool: Pool, connect: bool = True):
667 self.fresh = False
668 self.fairy_ref = None
669 self.starttime = 0
670 self.dbapi_connection = None
672 self.__pool = pool
673 if connect:
674 self.__connect()
675 self.finalize_callback = deque()
677 dbapi_connection: Optional[DBAPIConnection]
679 @property
680 def driver_connection(self) -> Optional[Any]: # type: ignore[override] # mypy#4125 # noqa: E501
681 if self.dbapi_connection is None:
682 return None
683 else:
684 return self.__pool._dialect.get_driver_connection(
685 self.dbapi_connection
686 )
688 @property
689 @util.deprecated(
690 "2.0",
691 "The _ConnectionRecord.connection attribute is deprecated; "
692 "please use 'driver_connection'",
693 )
694 def connection(self) -> Optional[DBAPIConnection]:
695 return self.dbapi_connection
697 _soft_invalidate_time: float = 0
699 @util.ro_memoized_property
700 def info(self) -> _InfoType:
701 return {}
703 @util.ro_memoized_property
704 def record_info(self) -> Optional[_InfoType]:
705 return {}
707 @classmethod
708 def checkout(cls, pool: Pool) -> _ConnectionFairy:
709 if TYPE_CHECKING:
710 rec = cast(_ConnectionRecord, pool._do_get())
711 else:
712 rec = pool._do_get()
714 try:
715 dbapi_connection = rec.get_connection()
716 except BaseException as err:
717 with util.safe_reraise():
718 rec._checkin_failed(err, _fairy_was_created=False)
720 # not reached, for code linters only
721 raise
723 echo = pool._should_log_debug()
724 fairy = _ConnectionFairy(pool, dbapi_connection, rec, echo)
726 rec.fairy_ref = ref = weakref.ref(
727 fairy,
728 lambda ref: (
729 _finalize_fairy(
730 None, rec, pool, ref, echo, transaction_was_reset=False
731 )
732 if _finalize_fairy is not None
733 else None
734 ),
735 )
736 _strong_ref_connection_records[ref] = rec
737 if echo:
738 pool.logger.debug(
739 "Connection %r checked out from pool", dbapi_connection
740 )
741 return fairy
743 def _checkin_failed(
744 self, err: BaseException, _fairy_was_created: bool = True
745 ) -> None:
746 self.invalidate(e=err)
747 self.checkin(
748 _fairy_was_created=_fairy_was_created,
749 )
751 def checkin(self, _fairy_was_created: bool = True) -> None:
752 if self.fairy_ref is None and _fairy_was_created:
753 # _fairy_was_created is False for the initial get connection phase;
754 # meaning there was no _ConnectionFairy and we must unconditionally
755 # do a checkin.
756 #
757 # otherwise, if fairy_was_created==True, if fairy_ref is None here
758 # that means we were checked in already, so this looks like
759 # a double checkin.
760 util.warn("Double checkin attempted on %s" % self)
761 return
762 self.fairy_ref = None
763 connection = self.dbapi_connection
764 pool = self.__pool
765 while self.finalize_callback:
766 finalizer = self.finalize_callback.pop()
767 if connection is not None:
768 finalizer(connection)
769 if pool.dispatch.checkin:
770 pool.dispatch.checkin(connection, self)
772 pool._return_conn(self)
774 @property
775 def in_use(self) -> bool:
776 return self.fairy_ref is not None
778 @property
779 def last_connect_time(self) -> float:
780 return self.starttime
782 def close(self) -> None:
783 if self.dbapi_connection is not None:
784 self.__close()
786 def invalidate(
787 self, e: Optional[BaseException] = None, soft: bool = False
788 ) -> None:
789 # already invalidated
790 if self.dbapi_connection is None:
791 return
792 if soft:
793 self.__pool.dispatch.soft_invalidate(
794 self.dbapi_connection, self, e
795 )
796 else:
797 self.__pool.dispatch.invalidate(self.dbapi_connection, self, e)
798 if e is not None:
799 self.__pool.logger.info(
800 "%sInvalidate connection %r (reason: %s:%s)",
801 "Soft " if soft else "",
802 self.dbapi_connection,
803 e.__class__.__name__,
804 e,
805 )
806 else:
807 self.__pool.logger.info(
808 "%sInvalidate connection %r",
809 "Soft " if soft else "",
810 self.dbapi_connection,
811 )
813 if soft:
814 self._soft_invalidate_time = time.time()
815 else:
816 self.__close(terminate=True)
817 self.dbapi_connection = None
819 def get_connection(self) -> DBAPIConnection:
820 recycle = False
822 # NOTE: the various comparisons here are assuming that measurable time
823 # passes between these state changes. however, time.time() is not
824 # guaranteed to have sub-second precision. comparisons of
825 # "invalidation time" to "starttime" should perhaps use >= so that the
826 # state change can take place assuming no measurable time has passed,
827 # however this does not guarantee correct behavior here as if time
828 # continues to not pass, it will try to reconnect repeatedly until
829 # these timestamps diverge, so in that sense using > is safer. Per
830 # https://stackoverflow.com/a/1938096/34549, Windows time.time() may be
831 # within 16 milliseconds accuracy, so unit tests for connection
832 # invalidation need a sleep of at least this long between initial start
833 # time and invalidation for the logic below to work reliably.
835 if self.dbapi_connection is None:
836 self.info.clear()
837 self.__connect()
838 elif (
839 self.__pool._recycle > -1
840 and time.time() - self.starttime > self.__pool._recycle
841 ):
842 self.__pool.logger.info(
843 "Connection %r exceeded timeout; recycling",
844 self.dbapi_connection,
845 )
846 recycle = True
847 elif self.__pool._invalidate_time > self.starttime:
848 self.__pool.logger.info(
849 "Connection %r invalidated due to pool invalidation; "
850 + "recycling",
851 self.dbapi_connection,
852 )
853 recycle = True
854 elif self._soft_invalidate_time > self.starttime:
855 self.__pool.logger.info(
856 "Connection %r invalidated due to local soft invalidation; "
857 + "recycling",
858 self.dbapi_connection,
859 )
860 recycle = True
862 if recycle:
863 self.__close(terminate=True)
864 self.info.clear()
866 self.__connect()
868 assert self.dbapi_connection is not None
869 return self.dbapi_connection
871 def _is_hard_or_soft_invalidated(self) -> bool:
872 return (
873 self.dbapi_connection is None
874 or self.__pool._invalidate_time > self.starttime
875 or (self._soft_invalidate_time > self.starttime)
876 )
878 def __close(self, *, terminate: bool = False) -> None:
879 self.finalize_callback.clear()
880 if self.__pool.dispatch.close:
881 self.__pool.dispatch.close(self.dbapi_connection, self)
882 assert self.dbapi_connection is not None
883 self.__pool._close_connection(
884 self.dbapi_connection, terminate=terminate
885 )
886 self.dbapi_connection = None
888 def __connect(self) -> None:
889 pool = self.__pool
891 # ensure any existing connection is removed, so that if
892 # creator fails, this attribute stays None
893 self.dbapi_connection = None
894 try:
895 self.starttime = time.time()
896 self.dbapi_connection = connection = pool._invoke_creator(self)
897 pool.logger.debug("Created new connection %r", connection)
898 self.fresh = True
899 except BaseException as e:
900 with util.safe_reraise():
901 pool.logger.debug("Error on connect(): %s", e)
902 else:
903 # in SQLAlchemy 1.4 the first_connect event is not used by
904 # the engine, so this will usually not be set
905 if pool.dispatch.first_connect:
906 pool.dispatch.first_connect.for_modify(
907 pool.dispatch
908 ).exec_once_unless_exception(self.dbapi_connection, self)
910 # init of the dialect now takes place within the connect
911 # event, so ensure a mutex is used on the first run
912 pool.dispatch.connect.for_modify(
913 pool.dispatch
914 )._exec_w_sync_on_first_run(self.dbapi_connection, self)
917def _finalize_fairy(
918 dbapi_connection: Optional[DBAPIConnection],
919 connection_record: Optional[_ConnectionRecord],
920 pool: Pool,
921 ref: Optional[
922 weakref.ref[_ConnectionFairy]
923 ], # this is None when called directly, not by the gc
924 echo: Optional[log._EchoFlagType],
925 transaction_was_reset: bool = False,
926 fairy: Optional[_ConnectionFairy] = None,
927) -> None:
928 """Cleanup for a :class:`._ConnectionFairy` whether or not it's already
929 been garbage collected.
931 When using an async dialect no IO can happen here (without using
932 a dedicated thread), since this is called outside the greenlet
933 context and with an already running loop. In this case function
934 will only log a message and raise a warning.
935 """
937 is_gc_cleanup = ref is not None
939 if is_gc_cleanup:
940 assert ref is not None
941 _strong_ref_connection_records.pop(ref, None)
942 assert connection_record is not None
943 if connection_record.fairy_ref is not ref:
944 return
945 assert dbapi_connection is None
946 dbapi_connection = connection_record.dbapi_connection
948 elif fairy:
949 _strong_ref_connection_records.pop(weakref.ref(fairy), None)
951 # null pool is not _is_asyncio but can be used also with async dialects
952 dont_restore_gced = pool._dialect.is_async
954 if dont_restore_gced:
955 detach = connection_record is None or is_gc_cleanup
956 can_manipulate_connection = not is_gc_cleanup
957 can_close_or_terminate_connection = (
958 not pool._dialect.is_async or pool._dialect.has_terminate
959 )
960 requires_terminate_for_close = (
961 pool._dialect.is_async and pool._dialect.has_terminate
962 )
964 else:
965 detach = connection_record is None
966 can_manipulate_connection = can_close_or_terminate_connection = True
967 requires_terminate_for_close = False
969 if dbapi_connection is not None:
970 if connection_record and echo:
971 pool.logger.debug(
972 "Connection %r being returned to pool", dbapi_connection
973 )
975 try:
976 if not fairy:
977 assert connection_record is not None
978 fairy = _ConnectionFairy(
979 pool,
980 dbapi_connection,
981 connection_record,
982 echo,
983 )
984 assert fairy.dbapi_connection is dbapi_connection
986 fairy._reset(
987 pool,
988 transaction_was_reset=transaction_was_reset,
989 terminate_only=detach,
990 asyncio_safe=can_manipulate_connection,
991 )
993 if detach:
994 if connection_record:
995 fairy._pool = pool
996 fairy.detach()
998 if can_close_or_terminate_connection:
999 if pool.dispatch.close_detached:
1000 pool.dispatch.close_detached(dbapi_connection)
1002 pool._close_connection(
1003 dbapi_connection,
1004 terminate=requires_terminate_for_close,
1005 )
1007 except BaseException as e:
1008 pool.logger.error(
1009 "Exception during reset or similar", exc_info=True
1010 )
1011 if connection_record:
1012 connection_record.invalidate(e=e)
1013 if not isinstance(e, Exception):
1014 raise
1015 finally:
1016 if detach and is_gc_cleanup and dont_restore_gced:
1017 message = (
1018 "The garbage collector is trying to clean up "
1019 f"non-checked-in connection {dbapi_connection!r}, "
1020 f"""which will be {
1021 'dropped, as it cannot be safely terminated'
1022 if not can_close_or_terminate_connection
1023 else 'terminated'
1024 }. """
1025 "Please ensure that SQLAlchemy pooled connections are "
1026 "returned to "
1027 "the pool explicitly, either by calling ``close()`` "
1028 "or by using appropriate context managers to manage "
1029 "their lifecycle."
1030 )
1031 pool.logger.error(message)
1032 util.warn(message)
1034 if connection_record and connection_record.fairy_ref is not None:
1035 connection_record.checkin()
1037 # give gc some help. See
1038 # test/engine/test_pool.py::PoolEventsTest::test_checkin_event_gc[True]
1039 # which actually started failing when pytest warnings plugin was
1040 # turned on, due to util.warn() above
1041 if fairy is not None:
1042 fairy.dbapi_connection = None # type: ignore
1043 fairy._connection_record = None
1044 del dbapi_connection
1045 del connection_record
1046 del fairy
1049# a dictionary of the _ConnectionFairy weakrefs to _ConnectionRecord, so that
1050# GC under pypy will call ConnectionFairy finalizers. linked directly to the
1051# weakref that will empty itself when collected so that it should not create
1052# any unmanaged memory references.
1053_strong_ref_connection_records: Dict[
1054 weakref.ref[_ConnectionFairy], _ConnectionRecord
1055] = {}
1058class PoolProxiedConnection(ManagesConnection):
1059 """A connection-like adapter for a :pep:`249` DBAPI connection, which
1060 includes additional methods specific to the :class:`.Pool` implementation.
1062 :class:`.PoolProxiedConnection` is the public-facing interface for the
1063 internal :class:`._ConnectionFairy` implementation object; users familiar
1064 with :class:`._ConnectionFairy` can consider this object to be equivalent.
1066 .. versionadded:: 2.0 :class:`.PoolProxiedConnection` provides the public-
1067 facing interface for the :class:`._ConnectionFairy` internal class.
1069 """
1071 __slots__ = ()
1073 if typing.TYPE_CHECKING:
1075 def commit(self) -> None: ...
1077 def cursor(self) -> DBAPICursor: ...
1079 def rollback(self) -> None: ...
1081 @property
1082 def is_valid(self) -> bool:
1083 """Return True if this :class:`.PoolProxiedConnection` still refers
1084 to an active DBAPI connection."""
1086 raise NotImplementedError()
1088 @property
1089 def is_detached(self) -> bool:
1090 """Return True if this :class:`.PoolProxiedConnection` is detached
1091 from its pool."""
1093 raise NotImplementedError()
1095 def detach(self) -> None:
1096 """Separate this connection from its Pool.
1098 This means that the connection will no longer be returned to the
1099 pool when closed, and will instead be literally closed. The
1100 associated :class:`.ConnectionPoolEntry` is de-associated from this
1101 DBAPI connection.
1103 Note that any overall connection limiting constraints imposed by a
1104 Pool implementation may be violated after a detach, as the detached
1105 connection is removed from the pool's knowledge and control.
1107 """
1109 raise NotImplementedError()
1111 def close(self) -> None:
1112 """Release this connection back to the pool.
1114 The :meth:`.PoolProxiedConnection.close` method shadows the
1115 :pep:`249` ``.close()`` method, altering its behavior to instead
1116 :term:`release` the proxied connection back to the connection pool.
1118 Upon release to the pool, whether the connection stays "opened" and
1119 pooled in the Python process, versus actually closed out and removed
1120 from the Python process, is based on the pool implementation in use and
1121 its configuration and current state.
1123 """
1124 raise NotImplementedError()
1127class _AdhocProxiedConnection(PoolProxiedConnection):
1128 """provides the :class:`.PoolProxiedConnection` interface for cases where
1129 the DBAPI connection is not actually proxied.
1131 This is used by the engine internals to pass a consistent
1132 :class:`.PoolProxiedConnection` object to consuming dialects in response to
1133 pool events that may not always have the :class:`._ConnectionFairy`
1134 available.
1136 """
1138 __slots__ = ("dbapi_connection", "_connection_record", "_is_valid")
1140 dbapi_connection: DBAPIConnection
1141 _connection_record: ConnectionPoolEntry
1143 def __init__(
1144 self,
1145 dbapi_connection: DBAPIConnection,
1146 connection_record: ConnectionPoolEntry,
1147 ):
1148 self.dbapi_connection = dbapi_connection
1149 self._connection_record = connection_record
1150 self._is_valid = True
1152 @property
1153 def driver_connection(self) -> Any: # type: ignore[override] # mypy#4125
1154 return self._connection_record.driver_connection
1156 @property
1157 def connection(self) -> DBAPIConnection:
1158 return self.dbapi_connection
1160 @property
1161 def is_valid(self) -> bool:
1162 """Implement is_valid state attribute.
1164 for the adhoc proxied connection it's assumed the connection is valid
1165 as there is no "invalidate" routine.
1167 """
1168 return self._is_valid
1170 def invalidate(
1171 self, e: Optional[BaseException] = None, soft: bool = False
1172 ) -> None:
1173 self._is_valid = False
1175 @util.ro_non_memoized_property
1176 def record_info(self) -> Optional[_InfoType]:
1177 return self._connection_record.record_info
1179 def cursor(self, *args: Any, **kwargs: Any) -> DBAPICursor:
1180 return self.dbapi_connection.cursor(*args, **kwargs)
1182 def __getattr__(self, key: Any) -> Any:
1183 return getattr(self.dbapi_connection, key)
1186class _ConnectionFairy(PoolProxiedConnection):
1187 """Proxies a DBAPI connection and provides return-on-dereference
1188 support.
1190 This is an internal object used by the :class:`_pool.Pool` implementation
1191 to provide context management to a DBAPI connection delivered by
1192 that :class:`_pool.Pool`. The public facing interface for this class
1193 is described by the :class:`.PoolProxiedConnection` class. See that
1194 class for public API details.
1196 The name "fairy" is inspired by the fact that the
1197 :class:`._ConnectionFairy` object's lifespan is transitory, as it lasts
1198 only for the length of a specific DBAPI connection being checked out from
1199 the pool, and additionally that as a transparent proxy, it is mostly
1200 invisible.
1202 .. seealso::
1204 :class:`.PoolProxiedConnection`
1206 :class:`.ConnectionPoolEntry`
1209 """
1211 __slots__ = (
1212 "dbapi_connection",
1213 "_connection_record",
1214 "_echo",
1215 "_pool",
1216 "_counter",
1217 "__weakref__",
1218 "__dict__",
1219 )
1221 pool: Pool
1222 dbapi_connection: DBAPIConnection
1223 _echo: log._EchoFlagType
1225 def __init__(
1226 self,
1227 pool: Pool,
1228 dbapi_connection: DBAPIConnection,
1229 connection_record: _ConnectionRecord,
1230 echo: log._EchoFlagType,
1231 ):
1232 self._pool = pool
1233 self._counter = 0
1234 self.dbapi_connection = dbapi_connection
1235 self._connection_record = connection_record
1236 self._echo = echo
1238 _connection_record: Optional[_ConnectionRecord]
1240 @property
1241 def driver_connection(self) -> Optional[Any]: # type: ignore[override] # mypy#4125 # noqa: E501
1242 if self._connection_record is None:
1243 return None
1244 return self._connection_record.driver_connection
1246 @property
1247 @util.deprecated(
1248 "2.0",
1249 "The _ConnectionFairy.connection attribute is deprecated; "
1250 "please use 'driver_connection'",
1251 )
1252 def connection(self) -> DBAPIConnection:
1253 return self.dbapi_connection
1255 @classmethod
1256 def _checkout(
1257 cls,
1258 pool: Pool,
1259 threadconns: Optional[threading.local] = None,
1260 fairy: Optional[_ConnectionFairy] = None,
1261 ) -> _ConnectionFairy:
1262 if not fairy:
1263 fairy = _ConnectionRecord.checkout(pool)
1265 if threadconns is not None:
1266 threadconns.current = weakref.ref(fairy)
1268 assert (
1269 fairy._connection_record is not None
1270 ), "can't 'checkout' a detached connection fairy"
1271 assert (
1272 fairy.dbapi_connection is not None
1273 ), "can't 'checkout' an invalidated connection fairy"
1275 fairy._counter += 1
1276 if (
1277 not pool.dispatch.checkout and not pool._pre_ping
1278 ) or fairy._counter != 1:
1279 return fairy
1281 # Pool listeners can trigger a reconnection on checkout, as well
1282 # as the pre-pinger.
1283 # there are three attempts made here, but note that if the database
1284 # is not accessible from a connection standpoint, those won't proceed
1285 # here.
1287 attempts = 2
1289 while attempts > 0:
1290 connection_is_fresh = fairy._connection_record.fresh
1291 fairy._connection_record.fresh = False
1292 try:
1293 if pool._pre_ping:
1294 if not connection_is_fresh:
1295 if fairy._echo:
1296 pool.logger.debug(
1297 "Pool pre-ping on connection %s",
1298 fairy.dbapi_connection,
1299 )
1300 result = pool._dialect._do_ping_w_event(
1301 fairy.dbapi_connection
1302 )
1303 if not result:
1304 if fairy._echo:
1305 pool.logger.debug(
1306 "Pool pre-ping on connection %s failed, "
1307 "will invalidate pool",
1308 fairy.dbapi_connection,
1309 )
1310 raise exc.InvalidatePoolError()
1311 elif fairy._echo:
1312 pool.logger.debug(
1313 "Connection %s is fresh, skipping pre-ping",
1314 fairy.dbapi_connection,
1315 )
1317 pool.dispatch.checkout(
1318 fairy.dbapi_connection, fairy._connection_record, fairy
1319 )
1320 return fairy
1321 except exc.DisconnectionError as e:
1322 if e.invalidate_pool:
1323 pool.logger.info(
1324 "Disconnection detected on checkout, "
1325 "invalidating all pooled connections prior to "
1326 "current timestamp (reason: %r)",
1327 e,
1328 )
1329 fairy._connection_record.invalidate(e)
1330 pool._invalidate(fairy, e, _checkin=False)
1331 else:
1332 pool.logger.info(
1333 "Disconnection detected on checkout, "
1334 "invalidating individual connection %s (reason: %r)",
1335 fairy.dbapi_connection,
1336 e,
1337 )
1338 fairy._connection_record.invalidate(e)
1339 try:
1340 fairy.dbapi_connection = (
1341 fairy._connection_record.get_connection()
1342 )
1343 except BaseException as err:
1344 with util.safe_reraise():
1345 fairy._connection_record._checkin_failed(
1346 err,
1347 _fairy_was_created=True,
1348 )
1350 # prevent _ConnectionFairy from being carried
1351 # in the stack trace. Do this after the
1352 # connection record has been checked in, so that
1353 # if the del triggers a finalize fairy, it won't
1354 # try to checkin a second time.
1355 del fairy
1357 # never called, this is for code linters
1358 raise
1360 attempts -= 1
1361 except BaseException as be_outer:
1362 with util.safe_reraise():
1363 rec = fairy._connection_record
1364 if rec is not None:
1365 rec._checkin_failed(
1366 be_outer,
1367 _fairy_was_created=True,
1368 )
1370 # prevent _ConnectionFairy from being carried
1371 # in the stack trace, see above
1372 del fairy
1374 # never called, this is for code linters
1375 raise
1377 pool.logger.info("Reconnection attempts exhausted on checkout")
1378 fairy.invalidate()
1379 raise exc.InvalidRequestError("This connection is closed")
1381 def _checkout_existing(self) -> _ConnectionFairy:
1382 return _ConnectionFairy._checkout(self._pool, fairy=self)
1384 def _checkin(self, transaction_was_reset: bool = False) -> None:
1385 _finalize_fairy(
1386 self.dbapi_connection,
1387 self._connection_record,
1388 self._pool,
1389 None,
1390 self._echo,
1391 transaction_was_reset=transaction_was_reset,
1392 fairy=self,
1393 )
1395 def _close(self) -> None:
1396 self._checkin()
1398 def _reset(
1399 self,
1400 pool: Pool,
1401 transaction_was_reset: bool,
1402 terminate_only: bool,
1403 asyncio_safe: bool,
1404 ) -> None:
1405 if pool.dispatch.reset:
1406 pool.dispatch.reset(
1407 self.dbapi_connection,
1408 self._connection_record,
1409 PoolResetState(
1410 transaction_was_reset=transaction_was_reset,
1411 terminate_only=terminate_only,
1412 asyncio_safe=asyncio_safe,
1413 ),
1414 )
1416 if not asyncio_safe:
1417 return
1419 if pool._reset_on_return is reset_rollback:
1420 if transaction_was_reset:
1421 if self._echo:
1422 pool.logger.debug(
1423 "Connection %s reset, transaction already reset",
1424 self.dbapi_connection,
1425 )
1426 else:
1427 if self._echo:
1428 pool.logger.debug(
1429 "Connection %s rollback-on-return",
1430 self.dbapi_connection,
1431 )
1432 pool._dialect.do_rollback(self)
1433 elif pool._reset_on_return is reset_commit:
1434 if self._echo:
1435 pool.logger.debug(
1436 "Connection %s commit-on-return",
1437 self.dbapi_connection,
1438 )
1439 pool._dialect.do_commit(self)
1441 @property
1442 def _logger(self) -> log._IdentifiedLoggerType:
1443 return self._pool.logger
1445 @property
1446 def is_valid(self) -> bool:
1447 return self.dbapi_connection is not None
1449 @property
1450 def is_detached(self) -> bool:
1451 return self._connection_record is None
1453 @util.ro_memoized_property
1454 def info(self) -> _InfoType:
1455 if self._connection_record is None:
1456 return {}
1457 else:
1458 return self._connection_record.info
1460 @util.ro_non_memoized_property
1461 def record_info(self) -> Optional[_InfoType]:
1462 if self._connection_record is None:
1463 return None
1464 else:
1465 return self._connection_record.record_info
1467 def invalidate(
1468 self, e: Optional[BaseException] = None, soft: bool = False
1469 ) -> None:
1470 if self.dbapi_connection is None:
1471 util.warn("Can't invalidate an already-closed connection.")
1472 return
1473 if self._connection_record:
1474 self._connection_record.invalidate(e=e, soft=soft)
1475 if not soft:
1476 # prevent any rollback / reset actions etc. on
1477 # the connection
1478 self.dbapi_connection = None # type: ignore
1480 # finalize
1481 self._checkin()
1483 def cursor(self, *args: Any, **kwargs: Any) -> DBAPICursor:
1484 assert self.dbapi_connection is not None
1485 return self.dbapi_connection.cursor(*args, **kwargs)
1487 def __getattr__(self, key: str) -> Any:
1488 return getattr(self.dbapi_connection, key)
1490 def detach(self) -> None:
1491 if self._connection_record is not None:
1492 rec = self._connection_record
1493 rec.fairy_ref = None
1494 rec.dbapi_connection = None
1495 # TODO: should this be _return_conn?
1496 self._pool._do_return_conn(self._connection_record)
1498 # can't get the descriptor assignment to work here
1499 # in pylance. mypy is OK w/ it
1500 self.info = self.info.copy() # type: ignore
1502 self._connection_record = None
1504 if self._pool.dispatch.detach:
1505 self._pool.dispatch.detach(self.dbapi_connection, rec)
1507 def close(self) -> None:
1508 self._counter -= 1
1509 if self._counter == 0:
1510 self._checkin()
1512 def _close_special(self, transaction_reset: bool = False) -> None:
1513 self._counter -= 1
1514 if self._counter == 0:
1515 self._checkin(transaction_was_reset=transaction_reset)