1# pool/base.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8
9"""Base constructs for connection pools."""
10
11from __future__ import annotations
12
13from collections import deque
14import dataclasses
15from enum import Enum
16import threading
17import time
18import typing
19from typing import Any
20from typing import Callable
21from typing import cast
22from typing import Deque
23from typing import Dict
24from typing import List
25from typing import Optional
26from typing import Protocol
27from typing import Tuple
28from typing import TYPE_CHECKING
29from typing import Union
30import weakref
31
32from .. import event
33from .. import exc
34from .. import log
35from .. import util
36from ..util.typing import Literal
37
38if TYPE_CHECKING:
39 from ..engine.interfaces import DBAPIConnection
40 from ..engine.interfaces import DBAPICursor
41 from ..engine.interfaces import Dialect
42 from ..event import _DispatchCommon
43 from ..event import _ListenerFnType
44 from ..event import dispatcher
45 from ..sql._typing import _InfoType
46
47
@dataclasses.dataclass(frozen=True)
class PoolResetState:
    """Immutable description of a DBAPI connection's situation at the moment
    it is handed to the :meth:`.PoolEvents.reset` connection pool event.

    .. versionadded:: 2.0.0b3

    """

    __slots__ = (
        "transaction_was_reset",
        "terminate_only",
        "asyncio_safe",
    )

    transaction_was_reset: bool
    """True when the transaction on the DBAPI connection has already been
    essentially "reset" by the :class:`.Connection` object itself.

    This is the case when the :class:`.Connection` carried transactional
    state that was not ended via :meth:`.Connection.rollback` or
    :meth:`.Connection.commit`, but was instead closed inline inside of
    :meth:`.Connection.close`; the transaction is therefore guaranteed to
    no longer be present by the time this event is reached.

    """

    terminate_only: bool
    """True when the connection is about to be terminated immediately
    rather than being checked in to the pool.

    This happens for connections that were invalidated, and also for asyncio
    connections that the calling code did not handle cleanly and which are
    being garbage collected instead.  In the garbage collection case,
    operations can't be safely run on asyncio connections, as there is not
    necessarily an event loop present.

    """

    asyncio_safe: bool
    """True when the reset takes place in a scope where an enclosing event
    loop is expected to be present for asyncio applications.

    False when the connection is being garbage collected.

    """
91
92
class ResetStyle(Enum):
    """Enumeration of the available "reset on return" behaviors."""

    # NOTE: declaration order matters; the module-level aliases that follow
    # this class are derived from it.
    reset_rollback = 0  # selected by "rollback" / True
    reset_commit = 1  # selected by "commit"
    reset_none = 2  # selected by "none" / None / False
99
100
# accepted forms of the ``reset_on_return`` argument: a ResetStyle member
# or one of the literal shorthands.
_ResetStyleArgType = Union[
    ResetStyle, Literal[True, None, False, "commit", "rollback"]
]

# module-level aliases for the individual ResetStyle members
reset_rollback = ResetStyle.reset_rollback
reset_commit = ResetStyle.reset_commit
reset_none = ResetStyle.reset_none
106
107
class _ConnDialect:
    """Minimal stand-in for :class:`.Dialect` that supplies only the
    DBAPI connection-level methods.

    When a :class:`_pool.Pool` is combined with an :class:`_engine.Engine`,
    the :class:`_engine.Engine` replaces this with its own
    :class:`.Dialect`.

    """

    is_async = False
    has_terminate = False

    def do_rollback(self, dbapi_connection: PoolProxiedConnection) -> None:
        """Invoke ``rollback()`` on the given connection."""
        dbapi_connection.rollback()

    def do_commit(self, dbapi_connection: PoolProxiedConnection) -> None:
        """Invoke ``commit()`` on the given connection."""
        dbapi_connection.commit()

    def do_terminate(self, dbapi_connection: DBAPIConnection) -> None:
        """Terminate the given connection; this stub has no separate
        terminate operation and simply calls ``close()``."""
        dbapi_connection.close()

    def do_close(self, dbapi_connection: DBAPIConnection) -> None:
        """Invoke ``close()`` on the given connection."""
        dbapi_connection.close()

    def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
        """Always raises; pinging needs a real dialect.

        :raises NotImplementedError: unconditionally.
        """
        message = (
            "The ping feature requires that a dialect is "
            "passed to the connection pool."
        )
        raise NotImplementedError(message)

    def get_driver_connection(self, connection: DBAPIConnection) -> Any:
        """Return the given connection unchanged."""
        return connection
141
142
class _AsyncConnDialect(_ConnDialect):
    """Variant of :class:`._ConnDialect` whose ``is_async`` flag is True;
    all connection methods are inherited unchanged."""

    is_async = True
145
146
class _CreatorFnType(Protocol):
    """Call signature of a "creator" callable that accepts no arguments
    and returns a DBAPI connection."""

    def __call__(self) -> DBAPIConnection: ...
149
150
class _CreatorWRecFnType(Protocol):
    """Call signature of a "creator" callable that accepts the
    :class:`.ConnectionPoolEntry` as its single argument and returns a
    DBAPI connection."""

    def __call__(self, rec: ConnectionPoolEntry) -> DBAPIConnection: ...
