1# pool/base.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8
9"""Base constructs for connection pools.
10
11"""
12
13from __future__ import annotations
14
15from collections import deque
16import dataclasses
17from enum import Enum
18import threading
19import time
20import typing
21from typing import Any
22from typing import Callable
23from typing import cast
24from typing import Deque
25from typing import Dict
26from typing import List
27from typing import Optional
28from typing import Tuple
29from typing import TYPE_CHECKING
30from typing import Union
31import weakref
32
33from .. import event
34from .. import exc
35from .. import log
36from .. import util
37from ..util.typing import Literal
38from ..util.typing import Protocol
39
40if TYPE_CHECKING:
41 from ..engine.interfaces import DBAPIConnection
42 from ..engine.interfaces import DBAPICursor
43 from ..engine.interfaces import Dialect
44 from ..event import _DispatchCommon
45 from ..event import _ListenerFnType
46 from ..event import dispatcher
47 from ..sql._typing import _InfoType
48
49
50@dataclasses.dataclass(frozen=True)
51class PoolResetState:
52 """describes the state of a DBAPI connection as it is being passed to
53 the :meth:`.PoolEvents.reset` connection pool event.
54
55 .. versionadded:: 2.0.0b3
56
57 """
58
59 __slots__ = ("transaction_was_reset", "terminate_only", "asyncio_safe")
60
61 transaction_was_reset: bool
62 """Indicates if the transaction on the DBAPI connection was already
63 essentially "reset" back by the :class:`.Connection` object.
64
65 This boolean is True if the :class:`.Connection` had transactional
66 state present upon it, which was then not closed using the
67 :meth:`.Connection.rollback` or :meth:`.Connection.commit` method;
68 instead, the transaction was closed inline within the
69 :meth:`.Connection.close` method so is guaranteed to remain non-present
70 when this event is reached.
71
72 """
73
74 terminate_only: bool
75 """indicates if the connection is to be immediately terminated and
76 not checked in to the pool.
77
78 This occurs for connections that were invalidated, as well as asyncio
79 connections that were not cleanly handled by the calling code that
80 are instead being garbage collected. In the latter case,
81 operations can't be safely run on asyncio connections within garbage
82 collection as there is not necessarily an event loop present.
83
84 """
85
86 asyncio_safe: bool
87 """Indicates if the reset operation is occurring within a scope where
88 an enclosing event loop is expected to be present for asyncio applications.
89
90 Will be False in the case that the connection is being garbage collected.
91
92 """
93
94
95class ResetStyle(Enum):
96 """Describe options for "reset on return" behaviors."""
97
98 reset_rollback = 0
99 reset_commit = 1
100 reset_none = 2
101
102
103_ResetStyleArgType = Union[
104 ResetStyle,
105 Literal[True, None, False, "commit", "rollback"],
106]
107reset_rollback, reset_commit, reset_none = list(ResetStyle)
108
109
110class _ConnDialect:
111 """partial implementation of :class:`.Dialect`
112 which provides DBAPI connection methods.
113
114 When a :class:`_pool.Pool` is combined with an :class:`_engine.Engine`,
115 the :class:`_engine.Engine` replaces this with its own
116 :class:`.Dialect`.
117
118 """
119
120 is_async = False
121 has_terminate = False
122
123 def do_rollback(self, dbapi_connection: PoolProxiedConnection) -> None:
124 dbapi_connection.rollback()
125
126 def do_commit(self, dbapi_connection: PoolProxiedConnection) -> None:
127 dbapi_connection.commit()
128
129 def do_terminate(self, dbapi_connection: DBAPIConnection) -> None:
130 dbapi_connection.close()
131
132 def do_close(self, dbapi_connection: DBAPIConnection) -> None:
133 dbapi_connection.close()
134
135 def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
136 raise NotImplementedError(
137 "The ping feature requires that a dialect is "
138 "passed to the connection pool."
139 )
140
141 def get_driver_connection(self, connection: DBAPIConnection) -> Any:
142 return connection
143
144
145class _AsyncConnDialect(_ConnDialect):
146 is_async = True
147
148
149class _CreatorFnType(Protocol):
150 def __call__(self) -> DBAPIConnection: ...
151
152
153class _CreatorWRecFnType(Protocol):
154 def __call__(self, rec: ConnectionPoolEntry) -> DBAPIConnection: ...
155
156
157class Pool(log.Identified, event.EventTarget):
158 """Abstract base class for connection pools."""
159
160 dispatch: dispatcher[Pool]
161 echo: log._EchoFlagType
162
163 _orig_logging_name: Optional[str]
164 _dialect: Union[_ConnDialect, Dialect] = _ConnDialect()
165 _creator_arg: Union[_CreatorFnType, _CreatorWRecFnType]
166 _invoke_creator: _CreatorWRecFnType
167 _invalidate_time: float
168
169 def __init__(
170 self,
171 creator: Union[_CreatorFnType, _CreatorWRecFnType],
172 recycle: int = -1,
173 echo: log._EchoFlagType = None,
174 logging_name: Optional[str] = None,
175 reset_on_return: _ResetStyleArgType = True,
176 events: Optional[List[Tuple[_ListenerFnType, str]]] = None,
177 dialect: Optional[Union[_ConnDialect, Dialect]] = None,
178 pre_ping: bool = False,
179 _dispatch: Optional[_DispatchCommon[Pool]] = None,
180 ):
181 """
182 Construct a Pool.
183
184 :param creator: a callable function that returns a DB-API
185 connection object. The function will be called with
186 parameters.
187
188 :param recycle: If set to a value other than -1, number of
189 seconds between connection recycling, which means upon
190 checkout, if this timeout is surpassed the connection will be
191 closed and replaced with a newly opened connection. Defaults to -1.
192
193 :param logging_name: String identifier which will be used within
194 the "name" field of logging records generated within the
195 "sqlalchemy.pool" logger. Defaults to a hexstring of the object's
196 id.
197
198 :param echo: if True, the connection pool will log
199 informational output such as when connections are invalidated
200 as well as when connections are recycled to the default log handler,
201 which defaults to ``sys.stdout`` for output.. If set to the string
202 ``"debug"``, the logging will include pool checkouts and checkins.
203
204 The :paramref:`_pool.Pool.echo` parameter can also be set from the
205 :func:`_sa.create_engine` call by using the
206 :paramref:`_sa.create_engine.echo_pool` parameter.
207
208 .. seealso::
209
210 :ref:`dbengine_logging` - further detail on how to configure
211 logging.
212
213 :param reset_on_return: Determine steps to take on
214 connections as they are returned to the pool, which were
215 not otherwise handled by a :class:`_engine.Connection`.
216 Available from :func:`_sa.create_engine` via the
217 :paramref:`_sa.create_engine.pool_reset_on_return` parameter.
218
219 :paramref:`_pool.Pool.reset_on_return` can have any of these values:
220
221 * ``"rollback"`` - call rollback() on the connection,
222 to release locks and transaction resources.
223 This is the default value. The vast majority
224 of use cases should leave this value set.
225 * ``"commit"`` - call commit() on the connection,
226 to release locks and transaction resources.
227 A commit here may be desirable for databases that
228 cache query plans if a commit is emitted,
229 such as Microsoft SQL Server. However, this
230 value is more dangerous than 'rollback' because
231 any data changes present on the transaction
232 are committed unconditionally.
233 * ``None`` - don't do anything on the connection.
234 This setting may be appropriate if the database / DBAPI
235 works in pure "autocommit" mode at all times, or if
236 a custom reset handler is established using the
237 :meth:`.PoolEvents.reset` event handler.
238
239 * ``True`` - same as 'rollback', this is here for
240 backwards compatibility.
241 * ``False`` - same as None, this is here for
242 backwards compatibility.
243
244 For further customization of reset on return, the
245 :meth:`.PoolEvents.reset` event hook may be used which can perform
246 any connection activity desired on reset.
247
248 .. seealso::
249
250 :ref:`pool_reset_on_return`
251
252 :meth:`.PoolEvents.reset`
253
254 :param events: a list of 2-tuples, each of the form
255 ``(callable, target)`` which will be passed to :func:`.event.listen`
256 upon construction. Provided here so that event listeners
257 can be assigned via :func:`_sa.create_engine` before dialect-level
258 listeners are applied.
259
260 :param dialect: a :class:`.Dialect` that will handle the job
261 of calling rollback(), close(), or commit() on DBAPI connections.
262 If omitted, a built-in "stub" dialect is used. Applications that
263 make use of :func:`_sa.create_engine` should not use this parameter
264 as it is handled by the engine creation strategy.
265
266 :param pre_ping: if True, the pool will emit a "ping" (typically
267 "SELECT 1", but is dialect-specific) on the connection
268 upon checkout, to test if the connection is alive or not. If not,
269 the connection is transparently re-connected and upon success, all
270 other pooled connections established prior to that timestamp are
271 invalidated. Requires that a dialect is passed as well to
272 interpret the disconnection error.
273
274 .. versionadded:: 1.2
275
276 """
277 if logging_name:
278 self.logging_name = self._orig_logging_name = logging_name
279 else:
280 self._orig_logging_name = None
281
282 log.instance_logger(self, echoflag=echo)
283 self._creator = creator
284 self._recycle = recycle
285 self._invalidate_time = 0
286 self._pre_ping = pre_ping
287 self._reset_on_return = util.parse_user_argument_for_enum(
288 reset_on_return,
289 {
290 ResetStyle.reset_rollback: ["rollback", True],
291 ResetStyle.reset_none: ["none", None, False],
292 ResetStyle.reset_commit: ["commit"],
293 },
294 "reset_on_return",
295 )
296
297 self.echo = echo
298
299 if _dispatch:
300 self.dispatch._update(_dispatch, only_propagate=False)
301 if dialect:
302 self._dialect = dialect
303 if events:
304 for fn, target in events:
305 event.listen(self, target, fn)
306
307 @util.hybridproperty
308 def _is_asyncio(self) -> bool:
309 return self._dialect.is_async
310
311 @property
312 def _creator(self) -> Union[_CreatorFnType, _CreatorWRecFnType]:
313 return self._creator_arg
314
315 @_creator.setter
316 def _creator(
317 self, creator: Union[_CreatorFnType, _CreatorWRecFnType]
318 ) -> None:
319 self._creator_arg = creator
320
321 # mypy seems to get super confused assigning functions to
322 # attributes
323 self._invoke_creator = self._should_wrap_creator(creator)
324
325 @_creator.deleter
326 def _creator(self) -> None:
327 # needed for mock testing
328 del self._creator_arg
329 del self._invoke_creator
330
331 def _should_wrap_creator(
332 self, creator: Union[_CreatorFnType, _CreatorWRecFnType]
333 ) -> _CreatorWRecFnType:
334 """Detect if creator accepts a single argument, or is sent
335 as a legacy style no-arg function.
336
337 """
338
339 try:
340 argspec = util.get_callable_argspec(self._creator, no_self=True)
341 except TypeError:
342 creator_fn = cast(_CreatorFnType, creator)
343 return lambda rec: creator_fn()
344
345 if argspec.defaults is not None:
346 defaulted = len(argspec.defaults)
347 else:
348 defaulted = 0
349 positionals = len(argspec[0]) - defaulted
350
351 # look for the exact arg signature that DefaultStrategy
352 # sends us
353 if (argspec[0], argspec[3]) == (["connection_record"], (None,)):
354 return cast(_CreatorWRecFnType, creator)
355 # or just a single positional
356 elif positionals == 1:
357 return cast(_CreatorWRecFnType, creator)
358 # all other cases, just wrap and assume legacy "creator" callable
359 # thing
360 else:
361 creator_fn = cast(_CreatorFnType, creator)
362 return lambda rec: creator_fn()
363
364 def _close_connection(
365 self, connection: DBAPIConnection, *, terminate: bool = False
366 ) -> None:
367 self.logger.debug(
368 "%s connection %r",
369 "Hard-closing" if terminate else "Closing",
370 connection,
371 )
372 try:
373 if terminate:
374 self._dialect.do_terminate(connection)
375 else:
376 self._dialect.do_close(connection)
377 except BaseException as e:
378 self.logger.error(
379 f"Exception {'terminating' if terminate else 'closing'} "
380 f"connection %r",
381 connection,
382 exc_info=True,
383 )
384 if not isinstance(e, Exception):
385 raise
386
387 def _create_connection(self) -> ConnectionPoolEntry:
388 """Called by subclasses to create a new ConnectionRecord."""
389
390 return _ConnectionRecord(self)
391
392 def _invalidate(
393 self,
394 connection: PoolProxiedConnection,
395 exception: Optional[BaseException] = None,
396 _checkin: bool = True,
397 ) -> None:
398 """Mark all connections established within the generation
399 of the given connection as invalidated.
400
401 If this pool's last invalidate time is before when the given
402 connection was created, update the timestamp til now. Otherwise,
403 no action is performed.
404
405 Connections with a start time prior to this pool's invalidation
406 time will be recycled upon next checkout.
407 """
408 rec = getattr(connection, "_connection_record", None)
409 if not rec or self._invalidate_time < rec.starttime:
410 self._invalidate_time = time.time()
411 if _checkin and getattr(connection, "is_valid", False):
412 connection.invalidate(exception)
413
414 def recreate(self) -> Pool:
415 """Return a new :class:`_pool.Pool`, of the same class as this one
416 and configured with identical creation arguments.
417
418 This method is used in conjunction with :meth:`dispose`
419 to close out an entire :class:`_pool.Pool` and create a new one in
420 its place.
421
422 """
423
424 raise NotImplementedError()
425
426 def dispose(self) -> None:
427 """Dispose of this pool.
428
429 This method leaves the possibility of checked-out connections
430 remaining open, as it only affects connections that are
431 idle in the pool.
432
433 .. seealso::
434
435 :meth:`Pool.recreate`
436
437 """
438
439 raise NotImplementedError()
440
441 def connect(self) -> PoolProxiedConnection:
442 """Return a DBAPI connection from the pool.
443
444 The connection is instrumented such that when its
445 ``close()`` method is called, the connection will be returned to
446 the pool.
447
448 """
449 return _ConnectionFairy._checkout(self)
450
451 def _return_conn(self, record: ConnectionPoolEntry) -> None:
452 """Given a _ConnectionRecord, return it to the :class:`_pool.Pool`.
453
454 This method is called when an instrumented DBAPI connection
455 has its ``close()`` method called.
456
457 """
458 self._do_return_conn(record)
459
460 def _do_get(self) -> ConnectionPoolEntry:
461 """Implementation for :meth:`get`, supplied by subclasses."""
462
463 raise NotImplementedError()
464
465 def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
466 """Implementation for :meth:`return_conn`, supplied by subclasses."""
467
468 raise NotImplementedError()
469
470 def status(self) -> str:
471 """Returns a brief description of the state of this pool."""
472 raise NotImplementedError()
473
474
475class ManagesConnection:
476 """Common base for the two connection-management interfaces
477 :class:`.PoolProxiedConnection` and :class:`.ConnectionPoolEntry`.
478
479 These two objects are typically exposed in the public facing API
480 via the connection pool event hooks, documented at :class:`.PoolEvents`.
481
482 .. versionadded:: 2.0
483
484 """
485
486 __slots__ = ()
487
488 dbapi_connection: Optional[DBAPIConnection]
489 """A reference to the actual DBAPI connection being tracked.
490
491 This is a :pep:`249`-compliant object that for traditional sync-style
492 dialects is provided by the third-party
493 DBAPI implementation in use. For asyncio dialects, the implementation
494 is typically an adapter object provided by the SQLAlchemy dialect
495 itself; the underlying asyncio object is available via the
496 :attr:`.ManagesConnection.driver_connection` attribute.
497
498 SQLAlchemy's interface for the DBAPI connection is based on the
499 :class:`.DBAPIConnection` protocol object
500
501 .. seealso::
502
503 :attr:`.ManagesConnection.driver_connection`
504
505 :ref:`faq_dbapi_connection`
506
507 """
508
509 driver_connection: Optional[Any]
510 """The "driver level" connection object as used by the Python
511 DBAPI or database driver.
512
513 For traditional :pep:`249` DBAPI implementations, this object will
514 be the same object as that of
515 :attr:`.ManagesConnection.dbapi_connection`. For an asyncio database
516 driver, this will be the ultimate "connection" object used by that
517 driver, such as the ``asyncpg.Connection`` object which will not have
518 standard pep-249 methods.
519
520 .. versionadded:: 1.4.24
521
522 .. seealso::
523
524 :attr:`.ManagesConnection.dbapi_connection`
525
526 :ref:`faq_dbapi_connection`
527
528 """
529
530 @util.ro_memoized_property
531 def info(self) -> _InfoType:
532 """Info dictionary associated with the underlying DBAPI connection
533 referred to by this :class:`.ManagesConnection` instance, allowing
534 user-defined data to be associated with the connection.
535
536 The data in this dictionary is persistent for the lifespan
537 of the DBAPI connection itself, including across pool checkins
538 and checkouts. When the connection is invalidated
539 and replaced with a new one, this dictionary is cleared.
540
541 For a :class:`.PoolProxiedConnection` instance that's not associated
542 with a :class:`.ConnectionPoolEntry`, such as if it were detached, the
543 attribute returns a dictionary that is local to that
544 :class:`.ConnectionPoolEntry`. Therefore the
545 :attr:`.ManagesConnection.info` attribute will always provide a Python
546 dictionary.
547
548 .. seealso::
549
550 :attr:`.ManagesConnection.record_info`
551
552
553 """
554 raise NotImplementedError()
555
556 @util.ro_memoized_property
557 def record_info(self) -> Optional[_InfoType]:
558 """Persistent info dictionary associated with this
559 :class:`.ManagesConnection`.
560
561 Unlike the :attr:`.ManagesConnection.info` dictionary, the lifespan
562 of this dictionary is that of the :class:`.ConnectionPoolEntry`
563 which owns it; therefore this dictionary will persist across
564 reconnects and connection invalidation for a particular entry
565 in the connection pool.
566
567 For a :class:`.PoolProxiedConnection` instance that's not associated
568 with a :class:`.ConnectionPoolEntry`, such as if it were detached, the
569 attribute returns None. Contrast to the :attr:`.ManagesConnection.info`
570 dictionary which is never None.
571
572
573 .. seealso::
574
575 :attr:`.ManagesConnection.info`
576
577 """
578 raise NotImplementedError()
579
580 def invalidate(
581 self, e: Optional[BaseException] = None, soft: bool = False
582 ) -> None:
583 """Mark the managed connection as invalidated.
584
585 :param e: an exception object indicating a reason for the invalidation.
586
587 :param soft: if True, the connection isn't closed; instead, this
588 connection will be recycled on next checkout.
589
590 .. seealso::
591
592 :ref:`pool_connection_invalidation`
593
594
595 """
596 raise NotImplementedError()
597
598
599class ConnectionPoolEntry(ManagesConnection):
600 """Interface for the object that maintains an individual database
601 connection on behalf of a :class:`_pool.Pool` instance.
602
603 The :class:`.ConnectionPoolEntry` object represents the long term
604 maintainance of a particular connection for a pool, including expiring or
605 invalidating that connection to have it replaced with a new one, which will
606 continue to be maintained by that same :class:`.ConnectionPoolEntry`
607 instance. Compared to :class:`.PoolProxiedConnection`, which is the
608 short-term, per-checkout connection manager, this object lasts for the
609 lifespan of a particular "slot" within a connection pool.
610
611 The :class:`.ConnectionPoolEntry` object is mostly visible to public-facing
612 API code when it is delivered to connection pool event hooks, such as
613 :meth:`_events.PoolEvents.connect` and :meth:`_events.PoolEvents.checkout`.
614
615 .. versionadded:: 2.0 :class:`.ConnectionPoolEntry` provides the public
616 facing interface for the :class:`._ConnectionRecord` internal class.
617
618 """
619
620 __slots__ = ()
621
622 @property
623 def in_use(self) -> bool:
624 """Return True the connection is currently checked out"""
625
626 raise NotImplementedError()
627
628 def close(self) -> None:
629 """Close the DBAPI connection managed by this connection pool entry."""
630 raise NotImplementedError()
631
632
633class _ConnectionRecord(ConnectionPoolEntry):
634 """Maintains a position in a connection pool which references a pooled
635 connection.
636
637 This is an internal object used by the :class:`_pool.Pool` implementation
638 to provide context management to a DBAPI connection maintained by
639 that :class:`_pool.Pool`. The public facing interface for this class
640 is described by the :class:`.ConnectionPoolEntry` class. See that
641 class for public API details.
642
643 .. seealso::
644
645 :class:`.ConnectionPoolEntry`
646
647 :class:`.PoolProxiedConnection`
648
649 """
650
651 __slots__ = (
652 "__pool",
653 "fairy_ref",
654 "finalize_callback",
655 "fresh",
656 "starttime",
657 "dbapi_connection",
658 "__weakref__",
659 "__dict__",
660 )
661
662 finalize_callback: Deque[Callable[[DBAPIConnection], None]]
663 fresh: bool
664 fairy_ref: Optional[weakref.ref[_ConnectionFairy]]
665 starttime: float
666
667 def __init__(self, pool: Pool, connect: bool = True):
668 self.fresh = False
669 self.fairy_ref = None
670 self.starttime = 0
671 self.dbapi_connection = None
672
673 self.__pool = pool
674 if connect:
675 self.__connect()
676 self.finalize_callback = deque()
677
678 dbapi_connection: Optional[DBAPIConnection]
679
680 @property
681 def driver_connection(self) -> Optional[Any]: # type: ignore[override] # mypy#4125 # noqa: E501
682 if self.dbapi_connection is None:
683 return None
684 else:
685 return self.__pool._dialect.get_driver_connection(
686 self.dbapi_connection
687 )
688
689 @property
690 @util.deprecated(
691 "2.0",
692 "The _ConnectionRecord.connection attribute is deprecated; "
693 "please use 'driver_connection'",
694 )
695 def connection(self) -> Optional[DBAPIConnection]:
696 return self.dbapi_connection
697
698 _soft_invalidate_time: float = 0
699
700 @util.ro_memoized_property
701 def info(self) -> _InfoType:
702 return {}
703
704 @util.ro_memoized_property
705 def record_info(self) -> Optional[_InfoType]:
706 return {}
707
708 @classmethod
709 def checkout(cls, pool: Pool) -> _ConnectionFairy:
710 if TYPE_CHECKING:
711 rec = cast(_ConnectionRecord, pool._do_get())
712 else:
713 rec = pool._do_get()
714
715 try:
716 dbapi_connection = rec.get_connection()
717 except BaseException as err:
718 with util.safe_reraise():
719 rec._checkin_failed(err, _fairy_was_created=False)
720
721 # not reached, for code linters only
722 raise
723
724 echo = pool._should_log_debug()
725 fairy = _ConnectionFairy(pool, dbapi_connection, rec, echo)
726
727 rec.fairy_ref = ref = weakref.ref(
728 fairy,
729 lambda ref: (
730 _finalize_fairy(
731 None, rec, pool, ref, echo, transaction_was_reset=False
732 )
733 if _finalize_fairy is not None
734 else None
735 ),
736 )
737 _strong_ref_connection_records[ref] = rec
738 if echo:
739 pool.logger.debug(
740 "Connection %r checked out from pool", dbapi_connection
741 )
742 return fairy
743
744 def _checkin_failed(
745 self, err: BaseException, _fairy_was_created: bool = True
746 ) -> None:
747 self.invalidate(e=err)
748 self.checkin(
749 _fairy_was_created=_fairy_was_created,
750 )
751
752 def checkin(self, _fairy_was_created: bool = True) -> None:
753 if self.fairy_ref is None and _fairy_was_created:
754 # _fairy_was_created is False for the initial get connection phase;
755 # meaning there was no _ConnectionFairy and we must unconditionally
756 # do a checkin.
757 #
758 # otherwise, if fairy_was_created==True, if fairy_ref is None here
759 # that means we were checked in already, so this looks like
760 # a double checkin.
761 util.warn("Double checkin attempted on %s" % self)
762 return
763 self.fairy_ref = None
764 connection = self.dbapi_connection
765 pool = self.__pool
766 while self.finalize_callback:
767 finalizer = self.finalize_callback.pop()
768 if connection is not None:
769 finalizer(connection)
770 if pool.dispatch.checkin:
771 pool.dispatch.checkin(connection, self)
772
773 pool._return_conn(self)
774
775 @property
776 def in_use(self) -> bool:
777 return self.fairy_ref is not None
778
779 @property
780 def last_connect_time(self) -> float:
781 return self.starttime
782
783 def close(self) -> None:
784 if self.dbapi_connection is not None:
785 self.__close()
786
787 def invalidate(
788 self, e: Optional[BaseException] = None, soft: bool = False
789 ) -> None:
790 # already invalidated
791 if self.dbapi_connection is None:
792 return
793 if soft:
794 self.__pool.dispatch.soft_invalidate(
795 self.dbapi_connection, self, e
796 )
797 else:
798 self.__pool.dispatch.invalidate(self.dbapi_connection, self, e)
799 if e is not None:
800 self.__pool.logger.info(
801 "%sInvalidate connection %r (reason: %s:%s)",
802 "Soft " if soft else "",
803 self.dbapi_connection,
804 e.__class__.__name__,
805 e,
806 )
807 else:
808 self.__pool.logger.info(
809 "%sInvalidate connection %r",
810 "Soft " if soft else "",
811 self.dbapi_connection,
812 )
813
814 if soft:
815 self._soft_invalidate_time = time.time()
816 else:
817 self.__close(terminate=True)
818 self.dbapi_connection = None
819
820 def get_connection(self) -> DBAPIConnection:
821 recycle = False
822
823 # NOTE: the various comparisons here are assuming that measurable time
824 # passes between these state changes. however, time.time() is not
825 # guaranteed to have sub-second precision. comparisons of
826 # "invalidation time" to "starttime" should perhaps use >= so that the
827 # state change can take place assuming no measurable time has passed,
828 # however this does not guarantee correct behavior here as if time
829 # continues to not pass, it will try to reconnect repeatedly until
830 # these timestamps diverge, so in that sense using > is safer. Per
831 # https://stackoverflow.com/a/1938096/34549, Windows time.time() may be
832 # within 16 milliseconds accuracy, so unit tests for connection
833 # invalidation need a sleep of at least this long between initial start
834 # time and invalidation for the logic below to work reliably.
835
836 if self.dbapi_connection is None:
837 self.info.clear()
838 self.__connect()
839 elif (
840 self.__pool._recycle > -1
841 and time.time() - self.starttime > self.__pool._recycle
842 ):
843 self.__pool.logger.info(
844 "Connection %r exceeded timeout; recycling",
845 self.dbapi_connection,
846 )
847 recycle = True
848 elif self.__pool._invalidate_time > self.starttime:
849 self.__pool.logger.info(
850 "Connection %r invalidated due to pool invalidation; "
851 + "recycling",
852 self.dbapi_connection,
853 )
854 recycle = True
855 elif self._soft_invalidate_time > self.starttime:
856 self.__pool.logger.info(
857 "Connection %r invalidated due to local soft invalidation; "
858 + "recycling",
859 self.dbapi_connection,
860 )
861 recycle = True
862
863 if recycle:
864 self.__close(terminate=True)
865 self.info.clear()
866
867 self.__connect()
868
869 assert self.dbapi_connection is not None
870 return self.dbapi_connection
871
872 def _is_hard_or_soft_invalidated(self) -> bool:
873 return (
874 self.dbapi_connection is None
875 or self.__pool._invalidate_time > self.starttime
876 or (self._soft_invalidate_time > self.starttime)
877 )
878
879 def __close(self, *, terminate: bool = False) -> None:
880 self.finalize_callback.clear()
881 if self.__pool.dispatch.close:
882 self.__pool.dispatch.close(self.dbapi_connection, self)
883 assert self.dbapi_connection is not None
884 self.__pool._close_connection(
885 self.dbapi_connection, terminate=terminate
886 )
887 self.dbapi_connection = None
888
889 def __connect(self) -> None:
890 pool = self.__pool
891
892 # ensure any existing connection is removed, so that if
893 # creator fails, this attribute stays None
894 self.dbapi_connection = None
895 try:
896 self.starttime = time.time()
897 self.dbapi_connection = connection = pool._invoke_creator(self)
898 pool.logger.debug("Created new connection %r", connection)
899 self.fresh = True
900 except BaseException as e:
901 with util.safe_reraise():
902 pool.logger.debug("Error on connect(): %s", e)
903 else:
904 # in SQLAlchemy 1.4 the first_connect event is not used by
905 # the engine, so this will usually not be set
906 if pool.dispatch.first_connect:
907 pool.dispatch.first_connect.for_modify(
908 pool.dispatch
909 ).exec_once_unless_exception(self.dbapi_connection, self)
910
911 # init of the dialect now takes place within the connect
912 # event, so ensure a mutex is used on the first run
913 pool.dispatch.connect.for_modify(
914 pool.dispatch
915 )._exec_w_sync_on_first_run(self.dbapi_connection, self)
916
917
918def _finalize_fairy(
919 dbapi_connection: Optional[DBAPIConnection],
920 connection_record: Optional[_ConnectionRecord],
921 pool: Pool,
922 ref: Optional[
923 weakref.ref[_ConnectionFairy]
924 ], # this is None when called directly, not by the gc
925 echo: Optional[log._EchoFlagType],
926 transaction_was_reset: bool = False,
927 fairy: Optional[_ConnectionFairy] = None,
928) -> None:
929 """Cleanup for a :class:`._ConnectionFairy` whether or not it's already
930 been garbage collected.
931
932 When using an async dialect no IO can happen here (without using
933 a dedicated thread), since this is called outside the greenlet
934 context and with an already running loop. In this case function
935 will only log a message and raise a warning.
936 """
937
938 is_gc_cleanup = ref is not None
939
940 if is_gc_cleanup:
941 assert ref is not None
942 _strong_ref_connection_records.pop(ref, None)
943 assert connection_record is not None
944 if connection_record.fairy_ref is not ref:
945 return
946 assert dbapi_connection is None
947 dbapi_connection = connection_record.dbapi_connection
948
949 elif fairy:
950 _strong_ref_connection_records.pop(weakref.ref(fairy), None)
951
952 # null pool is not _is_asyncio but can be used also with async dialects
953 dont_restore_gced = pool._dialect.is_async
954
955 if dont_restore_gced:
956 detach = connection_record is None or is_gc_cleanup
957 can_manipulate_connection = not is_gc_cleanup
958 can_close_or_terminate_connection = (
959 not pool._dialect.is_async or pool._dialect.has_terminate
960 )
961 requires_terminate_for_close = (
962 pool._dialect.is_async and pool._dialect.has_terminate
963 )
964
965 else:
966 detach = connection_record is None
967 can_manipulate_connection = can_close_or_terminate_connection = True
968 requires_terminate_for_close = False
969
970 if dbapi_connection is not None:
971 if connection_record and echo:
972 pool.logger.debug(
973 "Connection %r being returned to pool", dbapi_connection
974 )
975
976 try:
977 if not fairy:
978 assert connection_record is not None
979 fairy = _ConnectionFairy(
980 pool,
981 dbapi_connection,
982 connection_record,
983 echo,
984 )
985 assert fairy.dbapi_connection is dbapi_connection
986
987 fairy._reset(
988 pool,
989 transaction_was_reset=transaction_was_reset,
990 terminate_only=detach,
991 asyncio_safe=can_manipulate_connection,
992 )
993
994 if detach:
995 if connection_record:
996 fairy._pool = pool
997 fairy.detach()
998
999 if can_close_or_terminate_connection:
1000 if pool.dispatch.close_detached:
1001 pool.dispatch.close_detached(dbapi_connection)
1002
1003 pool._close_connection(
1004 dbapi_connection,
1005 terminate=requires_terminate_for_close,
1006 )
1007
1008 except BaseException as e:
1009 pool.logger.error(
1010 "Exception during reset or similar", exc_info=True
1011 )
1012 if connection_record:
1013 connection_record.invalidate(e=e)
1014 if not isinstance(e, Exception):
1015 raise
1016 finally:
1017 if detach and is_gc_cleanup and dont_restore_gced:
1018 message = (
1019 "The garbage collector is trying to clean up "
1020 f"non-checked-in connection {dbapi_connection!r}, "
1021 f"""which will be {
1022 'dropped, as it cannot be safely terminated'
1023 if not can_close_or_terminate_connection
1024 else 'terminated'
1025 }. """
1026 "Please ensure that SQLAlchemy pooled connections are "
1027 "returned to "
1028 "the pool explicitly, either by calling ``close()`` "
1029 "or by using appropriate context managers to manage "
1030 "their lifecycle."
1031 )
1032 pool.logger.error(message)
1033 util.warn(message)
1034
1035 if connection_record and connection_record.fairy_ref is not None:
1036 connection_record.checkin()
1037
1038 # give gc some help. See
1039 # test/engine/test_pool.py::PoolEventsTest::test_checkin_event_gc[True]
1040 # which actually started failing when pytest warnings plugin was
1041 # turned on, due to util.warn() above
1042 if fairy is not None:
1043 fairy.dbapi_connection = None # type: ignore
1044 fairy._connection_record = None
1045 del dbapi_connection
1046 del connection_record
1047 del fairy
1048
1049
1050# a dictionary of the _ConnectionFairy weakrefs to _ConnectionRecord, so that
1051# GC under pypy will call ConnectionFairy finalizers. linked directly to the
1052# weakref that will empty itself when collected so that it should not create
1053# any unmanaged memory references.
1054_strong_ref_connection_records: Dict[
1055 weakref.ref[_ConnectionFairy], _ConnectionRecord
1056] = {}
1057
1058
1059class PoolProxiedConnection(ManagesConnection):
1060 """A connection-like adapter for a :pep:`249` DBAPI connection, which
1061 includes additional methods specific to the :class:`.Pool` implementation.
1062
1063 :class:`.PoolProxiedConnection` is the public-facing interface for the
1064 internal :class:`._ConnectionFairy` implementation object; users familiar
1065 with :class:`._ConnectionFairy` can consider this object to be equivalent.
1066
1067 .. versionadded:: 2.0 :class:`.PoolProxiedConnection` provides the public-
1068 facing interface for the :class:`._ConnectionFairy` internal class.
1069
1070 """
1071
1072 __slots__ = ()
1073
1074 if typing.TYPE_CHECKING:
1075
1076 def commit(self) -> None: ...
1077
1078 def cursor(self, *args: Any, **kwargs: Any) -> DBAPICursor: ...
1079
1080 def rollback(self) -> None: ...
1081
1082 @property
1083 def is_valid(self) -> bool:
1084 """Return True if this :class:`.PoolProxiedConnection` still refers
1085 to an active DBAPI connection."""
1086
1087 raise NotImplementedError()
1088
1089 @property
1090 def is_detached(self) -> bool:
1091 """Return True if this :class:`.PoolProxiedConnection` is detached
1092 from its pool."""
1093
1094 raise NotImplementedError()
1095
1096 def detach(self) -> None:
1097 """Separate this connection from its Pool.
1098
1099 This means that the connection will no longer be returned to the
1100 pool when closed, and will instead be literally closed. The
1101 associated :class:`.ConnectionPoolEntry` is de-associated from this
1102 DBAPI connection.
1103
1104 Note that any overall connection limiting constraints imposed by a
1105 Pool implementation may be violated after a detach, as the detached
1106 connection is removed from the pool's knowledge and control.
1107
1108 """
1109
1110 raise NotImplementedError()
1111
1112 def close(self) -> None:
1113 """Release this connection back to the pool.
1114
1115 The :meth:`.PoolProxiedConnection.close` method shadows the
1116 :pep:`249` ``.close()`` method, altering its behavior to instead
1117 :term:`release` the proxied connection back to the connection pool.
1118
1119 Upon release to the pool, whether the connection stays "opened" and
1120 pooled in the Python process, versus actually closed out and removed
1121 from the Python process, is based on the pool implementation in use and
1122 its configuration and current state.
1123
1124 """
1125 raise NotImplementedError()
1126
1127
1128class _AdhocProxiedConnection(PoolProxiedConnection):
1129 """provides the :class:`.PoolProxiedConnection` interface for cases where
1130 the DBAPI connection is not actually proxied.
1131
1132 This is used by the engine internals to pass a consistent
1133 :class:`.PoolProxiedConnection` object to consuming dialects in response to
1134 pool events that may not always have the :class:`._ConnectionFairy`
1135 available.
1136
1137 """
1138
1139 __slots__ = ("dbapi_connection", "_connection_record", "_is_valid")
1140
1141 dbapi_connection: DBAPIConnection
1142 _connection_record: ConnectionPoolEntry
1143
1144 def __init__(
1145 self,
1146 dbapi_connection: DBAPIConnection,
1147 connection_record: ConnectionPoolEntry,
1148 ):
1149 self.dbapi_connection = dbapi_connection
1150 self._connection_record = connection_record
1151 self._is_valid = True
1152
1153 @property
1154 def driver_connection(self) -> Any: # type: ignore[override] # mypy#4125
1155 return self._connection_record.driver_connection
1156
1157 @property
1158 def connection(self) -> DBAPIConnection:
1159 return self.dbapi_connection
1160
1161 @property
1162 def is_valid(self) -> bool:
1163 """Implement is_valid state attribute.
1164
1165 for the adhoc proxied connection it's assumed the connection is valid
1166 as there is no "invalidate" routine.
1167
1168 """
1169 return self._is_valid
1170
1171 def invalidate(
1172 self, e: Optional[BaseException] = None, soft: bool = False
1173 ) -> None:
1174 self._is_valid = False
1175
1176 @util.ro_non_memoized_property
1177 def record_info(self) -> Optional[_InfoType]:
1178 return self._connection_record.record_info
1179
1180 def cursor(self, *args: Any, **kwargs: Any) -> DBAPICursor:
1181 return self.dbapi_connection.cursor(*args, **kwargs)
1182
1183 def __getattr__(self, key: Any) -> Any:
1184 return getattr(self.dbapi_connection, key)
1185
1186
1187class _ConnectionFairy(PoolProxiedConnection):
1188 """Proxies a DBAPI connection and provides return-on-dereference
1189 support.
1190
1191 This is an internal object used by the :class:`_pool.Pool` implementation
1192 to provide context management to a DBAPI connection delivered by
1193 that :class:`_pool.Pool`. The public facing interface for this class
1194 is described by the :class:`.PoolProxiedConnection` class. See that
1195 class for public API details.
1196
1197 The name "fairy" is inspired by the fact that the
1198 :class:`._ConnectionFairy` object's lifespan is transitory, as it lasts
1199 only for the length of a specific DBAPI connection being checked out from
1200 the pool, and additionally that as a transparent proxy, it is mostly
1201 invisible.
1202
1203 .. seealso::
1204
1205 :class:`.PoolProxiedConnection`
1206
1207 :class:`.ConnectionPoolEntry`
1208
1209
1210 """
1211
1212 __slots__ = (
1213 "dbapi_connection",
1214 "_connection_record",
1215 "_echo",
1216 "_pool",
1217 "_counter",
1218 "__weakref__",
1219 "__dict__",
1220 )
1221
1222 pool: Pool
1223 dbapi_connection: DBAPIConnection
1224 _echo: log._EchoFlagType
1225
1226 def __init__(
1227 self,
1228 pool: Pool,
1229 dbapi_connection: DBAPIConnection,
1230 connection_record: _ConnectionRecord,
1231 echo: log._EchoFlagType,
1232 ):
1233 self._pool = pool
1234 self._counter = 0
1235 self.dbapi_connection = dbapi_connection
1236 self._connection_record = connection_record
1237 self._echo = echo
1238
1239 _connection_record: Optional[_ConnectionRecord]
1240
1241 @property
1242 def driver_connection(self) -> Optional[Any]: # type: ignore[override] # mypy#4125 # noqa: E501
1243 if self._connection_record is None:
1244 return None
1245 return self._connection_record.driver_connection
1246
1247 @property
1248 @util.deprecated(
1249 "2.0",
1250 "The _ConnectionFairy.connection attribute is deprecated; "
1251 "please use 'driver_connection'",
1252 )
1253 def connection(self) -> DBAPIConnection:
1254 return self.dbapi_connection
1255
1256 @classmethod
1257 def _checkout(
1258 cls,
1259 pool: Pool,
1260 threadconns: Optional[threading.local] = None,
1261 fairy: Optional[_ConnectionFairy] = None,
1262 ) -> _ConnectionFairy:
1263 if not fairy:
1264 fairy = _ConnectionRecord.checkout(pool)
1265
1266 if threadconns is not None:
1267 threadconns.current = weakref.ref(fairy)
1268
1269 assert (
1270 fairy._connection_record is not None
1271 ), "can't 'checkout' a detached connection fairy"
1272 assert (
1273 fairy.dbapi_connection is not None
1274 ), "can't 'checkout' an invalidated connection fairy"
1275
1276 fairy._counter += 1
1277 if (
1278 not pool.dispatch.checkout and not pool._pre_ping
1279 ) or fairy._counter != 1:
1280 return fairy
1281
1282 # Pool listeners can trigger a reconnection on checkout, as well
1283 # as the pre-pinger.
1284 # there are three attempts made here, but note that if the database
1285 # is not accessible from a connection standpoint, those won't proceed
1286 # here.
1287
1288 attempts = 2
1289
1290 while attempts > 0:
1291 connection_is_fresh = fairy._connection_record.fresh
1292 fairy._connection_record.fresh = False
1293 try:
1294 if pool._pre_ping:
1295 if not connection_is_fresh:
1296 if fairy._echo:
1297 pool.logger.debug(
1298 "Pool pre-ping on connection %s",
1299 fairy.dbapi_connection,
1300 )
1301 result = pool._dialect._do_ping_w_event(
1302 fairy.dbapi_connection
1303 )
1304 if not result:
1305 if fairy._echo:
1306 pool.logger.debug(
1307 "Pool pre-ping on connection %s failed, "
1308 "will invalidate pool",
1309 fairy.dbapi_connection,
1310 )
1311 raise exc.InvalidatePoolError()
1312 elif fairy._echo:
1313 pool.logger.debug(
1314 "Connection %s is fresh, skipping pre-ping",
1315 fairy.dbapi_connection,
1316 )
1317
1318 pool.dispatch.checkout(
1319 fairy.dbapi_connection, fairy._connection_record, fairy
1320 )
1321 return fairy
1322 except exc.DisconnectionError as e:
1323 if e.invalidate_pool:
1324 pool.logger.info(
1325 "Disconnection detected on checkout, "
1326 "invalidating all pooled connections prior to "
1327 "current timestamp (reason: %r)",
1328 e,
1329 )
1330 fairy._connection_record.invalidate(e)
1331 pool._invalidate(fairy, e, _checkin=False)
1332 else:
1333 pool.logger.info(
1334 "Disconnection detected on checkout, "
1335 "invalidating individual connection %s (reason: %r)",
1336 fairy.dbapi_connection,
1337 e,
1338 )
1339 fairy._connection_record.invalidate(e)
1340 try:
1341 fairy.dbapi_connection = (
1342 fairy._connection_record.get_connection()
1343 )
1344 except BaseException as err:
1345 with util.safe_reraise():
1346 fairy._connection_record._checkin_failed(
1347 err,
1348 _fairy_was_created=True,
1349 )
1350
1351 # prevent _ConnectionFairy from being carried
1352 # in the stack trace. Do this after the
1353 # connection record has been checked in, so that
1354 # if the del triggers a finalize fairy, it won't
1355 # try to checkin a second time.
1356 del fairy
1357
1358 # never called, this is for code linters
1359 raise
1360
1361 attempts -= 1
1362 except BaseException as be_outer:
1363 with util.safe_reraise():
1364 rec = fairy._connection_record
1365 if rec is not None:
1366 rec._checkin_failed(
1367 be_outer,
1368 _fairy_was_created=True,
1369 )
1370
1371 # prevent _ConnectionFairy from being carried
1372 # in the stack trace, see above
1373 del fairy
1374
1375 # never called, this is for code linters
1376 raise
1377
1378 pool.logger.info("Reconnection attempts exhausted on checkout")
1379 fairy.invalidate()
1380 raise exc.InvalidRequestError("This connection is closed")
1381
1382 def _checkout_existing(self) -> _ConnectionFairy:
1383 return _ConnectionFairy._checkout(self._pool, fairy=self)
1384
1385 def _checkin(self, transaction_was_reset: bool = False) -> None:
1386 _finalize_fairy(
1387 self.dbapi_connection,
1388 self._connection_record,
1389 self._pool,
1390 None,
1391 self._echo,
1392 transaction_was_reset=transaction_was_reset,
1393 fairy=self,
1394 )
1395
1396 def _close(self) -> None:
1397 self._checkin()
1398
1399 def _reset(
1400 self,
1401 pool: Pool,
1402 transaction_was_reset: bool,
1403 terminate_only: bool,
1404 asyncio_safe: bool,
1405 ) -> None:
1406 if pool.dispatch.reset:
1407 pool.dispatch.reset(
1408 self.dbapi_connection,
1409 self._connection_record,
1410 PoolResetState(
1411 transaction_was_reset=transaction_was_reset,
1412 terminate_only=terminate_only,
1413 asyncio_safe=asyncio_safe,
1414 ),
1415 )
1416
1417 if not asyncio_safe:
1418 return
1419
1420 if pool._reset_on_return is reset_rollback:
1421 if transaction_was_reset:
1422 if self._echo:
1423 pool.logger.debug(
1424 "Connection %s reset, transaction already reset",
1425 self.dbapi_connection,
1426 )
1427 else:
1428 if self._echo:
1429 pool.logger.debug(
1430 "Connection %s rollback-on-return",
1431 self.dbapi_connection,
1432 )
1433 pool._dialect.do_rollback(self)
1434 elif pool._reset_on_return is reset_commit:
1435 if self._echo:
1436 pool.logger.debug(
1437 "Connection %s commit-on-return",
1438 self.dbapi_connection,
1439 )
1440 pool._dialect.do_commit(self)
1441
1442 @property
1443 def _logger(self) -> log._IdentifiedLoggerType:
1444 return self._pool.logger
1445
1446 @property
1447 def is_valid(self) -> bool:
1448 return self.dbapi_connection is not None
1449
1450 @property
1451 def is_detached(self) -> bool:
1452 return self._connection_record is None
1453
1454 @util.ro_memoized_property
1455 def info(self) -> _InfoType:
1456 if self._connection_record is None:
1457 return {}
1458 else:
1459 return self._connection_record.info
1460
1461 @util.ro_non_memoized_property
1462 def record_info(self) -> Optional[_InfoType]:
1463 if self._connection_record is None:
1464 return None
1465 else:
1466 return self._connection_record.record_info
1467
1468 def invalidate(
1469 self, e: Optional[BaseException] = None, soft: bool = False
1470 ) -> None:
1471 if self.dbapi_connection is None:
1472 util.warn("Can't invalidate an already-closed connection.")
1473 return
1474 if self._connection_record:
1475 self._connection_record.invalidate(e=e, soft=soft)
1476 if not soft:
1477 # prevent any rollback / reset actions etc. on
1478 # the connection
1479 self.dbapi_connection = None # type: ignore
1480
1481 # finalize
1482 self._checkin()
1483
1484 def cursor(self, *args: Any, **kwargs: Any) -> DBAPICursor:
1485 assert self.dbapi_connection is not None
1486 return self.dbapi_connection.cursor(*args, **kwargs)
1487
1488 def __getattr__(self, key: str) -> Any:
1489 return getattr(self.dbapi_connection, key)
1490
1491 def detach(self) -> None:
1492 if self._connection_record is not None:
1493 rec = self._connection_record
1494 rec.fairy_ref = None
1495 rec.dbapi_connection = None
1496 # TODO: should this be _return_conn?
1497 self._pool._do_return_conn(self._connection_record)
1498
1499 # can't get the descriptor assignment to work here
1500 # in pylance. mypy is OK w/ it
1501 self.info = self.info.copy() # type: ignore
1502
1503 self._connection_record = None
1504
1505 if self._pool.dispatch.detach:
1506 self._pool.dispatch.detach(self.dbapi_connection, rec)
1507
1508 def close(self) -> None:
1509 self._counter -= 1
1510 if self._counter == 0:
1511 self._checkin()
1512
1513 def _close_special(self, transaction_reset: bool = False) -> None:
1514 self._counter -= 1
1515 if self._counter == 0:
1516 self._checkin(transaction_was_reset=transaction_reset)