153
154
class Pool(log.Identified, event.EventTarget):
    """Abstract base class for connection pools."""

    dispatch: dispatcher[Pool]
    echo: log._EchoFlagType

    _orig_logging_name: Optional[str]
    # class-level default; replaced per instance when a dialect is passed
    # to the constructor
    _dialect: Union[_ConnDialect, Dialect] = _ConnDialect()
    _creator_arg: Union[_CreatorFnType, _CreatorWRecFnType]
    _invoke_creator: _CreatorWRecFnType
    _invalidate_time: float

    def __init__(
        self,
        creator: Union[_CreatorFnType, _CreatorWRecFnType],
        recycle: int = -1,
        echo: log._EchoFlagType = None,
        logging_name: Optional[str] = None,
        reset_on_return: _ResetStyleArgType = True,
        events: Optional[List[Tuple[_ListenerFnType, str]]] = None,
        dialect: Optional[Union[_ConnDialect, Dialect]] = None,
        pre_ping: bool = False,
        _dispatch: Optional[_DispatchCommon[Pool]] = None,
    ):
        """
        Construct a Pool.

        :param creator: a callable function that returns a DB-API
          connection object.  The function may either accept no arguments,
          or accept a single positional argument, in which case it is
          called with the :class:`.ConnectionPoolEntry` for which the
          connection is being created.

        :param recycle: If set to a value other than -1, number of
          seconds between connection recycling, which means upon
          checkout, if this timeout is surpassed the connection will be
          closed and replaced with a newly opened connection. Defaults to -1.

        :param logging_name:  String identifier which will be used within
          the "name" field of logging records generated within the
          "sqlalchemy.pool" logger. Defaults to a hexstring of the object's
          id.

        :param echo: if True, the connection pool will log
         informational output such as when connections are invalidated
         as well as when connections are recycled to the default log handler,
         which defaults to ``sys.stdout`` for output. If set to the string
         ``"debug"``, the logging will include pool checkouts and checkins.

         The :paramref:`_pool.Pool.echo` parameter can also be set from the
         :func:`_sa.create_engine` call by using the
         :paramref:`_sa.create_engine.echo_pool` parameter.

         .. seealso::

             :ref:`dbengine_logging` - further detail on how to configure
             logging.

        :param reset_on_return: Determine steps to take on
         connections as they are returned to the pool, which were
         not otherwise handled by a :class:`_engine.Connection`.
         Available from :func:`_sa.create_engine` via the
         :paramref:`_sa.create_engine.pool_reset_on_return` parameter.

         :paramref:`_pool.Pool.reset_on_return` can have any of these values:

         * ``"rollback"`` - call rollback() on the connection,
           to release locks and transaction resources.
           This is the default value.  The vast majority
           of use cases should leave this value set.
         * ``"commit"`` - call commit() on the connection,
           to release locks and transaction resources.
           A commit here may be desirable for databases that
           cache query plans if a commit is emitted,
           such as Microsoft SQL Server.  However, this
           value is more dangerous than 'rollback' because
           any data changes present on the transaction
           are committed unconditionally.
         * ``None`` - don't do anything on the connection.
           This setting may be appropriate if the database / DBAPI
           works in pure "autocommit" mode at all times, or if
           a custom reset handler is established using the
           :meth:`.PoolEvents.reset` event handler.

         * ``True`` - same as 'rollback', this is here for
           backwards compatibility.
         * ``False`` - same as None, this is here for
           backwards compatibility.

         For further customization of reset on return, the
         :meth:`.PoolEvents.reset` event hook may be used which can perform
         any connection activity desired on reset.

         .. seealso::

            :ref:`pool_reset_on_return`

            :meth:`.PoolEvents.reset`

        :param events: a list of 2-tuples, each of the form
         ``(callable, target)`` which will be passed to :func:`.event.listen`
         upon construction.   Provided here so that event listeners
         can be assigned via :func:`_sa.create_engine` before dialect-level
         listeners are applied.

        :param dialect: a :class:`.Dialect` that will handle the job
         of calling rollback(), close(), or commit() on DBAPI connections.
         If omitted, a built-in "stub" dialect is used.   Applications that
         make use of :func:`_sa.create_engine` should not use this parameter
         as it is handled by the engine creation strategy.

        :param pre_ping: if True, the pool will emit a "ping" (typically
         "SELECT 1", but is dialect-specific) on the connection
         upon checkout, to test if the connection is alive or not.   If not,
         the connection is transparently re-connected and upon success, all
         other pooled connections established prior to that timestamp are
         invalidated.     Requires that a dialect is passed as well to
         interpret the disconnection error.

        """
        if logging_name:
            self.logging_name = self._orig_logging_name = logging_name
        else:
            self._orig_logging_name = None

        log.instance_logger(self, echoflag=echo)
        # assigning through the property also derives self._invoke_creator
        self._creator = creator
        self._recycle = recycle
        self._invalidate_time = 0
        self._pre_ping = pre_ping
        self._reset_on_return = util.parse_user_argument_for_enum(
            reset_on_return,
            {
                ResetStyle.reset_rollback: ["rollback", True],
                ResetStyle.reset_none: ["none", None, False],
                ResetStyle.reset_commit: ["commit"],
            },
            "reset_on_return",
        )

        self.echo = echo

        if _dispatch:
            self.dispatch._update(_dispatch, only_propagate=False)
        if dialect:
            self._dialect = dialect
        if events:
            for fn, target in events:
                event.listen(self, target, fn)

    @util.hybridproperty
    def _is_asyncio(self) -> bool:
        """The ``is_async`` flag of the dialect in use."""
        return self._dialect.is_async

    @property
    def _creator(self) -> Union[_CreatorFnType, _CreatorWRecFnType]:
        """The creator callable exactly as it was supplied."""
        return self._creator_arg

    @_creator.setter
    def _creator(
        self, creator: Union[_CreatorFnType, _CreatorWRecFnType]
    ) -> None:
        self._creator_arg = creator

        # mypy seems to get super confused assigning functions to
        # attributes
        self._invoke_creator = self._should_wrap_creator(creator)

    @_creator.deleter
    def _creator(self) -> None:
        # needed for mock testing
        del self._creator_arg
        del self._invoke_creator

    def _should_wrap_creator(
        self, creator: Union[_CreatorFnType, _CreatorWRecFnType]
    ) -> _CreatorWRecFnType:
        """Detect if creator accepts a single argument, or is sent
        as a legacy style no-arg function.

        :param creator: the callable to inspect.
        :return: a callable accepting one argument (the connection record);
         either ``creator`` itself when it already accepts that argument,
         or a wrapper that discards the argument and calls ``creator()``.

        """

        try:
            # inspect the callable that was actually passed in, rather than
            # whatever happens to be stored on the instance, so that the
            # result always corresponds to the ``creator`` argument
            argspec = util.get_callable_argspec(creator, no_self=True)
        except TypeError:
            # signature can't be introspected; assume a no-arg callable
            creator_fn = cast(_CreatorFnType, creator)
            return lambda rec: creator_fn()

        if argspec.defaults is not None:
            defaulted = len(argspec.defaults)
        else:
            defaulted = 0
        # number of positional arguments that have no default value
        positionals = len(argspec[0]) - defaulted

        # look for the exact arg signature that DefaultStrategy
        # sends us
        if (argspec[0], argspec[3]) == (["connection_record"], (None,)):
            return cast(_CreatorWRecFnType, creator)
        # or just a single positional
        elif positionals == 1:
            return cast(_CreatorWRecFnType, creator)
        # all other cases, just wrap and assume legacy "creator" callable
        # thing
        else:
            creator_fn = cast(_CreatorFnType, creator)
            return lambda rec: creator_fn()

    def _close_connection(
        self, connection: DBAPIConnection, *, terminate: bool = False
    ) -> None:
        """Close or terminate the given DBAPI connection via the dialect.

        Exceptions that are instances of ``Exception`` are logged and
        suppressed; other ``BaseException`` types are logged and re-raised.

        :param connection: the DBAPI connection to close.
        :param terminate: if True, use the dialect's ``do_terminate()``
         instead of ``do_close()``.

        """
        self.logger.debug(
            "%s connection %r",
            "Hard-closing" if terminate else "Closing",
            connection,
        )
        try:
            if terminate:
                self._dialect.do_terminate(connection)
            else:
                self._dialect.do_close(connection)
        except BaseException as e:
            self.logger.error(
                f"Exception {'terminating' if terminate else 'closing'} "
                f"connection %r",
                connection,
                exc_info=True,
            )
            if not isinstance(e, Exception):
                raise

    def _create_connection(self) -> ConnectionPoolEntry:
        """Called by subclasses to create a new ConnectionRecord."""

        return _ConnectionRecord(self)

    def _invalidate(
        self,
        connection: PoolProxiedConnection,
        exception: Optional[BaseException] = None,
        _checkin: bool = True,
    ) -> None:
        """Mark all connections established within the generation
        of the given connection as invalidated.

        If this pool's last invalidate time is before when the given
        connection was created, update the timestamp to now.  Otherwise,
        no action is performed.

        Connections with a start time prior to this pool's invalidation
        time will be recycled upon next checkout.
        """
        rec = getattr(connection, "_connection_record", None)
        if not rec or self._invalidate_time < rec.starttime:
            self._invalidate_time = time.time()
        if _checkin and getattr(connection, "is_valid", False):
            connection.invalidate(exception)

    def recreate(self) -> Pool:
        """Return a new :class:`_pool.Pool`, of the same class as this one
        and configured with identical creation arguments.

        This method is used in conjunction with :meth:`dispose`
        to close out an entire :class:`_pool.Pool` and create a new one in
        its place.

        """

        raise NotImplementedError()

    def dispose(self) -> None:
        """Dispose of this pool.

        This method leaves the possibility of checked-out connections
        remaining open, as it only affects connections that are
        idle in the pool.

        .. seealso::

            :meth:`Pool.recreate`

        """

        raise NotImplementedError()

    def connect(self) -> PoolProxiedConnection:
        """Return a DBAPI connection from the pool.

        The connection is instrumented such that when its
        ``close()`` method is called, the connection will be returned to
        the pool.

        """
        return _ConnectionFairy._checkout(self)

    def _return_conn(self, record: ConnectionPoolEntry) -> None:
        """Given a _ConnectionRecord, return it to the :class:`_pool.Pool`.

        This method is called when an instrumented DBAPI connection
        has its ``close()`` method called.

        """
        self._do_return_conn(record)

    def _do_get(self) -> ConnectionPoolEntry:
        """Implementation for :meth:`get`, supplied by subclasses."""

        raise NotImplementedError()

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        """Implementation for :meth:`return_conn`, supplied by subclasses."""

        raise NotImplementedError()

    def status(self) -> str:
        """Returns a brief description of the state of this pool."""
        raise NotImplementedError()
469
470
class ManagesConnection:
    """Common base for the two connection-management interfaces
    :class:`.PoolProxiedConnection` and :class:`.ConnectionPoolEntry`.

    These two objects are typically exposed in the public facing API
    via the connection pool event hooks, documented at :class:`.PoolEvents`.

    .. versionadded:: 2.0

    """

    __slots__ = ()

    dbapi_connection: Optional[DBAPIConnection]
    """A reference to the actual DBAPI connection being tracked.

    This is a :pep:`249`-compliant object that for traditional sync-style
    dialects is provided by the third-party
    DBAPI implementation in use.  For asyncio dialects, the implementation
    is typically an adapter object provided by the SQLAlchemy dialect
    itself; the underlying asyncio object is available via the
    :attr:`.ManagesConnection.driver_connection` attribute.

    SQLAlchemy's interface for the DBAPI connection is based on the
    :class:`.DBAPIConnection` protocol object.

    .. seealso::

        :attr:`.ManagesConnection.driver_connection`

        :ref:`faq_dbapi_connection`

    """

    driver_connection: Optional[Any]
    """The "driver level" connection object as used by the Python
    DBAPI or database driver.

    For traditional :pep:`249` DBAPI implementations, this object will
    be the same object as that of
    :attr:`.ManagesConnection.dbapi_connection`.   For an asyncio database
    driver, this will be the ultimate "connection" object used by that
    driver, such as the ``asyncpg.Connection`` object which will not have
    standard pep-249 methods.

    .. versionadded:: 1.4.24

    .. seealso::

        :attr:`.ManagesConnection.dbapi_connection`

        :ref:`faq_dbapi_connection`

    """

    @util.ro_memoized_property
    def info(self) -> _InfoType:
        """Info dictionary associated with the underlying DBAPI connection
        referred to by this :class:`.ManagesConnection` instance, allowing
        user-defined data to be associated with the connection.

        The data in this dictionary is persistent for the lifespan
        of the DBAPI connection itself, including across pool checkins
        and checkouts.  When the connection is invalidated
        and replaced with a new one, this dictionary is cleared.

        For a :class:`.PoolProxiedConnection` instance that's not associated
        with a :class:`.ConnectionPoolEntry`, such as if it were detached, the
        attribute returns a dictionary that is local to that
        :class:`.PoolProxiedConnection`. Therefore the
        :attr:`.ManagesConnection.info` attribute will always provide a Python
        dictionary.

        .. seealso::

            :attr:`.ManagesConnection.record_info`


        """
        # interface only; implemented by subclasses
        raise NotImplementedError()

    @util.ro_memoized_property
    def record_info(self) -> Optional[_InfoType]:
        """Persistent info dictionary associated with this
        :class:`.ManagesConnection`.

        Unlike the :attr:`.ManagesConnection.info` dictionary, the lifespan
        of this dictionary is that of the :class:`.ConnectionPoolEntry`
        which owns it; therefore this dictionary will persist across
        reconnects and connection invalidation for a particular entry
        in the connection pool.

        For a :class:`.PoolProxiedConnection` instance that's not associated
        with a :class:`.ConnectionPoolEntry`, such as if it were detached, the
        attribute returns None. Contrast to the :attr:`.ManagesConnection.info`
        dictionary which is never None.


        .. seealso::

            :attr:`.ManagesConnection.info`

        """
        # interface only; implemented by subclasses
        raise NotImplementedError()

    def invalidate(
        self, e: Optional[BaseException] = None, soft: bool = False
    ) -> None:
        """Mark the managed connection as invalidated.

        :param e: an exception object indicating a reason for the invalidation.

        :param soft: if True, the connection isn't closed; instead, this
         connection will be recycled on next checkout.

        .. seealso::

            :ref:`pool_connection_invalidation`


        """
        # interface only; implemented by subclasses
        raise NotImplementedError()
593
594
class ConnectionPoolEntry(ManagesConnection):
    """Interface for the object that maintains an individual database
    connection on behalf of a :class:`_pool.Pool` instance.

    The :class:`.ConnectionPoolEntry` object represents the long term
    maintenance of a particular connection for a pool, including expiring or
    invalidating that connection to have it replaced with a new one, which will
    continue to be maintained by that same :class:`.ConnectionPoolEntry`
    instance. Compared to :class:`.PoolProxiedConnection`, which is the
    short-term, per-checkout connection manager, this object lasts for the
    lifespan of a particular "slot" within a connection pool.

    The :class:`.ConnectionPoolEntry` object is mostly visible to public-facing
    API code when it is delivered to connection pool event hooks, such as
    :meth:`_events.PoolEvents.connect` and :meth:`_events.PoolEvents.checkout`.

    .. versionadded:: 2.0  :class:`.ConnectionPoolEntry` provides the public
       facing interface for the :class:`._ConnectionRecord` internal class.

    """

    __slots__ = ()

    @property
    def in_use(self) -> bool:
        """Return True if the connection is currently checked out."""

        # interface only; implemented by subclasses
        raise NotImplementedError()

    def close(self) -> None:
        """Close the DBAPI connection managed by this connection pool entry."""
        # interface only; implemented by subclasses
        raise NotImplementedError()
627
628
class _ConnectionRecord(ConnectionPoolEntry):
    """Maintains a position in a connection pool which references a pooled
    connection.

    This is an internal object used by the :class:`_pool.Pool` implementation
    to provide context management to a DBAPI connection maintained by
    that :class:`_pool.Pool`.   The public facing interface for this class
    is described by the :class:`.ConnectionPoolEntry` class.  See that
    class for public API details.

    .. seealso::

        :class:`.ConnectionPoolEntry`

        :class:`.PoolProxiedConnection`

    """

    __slots__ = (
        "__pool",
        "fairy_ref",
        "finalize_callback",
        "fresh",
        "starttime",
        "dbapi_connection",
        "__weakref__",
        "__dict__",
    )

    # callables invoked with the DBAPI connection at checkin time
    finalize_callback: Deque[Callable[[DBAPIConnection], None]]
    # set to True each time a new DBAPI connection is created
    fresh: bool
    # weak reference to the fairy currently checked out, or None
    fairy_ref: Optional[weakref.ref[_ConnectionFairy]]
    # time.time() value recorded when the current connection was created
    starttime: float

    def __init__(self, pool: Pool, connect: bool = True):
        """Construct a record for ``pool``.

        :param pool: the owning :class:`_pool.Pool`.
        :param connect: if True, a DBAPI connection is created immediately.

        """
        self.fresh = False
        self.fairy_ref = None
        self.starttime = 0
        self.dbapi_connection = None
        # must exist before connecting: event listeners invoked during the
        # initial connect may call close() / invalidate() on this record,
        # and __close() clears this collection
        self.finalize_callback = deque()

        self.__pool = pool
        if connect:
            self.__connect()

    dbapi_connection: Optional[DBAPIConnection]

    @property
    def driver_connection(self) -> Optional[Any]:  # type: ignore[override]  # mypy#4125  # noqa: E501
        """The driver-level connection as reported by the pool's dialect,
        or None if there is no DBAPI connection at present."""
        if self.dbapi_connection is None:
            return None
        else:
            return self.__pool._dialect.get_driver_connection(
                self.dbapi_connection
            )

    @property
    @util.deprecated(
        "2.0",
        "The _ConnectionRecord.connection attribute is deprecated; "
        "please use 'driver_connection'",
    )
    def connection(self) -> Optional[DBAPIConnection]:
        return self.dbapi_connection

    # time of the most recent soft invalidation; compared against
    # ``starttime`` to decide whether to recycle on the next checkout
    _soft_invalidate_time: float = 0

    @util.ro_memoized_property
    def info(self) -> _InfoType:
        return {}

    @util.ro_memoized_property
    def record_info(self) -> Optional[_InfoType]:
        return {}

    @classmethod
    def checkout(cls, pool: Pool) -> _ConnectionFairy:
        """Get a record from ``pool`` and return a new
        :class:`._ConnectionFairy` wrapping its DBAPI connection.

        If obtaining the connection fails, the record is invalidated and
        checked back in, and the exception is re-raised.

        """
        if TYPE_CHECKING:
            rec = cast(_ConnectionRecord, pool._do_get())
        else:
            rec = pool._do_get()

        try:
            dbapi_connection = rec.get_connection()
        except BaseException as err:
            with util.safe_reraise():
                rec._checkin_failed(err, _fairy_was_created=False)

            # not reached, for code linters only
            raise

        echo = pool._should_log_debug()
        fairy = _ConnectionFairy(pool, dbapi_connection, rec, echo)

        # the weakref callback runs _finalize_fairy when the fairy is
        # garbage collected.  NOTE(review): the "is not None" guard
        # presumably protects against module teardown at interpreter
        # shutdown - confirm
        rec.fairy_ref = ref = weakref.ref(
            fairy,
            lambda ref: (
                _finalize_fairy(
                    None, rec, pool, ref, echo, transaction_was_reset=False
                )
                if _finalize_fairy is not None
                else None
            ),
        )
        _strong_ref_connection_records[ref] = rec
        if echo:
            pool.logger.debug(
                "Connection %r checked out from pool", dbapi_connection
            )
        return fairy

    def _checkin_failed(
        self, err: BaseException, _fairy_was_created: bool = True
    ) -> None:
        """Invalidate this record due to ``err``, then check it in."""
        self.invalidate(e=err)
        self.checkin(
            _fairy_was_created=_fairy_was_created,
        )

    def checkin(self, _fairy_was_created: bool = True) -> None:
        """Return this record to the pool.

        Runs and discards any pending finalize callbacks, dispatches the
        ``checkin`` event if listeners are present, and hands the record
        back to the pool.  A second checkin without an intervening
        checkout emits a warning and does nothing.

        """
        if self.fairy_ref is None and _fairy_was_created:
            # _fairy_was_created is False for the initial get connection phase;
            # meaning there was no _ConnectionFairy and we must unconditionally
            # do a checkin.
            #
            # otherwise, if fairy_was_created==True, if fairy_ref is None here
            # that means we were checked in already, so this looks like
            # a double checkin.
            util.warn("Double checkin attempted on %s" % self)
            return
        self.fairy_ref = None
        connection = self.dbapi_connection
        pool = self.__pool
        # callbacks are consumed most-recently-added first; they are
        # discarded without being called if there is no connection
        while self.finalize_callback:
            finalizer = self.finalize_callback.pop()
            if connection is not None:
                finalizer(connection)
        if pool.dispatch.checkin:
            pool.dispatch.checkin(connection, self)

        pool._return_conn(self)

    @property
    def in_use(self) -> bool:
        return self.fairy_ref is not None

    @property
    def last_connect_time(self) -> float:
        """The time at which the current connection was created."""
        return self.starttime

    def close(self) -> None:
        if self.dbapi_connection is not None:
            self.__close()

    def invalidate(
        self, e: Optional[BaseException] = None, soft: bool = False
    ) -> None:
        # already invalidated
        if self.dbapi_connection is None:
            return
        if soft:
            self.__pool.dispatch.soft_invalidate(
                self.dbapi_connection, self, e
            )
        else:
            self.__pool.dispatch.invalidate(self.dbapi_connection, self, e)
        if e is not None:
            self.__pool.logger.info(
                "%sInvalidate connection %r (reason: %s:%s)",
                "Soft " if soft else "",
                self.dbapi_connection,
                e.__class__.__name__,
                e,
            )
        else:
            self.__pool.logger.info(
                "%sInvalidate connection %r",
                "Soft " if soft else "",
                self.dbapi_connection,
            )

        if soft:
            # connection stays open; get_connection() recycles it later
            self._soft_invalidate_time = time.time()
        else:
            self.__close(terminate=True)
            self.dbapi_connection = None

    def get_connection(self) -> DBAPIConnection:
        """Return the DBAPI connection for this record, connecting or
        recycling first if needed.

        A new connection is made if there is none; the existing one is
        closed and replaced if it exceeded the pool's recycle time, or was
        created before the pool's invalidation time or this record's soft
        invalidation time.

        """
        recycle = False

        # NOTE: the various comparisons here are assuming that measurable time
        # passes between these state changes.  however, time.time() is not
        # guaranteed to have sub-second precision.  comparisons of
        # "invalidation time" to "starttime" should perhaps use >= so that the
        # state change can take place assuming no measurable time has passed,
        # however this does not guarantee correct behavior here as if time
        # continues to not pass, it will try to reconnect repeatedly until
        # these timestamps diverge, so in that sense using > is safer.  Per
        # https://stackoverflow.com/a/1938096/34549, Windows time.time() may be
        # within 16 milliseconds accuracy, so unit tests for connection
        # invalidation need a sleep of at least this long between initial start
        # time and invalidation for the logic below to work reliably.

        if self.dbapi_connection is None:
            self.info.clear()
            self.__connect()
        elif (
            self.__pool._recycle > -1
            and time.time() - self.starttime > self.__pool._recycle
        ):
            self.__pool.logger.info(
                "Connection %r exceeded timeout; recycling",
                self.dbapi_connection,
            )
            recycle = True
        elif self.__pool._invalidate_time > self.starttime:
            self.__pool.logger.info(
                "Connection %r invalidated due to pool invalidation; "
                + "recycling",
                self.dbapi_connection,
            )
            recycle = True
        elif self._soft_invalidate_time > self.starttime:
            self.__pool.logger.info(
                "Connection %r invalidated due to local soft invalidation; "
                + "recycling",
                self.dbapi_connection,
            )
            recycle = True

        if recycle:
            self.__close(terminate=True)
            self.info.clear()

            self.__connect()

        assert self.dbapi_connection is not None
        return self.dbapi_connection

    def _is_hard_or_soft_invalidated(self) -> bool:
        """True if there is no connection, or the connection predates the
        pool's invalidation time or this record's soft invalidation time."""
        return (
            self.dbapi_connection is None
            or self.__pool._invalidate_time > self.starttime
            or (self._soft_invalidate_time > self.starttime)
        )

    def __close(self, *, terminate: bool = False) -> None:
        """Discard pending finalize callbacks, dispatch the ``close`` event
        if listeners are present, and close the DBAPI connection via the
        pool, leaving ``dbapi_connection`` as None."""
        self.finalize_callback.clear()
        if self.__pool.dispatch.close:
            self.__pool.dispatch.close(self.dbapi_connection, self)
        assert self.dbapi_connection is not None
        self.__pool._close_connection(
            self.dbapi_connection, terminate=terminate
        )
        self.dbapi_connection = None

    def __connect(self) -> None:
        """Create a new DBAPI connection using the pool's creator and
        dispatch the ``first_connect`` / ``connect`` events for it.

        If the creator raises, the error is logged at debug level and
        re-raised, and ``dbapi_connection`` remains None.

        """
        pool = self.__pool

        # ensure any existing connection is removed, so that if
        # creator fails, this attribute stays None
        self.dbapi_connection = None
        try:
            self.starttime = time.time()
            self.dbapi_connection = connection = pool._invoke_creator(self)
            pool.logger.debug("Created new connection %r", connection)
            self.fresh = True
        except BaseException as e:
            with util.safe_reraise():
                pool.logger.debug("Error on connect(): %s", e)
        else:
            # in SQLAlchemy 1.4 the first_connect event is not used by
            # the engine, so this will usually not be set
            if pool.dispatch.first_connect:
                pool.dispatch.first_connect.for_modify(
                    pool.dispatch
                ).exec_once_unless_exception(self.dbapi_connection, self)

            # init of the dialect now takes place within the connect
            # event, so ensure a mutex is used on the first run
            pool.dispatch.connect.for_modify(
                pool.dispatch
            )._exec_w_sync_on_first_run(self.dbapi_connection, self)
912
913
def _finalize_fairy(
    dbapi_connection: Optional[DBAPIConnection],
    connection_record: Optional[_ConnectionRecord],
    pool: Pool,
    ref: Optional[
        weakref.ref[_ConnectionFairy]
    ],  # this is None when called directly, not by the gc
    echo: Optional[log._EchoFlagType],
    transaction_was_reset: bool = False,
    fairy: Optional[_ConnectionFairy] = None,
) -> None:
    """Cleanup for a :class:`._ConnectionFairy` whether or not it's already
    been garbage collected.

    When using an async dialect no IO can happen here (without using
    a dedicated thread), since this is called outside the greenlet
    context and with an already running loop. In this case function
    will only log a message and raise a warning.

    :param dbapi_connection: the DBAPI connection to finalize; must be
     None when invoked by the garbage collector, in which case it is
     taken from ``connection_record``.
    :param connection_record: the record owning the connection, or None.
    :param pool: the pool the connection belongs to.
    :param ref: the weakref whose callback triggered this call, or None
     when called directly.
    :param echo: debug logging flag; also passed to a newly built fairy.
    :param transaction_was_reset: passed through to the fairy's reset.
    :param fairy: the fairy being finalized, if it still exists.
    """

    is_gc_cleanup = ref is not None

    if is_gc_cleanup:
        assert ref is not None
        _strong_ref_connection_records.pop(ref, None)
        assert connection_record is not None
        # the record no longer refers to the collected fairy (e.g. it was
        # already checked in); nothing to do
        if connection_record.fairy_ref is not ref:
            return
        assert dbapi_connection is None
        dbapi_connection = connection_record.dbapi_connection

    elif fairy:
        _strong_ref_connection_records.pop(weakref.ref(fairy), None)

    # null pool is not _is_asyncio but can be used also with async dialects
    dont_restore_gced = pool._dialect.is_async

    if dont_restore_gced:
        # async dialect: a garbage-collected connection is detached rather
        # than returned to the pool, and may only be closed if the dialect
        # has a terminate operation
        detach = connection_record is None or is_gc_cleanup
        can_manipulate_connection = not is_gc_cleanup
        can_close_or_terminate_connection = (
            not pool._dialect.is_async or pool._dialect.has_terminate
        )
        requires_terminate_for_close = (
            pool._dialect.is_async and pool._dialect.has_terminate
        )

    else:
        detach = connection_record is None
        can_manipulate_connection = can_close_or_terminate_connection = True
        requires_terminate_for_close = False

    if dbapi_connection is not None:
        if connection_record and echo:
            pool.logger.debug(
                "Connection %r being returned to pool", dbapi_connection
            )

        try:
            if not fairy:
                # no fairy available (e.g. it was garbage collected);
                # build a temporary one in order to run the reset
                assert connection_record is not None
                fairy = _ConnectionFairy(
                    pool,
                    dbapi_connection,
                    connection_record,
                    echo,
                )
            assert fairy.dbapi_connection is dbapi_connection

            fairy._reset(
                pool,
                transaction_was_reset=transaction_was_reset,
                terminate_only=detach,
                asyncio_safe=can_manipulate_connection,
            )

            if detach:
                if connection_record:
                    fairy._pool = pool
                    fairy.detach()

                if can_close_or_terminate_connection:
                    if pool.dispatch.close_detached:
                        pool.dispatch.close_detached(dbapi_connection)

                    pool._close_connection(
                        dbapi_connection,
                        terminate=requires_terminate_for_close,
                    )

        except BaseException as e:
            pool.logger.error(
                "Exception during reset or similar", exc_info=True
            )
            if connection_record:
                connection_record.invalidate(e=e)
            # plain Exceptions are suppressed after logging; other
            # BaseException types propagate
            if not isinstance(e, Exception):
                raise
        finally:
            # an async connection is being cleaned up by the garbage
            # collector: log and warn about it
            if detach and is_gc_cleanup and dont_restore_gced:
                message = (
                    "The garbage collector is trying to clean up "
                    f"non-checked-in connection {dbapi_connection!r}, "
                    f"""which will be {
                        'dropped, as it cannot be safely terminated'
                        if not can_close_or_terminate_connection
                        else 'terminated'
                    }. """
                    "Please ensure that SQLAlchemy pooled connections are "
                    "returned to "
                    "the pool explicitly, either by calling ``close()`` "
                    "or by using appropriate context managers to manage "
                    "their lifecycle."
                )
                pool.logger.error(message)
                util.warn(message)

    # the record is still marked as checked out; return it to the pool
    if connection_record and connection_record.fairy_ref is not None:
        connection_record.checkin()

    # give gc some help.  See
    # test/engine/test_pool.py::PoolEventsTest::test_checkin_event_gc[True]
    # which actually started failing when pytest warnings plugin was
    # turned on, due to util.warn() above
    if fairy is not None:
        fairy.dbapi_connection = None  # type: ignore
        fairy._connection_record = None
    del dbapi_connection
    del connection_record
    del fairy
1044
1045
# A dictionary mapping _ConnectionFairy weakrefs to their _ConnectionRecord,
# kept so that GC under pypy will call _ConnectionFairy finalizers.  Each
# entry is keyed directly by the weakref that will empty itself when
# collected, so that it should not create any unmanaged memory references;
# entries are removed again with ``pop()`` during fairy cleanup.
_strong_ref_connection_records: Dict[
    weakref.ref[_ConnectionFairy], _ConnectionRecord
] = {}
1053
1054
class PoolProxiedConnection(ManagesConnection):
    """An adapter that behaves like a :pep:`249` DBAPI connection while
    adding methods that are specific to the :class:`.Pool` implementation.

    :class:`.PoolProxiedConnection` is the public-facing interface for the
    internal :class:`._ConnectionFairy` implementation object; users familiar
    with :class:`._ConnectionFairy` can treat the two as equivalent.

    .. versionadded:: 2.0  :class:`.PoolProxiedConnection` provides the public-
        facing interface for the :class:`._ConnectionFairy` internal class.

    """

    __slots__ = ()

    if TYPE_CHECKING:
        # stubs that only exist for type checkers, describing the
        # DBAPI-style methods available on the proxy

        def commit(self) -> None: ...

        def cursor(self, *args: Any, **kwargs: Any) -> DBAPICursor: ...

        def rollback(self) -> None: ...

        def __getattr__(self, key: str) -> Any: ...

    @property
    def is_valid(self) -> bool:
        """True if this :class:`.PoolProxiedConnection` still refers to an
        active DBAPI connection, False otherwise."""

        raise NotImplementedError()

    @property
    def is_detached(self) -> bool:
        """True if this :class:`.PoolProxiedConnection` has been detached
        from its pool, False otherwise."""

        raise NotImplementedError()

    def detach(self) -> None:
        """Remove this connection from the control of its Pool.

        Once detached, closing the connection no longer returns it to the
        pool; it is literally closed instead.  The
        :class:`.ConnectionPoolEntry` that was associated with this DBAPI
        connection is de-associated from it.

        Be aware that connection limiting constraints imposed by a Pool
        implementation may be violated after a detach, because the detached
        connection is no longer known to, or controlled by, the pool.

        """

        raise NotImplementedError()

    def close(self) -> None:
        """Release this connection back to the pool.

        :meth:`.PoolProxiedConnection.close` shadows the :pep:`249`
        ``.close()`` method and changes its meaning: rather than closing,
        it :term:`release`\\ s the proxied connection back to the connection
        pool.

        Whether the released connection then stays "opened" and pooled in
        the Python process, or is actually closed out and removed from the
        Python process, depends on the pool implementation in use as well as
        its configuration and current state.

        """
        raise NotImplementedError()
1124
1125
class _AdhocProxiedConnection(PoolProxiedConnection):
    """provides the :class:`.PoolProxiedConnection` interface for cases where
    the DBAPI connection is not actually proxied.

    This is used by the engine internals to pass a consistent
    :class:`.PoolProxiedConnection` object to consuming dialects in response to
    pool events that may not always have the :class:`._ConnectionFairy`
    available.

    """

    __slots__ = ("dbapi_connection", "_connection_record", "_is_valid")

    dbapi_connection: DBAPIConnection
    _connection_record: ConnectionPoolEntry

    def __init__(
        self,
        dbapi_connection: DBAPIConnection,
        connection_record: ConnectionPoolEntry,
    ):
        """Wrap ``dbapi_connection`` together with its pool entry.

        The new object starts out reporting itself as valid.

        """
        self.dbapi_connection = dbapi_connection
        self._connection_record = connection_record
        self._is_valid = True

    @property
    def driver_connection(self) -> Any:  # type: ignore[override]  # mypy#4125
        # delegated to the pool entry given at construction time
        return self._connection_record.driver_connection

    @property
    def connection(self) -> DBAPIConnection:
        return self.dbapi_connection

    @property
    def is_valid(self) -> bool:
        """Implement is_valid state attribute.

        for the adhoc proxied connection the connection is assumed to be
        valid until :meth:`.invalidate` is called; that method only records
        the fact on this object, so this flag is the sole source of validity.

        """
        return self._is_valid

    def invalidate(
        self, e: Optional[BaseException] = None, soft: bool = False
    ) -> None:
        # only the local flag is flipped; ``e`` and ``soft`` are accepted
        # for interface compatibility and are not consulted here
        self._is_valid = False

    @util.ro_non_memoized_property
    def record_info(self) -> Optional[_InfoType]:
        return self._connection_record.record_info

    def cursor(self, *args: Any, **kwargs: Any) -> DBAPICursor:
        return self.dbapi_connection.cursor(*args, **kwargs)

    def __getattr__(self, key: Any) -> Any:
        """Proxy otherwise-unknown attributes to the DBAPI connection.

        Python calls this only after normal attribute lookup has failed.

        :raises AttributeError: if the DBAPI connection does not have the
         attribute, or if the ``dbapi_connection`` slot itself has not been
         assigned yet.

        """
        if key == "dbapi_connection":
            # reaching here means the slot is unset, e.g. the instance was
            # created through __new__ without __init__ (copy / pickle).
            # reading self.dbapi_connection below would re-enter this method
            # without end and surface as RecursionError.
            raise AttributeError(key)
        return getattr(self.dbapi_connection, key)
1183
1184
class _ConnectionFairy(PoolProxiedConnection):
    """Proxies a DBAPI connection and provides return-on-dereference
    support.

    This is an internal object used by the :class:`_pool.Pool` implementation
    to provide context management to a DBAPI connection delivered by
    that :class:`_pool.Pool`.   The public facing interface for this class
    is described by the :class:`.PoolProxiedConnection` class.  See that
    class for public API details.

    The name "fairy" is inspired by the fact that the
    :class:`._ConnectionFairy` object's lifespan is transitory, as it lasts
    only for the length of a specific DBAPI connection being checked out from
    the pool, and additionally that as a transparent proxy, it is mostly
    invisible.

    .. seealso::

        :class:`.PoolProxiedConnection`

        :class:`.ConnectionPoolEntry`


    """

    # "__dict__" is included so that additional instance attributes can
    # still be set on the fairy; "__weakref__" allows weak references to it
    __slots__ = (
        "dbapi_connection",
        "_connection_record",
        "_echo",
        "_pool",
        "_counter",
        "__weakref__",
        "__dict__",
    )

    # NOTE(review): "pool" is not listed in __slots__ and __init__ assigns
    # "_pool"; this annotation looks like it was meant for "_pool" - confirm
    pool: Pool
    dbapi_connection: DBAPIConnection
    _echo: log._EchoFlagType
1223
1224 def __init__(
1225 self,
1226 pool: Pool,
1227 dbapi_connection: DBAPIConnection,
1228 connection_record: _ConnectionRecord,
1229 echo: log._EchoFlagType,
1230 ):
1231 self._pool = pool
1232 self._counter = 0
1233 self.dbapi_connection = dbapi_connection
1234 self._connection_record = connection_record
1235 self._echo = echo
1236
    # becomes None once the fairy has been detached (see ``detach()``)
    _connection_record: Optional[_ConnectionRecord]
1238
1239 @property
1240 def driver_connection(self) -> Optional[Any]: # type: ignore[override] # mypy#4125 # noqa: E501
1241 if self._connection_record is None:
1242 return None
1243 return self._connection_record.driver_connection
1244
    @property
    @util.deprecated(
        "2.0",
        "The _ConnectionFairy.connection attribute is deprecated; "
        "please use 'driver_connection'",
    )
    def connection(self) -> DBAPIConnection:
        # legacy accessor; returns the same object as ``dbapi_connection``
        return self.dbapi_connection
1253
    @classmethod
    def _checkout(
        cls,
        pool: Pool,
        threadconns: Optional[threading.local] = None,
        fairy: Optional[_ConnectionFairy] = None,
    ) -> _ConnectionFairy:
        """Check out a fairy from ``pool``, or re-check-out an existing one.

        :param pool: the pool the connection is checked out from.
        :param threadconns: optional thread-local; when a new fairy is
         created, a weakref to it is stored on ``threadconns.current``.
        :param fairy: an existing fairy to check out again; when omitted a
         new one is obtained from ``_ConnectionRecord.checkout(pool)``.
        :return: the checked-out fairy, with its ``_counter`` incremented.
        :raises exc.InvalidRequestError: when every pass of the
         ping / checkout-event loop ended in a ``DisconnectionError``.

        Any other exception raised by the pre-ping or by a checkout listener
        is re-raised after the connection record has been checked in as
        failed.

        """
        if not fairy:
            fairy = _ConnectionRecord.checkout(pool)

            if threadconns is not None:
                threadconns.current = weakref.ref(fairy)

        assert (
            fairy._connection_record is not None
        ), "can't 'checkout' a detached connection fairy"
        assert (
            fairy.dbapi_connection is not None
        ), "can't 'checkout' an invalidated connection fairy"

        fairy._counter += 1
        # the ping / event work below is only needed on the first checkout
        # of this fairy, and only if there are checkout listeners or
        # pre-ping is enabled
        if (
            not pool.dispatch.checkout and not pool._pre_ping
        ) or fairy._counter != 1:
            return fairy

        # Pool listeners can trigger a reconnection on checkout, as well
        # as the pre-pinger.
        # up to ``attempts`` passes are made through the loop below; each
        # pass that ends in a DisconnectionError invalidates the record and
        # reconnects before the next pass.  note that if the database
        # is not accessible from a connection standpoint, those won't proceed
        # here.

        attempts = 2

        while attempts > 0:
            # remember whether the record was marked fresh, then clear the
            # flag so that a later pass will ping
            connection_is_fresh = fairy._connection_record.fresh
            fairy._connection_record.fresh = False
            try:
                if pool._pre_ping:
                    if not connection_is_fresh:
                        if fairy._echo:
                            pool.logger.debug(
                                "Pool pre-ping on connection %s",
                                fairy.dbapi_connection,
                            )
                        result = pool._dialect._do_ping_w_event(
                            fairy.dbapi_connection
                        )
                        if not result:
                            if fairy._echo:
                                pool.logger.debug(
                                    "Pool pre-ping on connection %s failed, "
                                    "will invalidate pool",
                                    fairy.dbapi_connection,
                                )
                            # NOTE(review): expected to be handled by the
                            # DisconnectionError branch below with
                            # invalidate_pool set - confirm in exc module
                            raise exc.InvalidatePoolError()
                    elif fairy._echo:
                        pool.logger.debug(
                            "Connection %s is fresh, skipping pre-ping",
                            fairy.dbapi_connection,
                        )

                pool.dispatch.checkout(
                    fairy.dbapi_connection, fairy._connection_record, fairy
                )
                return fairy
            except exc.DisconnectionError as e:
                if e.invalidate_pool:
                    pool.logger.info(
                        "Disconnection detected on checkout, "
                        "invalidating all pooled connections prior to "
                        "current timestamp (reason: %r)",
                        e,
                    )
                    fairy._connection_record.invalidate(e)
                    pool._invalidate(fairy, e, _checkin=False)
                else:
                    pool.logger.info(
                        "Disconnection detected on checkout, "
                        "invalidating individual connection %s (reason: %r)",
                        fairy.dbapi_connection,
                        e,
                    )
                    fairy._connection_record.invalidate(e)
                # obtain a replacement DBAPI connection for the next pass
                try:
                    fairy.dbapi_connection = (
                        fairy._connection_record.get_connection()
                    )
                except BaseException as err:
                    with util.safe_reraise():
                        fairy._connection_record._checkin_failed(
                            err,
                            _fairy_was_created=True,
                        )

                        # prevent _ConnectionFairy from being carried
                        # in the stack trace.  Do this after the
                        # connection record has been checked in, so that
                        # if the del triggers a finalize fairy, it won't
                        # try to checkin a second time.
                        del fairy

                    # never called, this is for code linters
                    raise

                attempts -= 1
            except BaseException as be_outer:
                with util.safe_reraise():
                    rec = fairy._connection_record
                    if rec is not None:
                        rec._checkin_failed(
                            be_outer,
                            _fairy_was_created=True,
                        )

                    # prevent _ConnectionFairy from being carried
                    # in the stack trace, see above
                    del fairy

                # never called, this is for code linters
                raise

        # every pass ended in a DisconnectionError
        pool.logger.info("Reconnection attempts exhausted on checkout")
        fairy.invalidate()
        raise exc.InvalidRequestError("This connection is closed")
1379
1380 def _checkout_existing(self) -> _ConnectionFairy:
1381 return _ConnectionFairy._checkout(self._pool, fairy=self)
1382
1383 def _checkin(self, transaction_was_reset: bool = False) -> None:
1384 _finalize_fairy(
1385 self.dbapi_connection,
1386 self._connection_record,
1387 self._pool,
1388 None,
1389 self._echo,
1390 transaction_was_reset=transaction_was_reset,
1391 fairy=self,
1392 )
1393
1394 def _close(self) -> None:
1395 self._checkin()
1396
1397 def _reset(
1398 self,
1399 pool: Pool,
1400 transaction_was_reset: bool,
1401 terminate_only: bool,
1402 asyncio_safe: bool,
1403 ) -> None:
1404 if pool.dispatch.reset:
1405 pool.dispatch.reset(
1406 self.dbapi_connection,
1407 self._connection_record,
1408 PoolResetState(
1409 transaction_was_reset=transaction_was_reset,
1410 terminate_only=terminate_only,
1411 asyncio_safe=asyncio_safe,
1412 ),
1413 )
1414
1415 if not asyncio_safe:
1416 return
1417
1418 if pool._reset_on_return is reset_rollback:
1419 if transaction_was_reset:
1420 if self._echo:
1421 pool.logger.debug(
1422 "Connection %s reset, transaction already reset",
1423 self.dbapi_connection,
1424 )
1425 else:
1426 if self._echo:
1427 pool.logger.debug(
1428 "Connection %s rollback-on-return",
1429 self.dbapi_connection,
1430 )
1431 pool._dialect.do_rollback(self)
1432 elif pool._reset_on_return is reset_commit:
1433 if self._echo:
1434 pool.logger.debug(
1435 "Connection %s commit-on-return",
1436 self.dbapi_connection,
1437 )
1438 pool._dialect.do_commit(self)
1439
1440 @property
1441 def _logger(self) -> log._IdentifiedLoggerType:
1442 return self._pool.logger
1443
1444 @property
1445 def is_valid(self) -> bool:
1446 return self.dbapi_connection is not None
1447
1448 @property
1449 def is_detached(self) -> bool:
1450 return self._connection_record is None
1451
1452 @util.ro_memoized_property
1453 def info(self) -> _InfoType:
1454 if self._connection_record is None:
1455 return {}
1456 else:
1457 return self._connection_record.info
1458
1459 @util.ro_non_memoized_property
1460 def record_info(self) -> Optional[_InfoType]:
1461 if self._connection_record is None:
1462 return None
1463 else:
1464 return self._connection_record.record_info
1465
1466 def invalidate(
1467 self, e: Optional[BaseException] = None, soft: bool = False
1468 ) -> None:
1469 if self.dbapi_connection is None:
1470 util.warn("Can't invalidate an already-closed connection.")
1471 return
1472 if self._connection_record:
1473 self._connection_record.invalidate(e=e, soft=soft)
1474 if not soft:
1475 # prevent any rollback / reset actions etc. on
1476 # the connection
1477 self.dbapi_connection = None # type: ignore
1478
1479 # finalize
1480 self._checkin()
1481
1482 def cursor(self, *args: Any, **kwargs: Any) -> DBAPICursor:
1483 assert self.dbapi_connection is not None
1484 return self.dbapi_connection.cursor(*args, **kwargs)
1485
1486 def __getattr__(self, key: str) -> Any:
1487 return getattr(self.dbapi_connection, key)
1488
1489 def detach(self) -> None:
1490 if self._connection_record is not None:
1491 rec = self._connection_record
1492 rec.fairy_ref = None
1493 rec.dbapi_connection = None
1494 # TODO: should this be _return_conn?
1495 self._pool._do_return_conn(self._connection_record)
1496
1497 # can't get the descriptor assignment to work here
1498 # in pylance. mypy is OK w/ it
1499 self.info = self.info.copy() # type: ignore
1500
1501 self._connection_record = None
1502
1503 if self._pool.dispatch.detach:
1504 self._pool.dispatch.detach(self.dbapi_connection, rec)
1505
1506 def close(self) -> None:
1507 self._counter -= 1
1508 if self._counter == 0:
1509 self._checkin()
1510
1511 def _close_special(self, transaction_reset: bool = False) -> None:
1512 self._counter -= 1
1513 if self._counter == 0:
1514 self._checkin(transaction_was_reset=transaction_reset)