1# pool/base.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8
9"""Base constructs for connection pools."""
10
11from __future__ import annotations
12
13from collections import deque
14import dataclasses
15from enum import Enum
16import threading
17import time
18import typing
19from typing import Any
20from typing import Callable
21from typing import cast
22from typing import Deque
23from typing import Dict
24from typing import List
25from typing import Optional
26from typing import Tuple
27from typing import TYPE_CHECKING
28from typing import Union
29import weakref
30
31from .. import event
32from .. import exc
33from .. import log
34from .. import util
35from ..util.typing import Literal
36from ..util.typing import Protocol
37
38if TYPE_CHECKING:
39 from ..engine.interfaces import DBAPIConnection
40 from ..engine.interfaces import DBAPICursor
41 from ..engine.interfaces import Dialect
42 from ..event import _DispatchCommon
43 from ..event import _ListenerFnType
44 from ..event import dispatcher
45 from ..sql._typing import _InfoType
46
47
48@dataclasses.dataclass(frozen=True)
49class PoolResetState:
50 """describes the state of a DBAPI connection as it is being passed to
51 the :meth:`.PoolEvents.reset` connection pool event.
52
53 .. versionadded:: 2.0.0b3
54
55 """
56
57 __slots__ = ("transaction_was_reset", "terminate_only", "asyncio_safe")
58
59 transaction_was_reset: bool
60 """Indicates if the transaction on the DBAPI connection was already
61 essentially "reset" back by the :class:`.Connection` object.
62
63 This boolean is True if the :class:`.Connection` had transactional
64 state present upon it, which was then not closed using the
65 :meth:`.Connection.rollback` or :meth:`.Connection.commit` method;
66 instead, the transaction was closed inline within the
67 :meth:`.Connection.close` method so is guaranteed to remain non-present
68 when this event is reached.
69
70 """
71
72 terminate_only: bool
73 """indicates if the connection is to be immediately terminated and
74 not checked in to the pool.
75
76 This occurs for connections that were invalidated, as well as asyncio
77 connections that were not cleanly handled by the calling code that
78 are instead being garbage collected. In the latter case,
79 operations can't be safely run on asyncio connections within garbage
80 collection as there is not necessarily an event loop present.
81
82 """
83
84 asyncio_safe: bool
85 """Indicates if the reset operation is occurring within a scope where
86 an enclosing event loop is expected to be present for asyncio applications.
87
88 Will be False in the case that the connection is being garbage collected.
89
90 """
91
92
93class ResetStyle(Enum):
94 """Describe options for "reset on return" behaviors."""
95
96 reset_rollback = 0
97 reset_commit = 1
98 reset_none = 2
99
100
101_ResetStyleArgType = Union[
102 ResetStyle,
103 Literal[True, None, False, "commit", "rollback"],
104]
105reset_rollback, reset_commit, reset_none = list(ResetStyle)
106
107
108class _ConnDialect:
109 """partial implementation of :class:`.Dialect`
110 which provides DBAPI connection methods.
111
112 When a :class:`_pool.Pool` is combined with an :class:`_engine.Engine`,
113 the :class:`_engine.Engine` replaces this with its own
114 :class:`.Dialect`.
115
116 """
117
118 is_async = False
119 has_terminate = False
120
121 def do_rollback(self, dbapi_connection: PoolProxiedConnection) -> None:
122 dbapi_connection.rollback()
123
124 def do_commit(self, dbapi_connection: PoolProxiedConnection) -> None:
125 dbapi_connection.commit()
126
127 def do_terminate(self, dbapi_connection: DBAPIConnection) -> None:
128 dbapi_connection.close()
129
130 def do_close(self, dbapi_connection: DBAPIConnection) -> None:
131 dbapi_connection.close()
132
133 def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
134 raise NotImplementedError(
135 "The ping feature requires that a dialect is "
136 "passed to the connection pool."
137 )
138
139 def get_driver_connection(self, connection: DBAPIConnection) -> Any:
140 return connection
141
142
143class _AsyncConnDialect(_ConnDialect):
144 is_async = True
145
146
147class _CreatorFnType(Protocol):
148 def __call__(self) -> DBAPIConnection: ...
149
150
151class _CreatorWRecFnType(Protocol):
152 def __call__(self, rec: ConnectionPoolEntry) -> DBAPIConnection: ...
153
154
155class Pool(log.Identified, event.EventTarget):
156 """Abstract base class for connection pools."""
157
158 dispatch: dispatcher[Pool]
159 echo: log._EchoFlagType
160
161 _orig_logging_name: Optional[str]
162 _dialect: Union[_ConnDialect, Dialect] = _ConnDialect()
163 _creator_arg: Union[_CreatorFnType, _CreatorWRecFnType]
164 _invoke_creator: _CreatorWRecFnType
165 _invalidate_time: float
166
167 def __init__(
168 self,
169 creator: Union[_CreatorFnType, _CreatorWRecFnType],
170 recycle: int = -1,
171 echo: log._EchoFlagType = None,
172 logging_name: Optional[str] = None,
173 reset_on_return: _ResetStyleArgType = True,
174 events: Optional[List[Tuple[_ListenerFnType, str]]] = None,
175 dialect: Optional[Union[_ConnDialect, Dialect]] = None,
176 pre_ping: bool = False,
177 _dispatch: Optional[_DispatchCommon[Pool]] = None,
178 ):
179 """
180 Construct a Pool.
181
182 :param creator: a callable function that returns a DB-API
183 connection object. The function will be called with
184 parameters.
185
186 :param recycle: If set to a value other than -1, number of
187 seconds between connection recycling, which means upon
188 checkout, if this timeout is surpassed the connection will be
189 closed and replaced with a newly opened connection. Defaults to -1.
190
191 :param logging_name: String identifier which will be used within
192 the "name" field of logging records generated within the
193 "sqlalchemy.pool" logger. Defaults to a hexstring of the object's
194 id.
195
196 :param echo: if True, the connection pool will log
197 informational output such as when connections are invalidated
198 as well as when connections are recycled to the default log handler,
199 which defaults to ``sys.stdout`` for output.. If set to the string
200 ``"debug"``, the logging will include pool checkouts and checkins.
201
202 The :paramref:`_pool.Pool.echo` parameter can also be set from the
203 :func:`_sa.create_engine` call by using the
204 :paramref:`_sa.create_engine.echo_pool` parameter.
205
206 .. seealso::
207
208 :ref:`dbengine_logging` - further detail on how to configure
209 logging.
210
211 :param reset_on_return: Determine steps to take on
212 connections as they are returned to the pool, which were
213 not otherwise handled by a :class:`_engine.Connection`.
214 Available from :func:`_sa.create_engine` via the
215 :paramref:`_sa.create_engine.pool_reset_on_return` parameter.
216
217 :paramref:`_pool.Pool.reset_on_return` can have any of these values:
218
219 * ``"rollback"`` - call rollback() on the connection,
220 to release locks and transaction resources.
221 This is the default value. The vast majority
222 of use cases should leave this value set.
223 * ``"commit"`` - call commit() on the connection,
224 to release locks and transaction resources.
225 A commit here may be desirable for databases that
226 cache query plans if a commit is emitted,
227 such as Microsoft SQL Server. However, this
228 value is more dangerous than 'rollback' because
229 any data changes present on the transaction
230 are committed unconditionally.
231 * ``None`` - don't do anything on the connection.
232 This setting may be appropriate if the database / DBAPI
233 works in pure "autocommit" mode at all times, or if
234 a custom reset handler is established using the
235 :meth:`.PoolEvents.reset` event handler.
236
237 * ``True`` - same as 'rollback', this is here for
238 backwards compatibility.
239 * ``False`` - same as None, this is here for
240 backwards compatibility.
241
242 For further customization of reset on return, the
243 :meth:`.PoolEvents.reset` event hook may be used which can perform
244 any connection activity desired on reset.
245
246 .. seealso::
247
248 :ref:`pool_reset_on_return`
249
250 :meth:`.PoolEvents.reset`
251
252 :param events: a list of 2-tuples, each of the form
253 ``(callable, target)`` which will be passed to :func:`.event.listen`
254 upon construction. Provided here so that event listeners
255 can be assigned via :func:`_sa.create_engine` before dialect-level
256 listeners are applied.
257
258 :param dialect: a :class:`.Dialect` that will handle the job
259 of calling rollback(), close(), or commit() on DBAPI connections.
260 If omitted, a built-in "stub" dialect is used. Applications that
261 make use of :func:`_sa.create_engine` should not use this parameter
262 as it is handled by the engine creation strategy.
263
264 :param pre_ping: if True, the pool will emit a "ping" (typically
265 "SELECT 1", but is dialect-specific) on the connection
266 upon checkout, to test if the connection is alive or not. If not,
267 the connection is transparently re-connected and upon success, all
268 other pooled connections established prior to that timestamp are
269 invalidated. Requires that a dialect is passed as well to
270 interpret the disconnection error.
271
272 .. versionadded:: 1.2
273
274 """
275 if logging_name:
276 self.logging_name = self._orig_logging_name = logging_name
277 else:
278 self._orig_logging_name = None
279
280 log.instance_logger(self, echoflag=echo)
281 self._creator = creator
282 self._recycle = recycle
283 self._invalidate_time = 0
284 self._pre_ping = pre_ping
285 self._reset_on_return = util.parse_user_argument_for_enum(
286 reset_on_return,
287 {
288 ResetStyle.reset_rollback: ["rollback", True],
289 ResetStyle.reset_none: ["none", None, False],
290 ResetStyle.reset_commit: ["commit"],
291 },
292 "reset_on_return",
293 )
294
295 self.echo = echo
296
297 if _dispatch:
298 self.dispatch._update(_dispatch, only_propagate=False)
299 if dialect:
300 self._dialect = dialect
301 if events:
302 for fn, target in events:
303 event.listen(self, target, fn)
304
305 @util.hybridproperty
306 def _is_asyncio(self) -> bool:
307 return self._dialect.is_async
308
309 @property
310 def _creator(self) -> Union[_CreatorFnType, _CreatorWRecFnType]:
311 return self._creator_arg
312
313 @_creator.setter
314 def _creator(
315 self, creator: Union[_CreatorFnType, _CreatorWRecFnType]
316 ) -> None:
317 self._creator_arg = creator
318
319 # mypy seems to get super confused assigning functions to
320 # attributes
321 self._invoke_creator = self._should_wrap_creator(creator)
322
323 @_creator.deleter
324 def _creator(self) -> None:
325 # needed for mock testing
326 del self._creator_arg
327 del self._invoke_creator
328
329 def _should_wrap_creator(
330 self, creator: Union[_CreatorFnType, _CreatorWRecFnType]
331 ) -> _CreatorWRecFnType:
332 """Detect if creator accepts a single argument, or is sent
333 as a legacy style no-arg function.
334
335 """
336
337 try:
338 argspec = util.get_callable_argspec(self._creator, no_self=True)
339 except TypeError:
340 creator_fn = cast(_CreatorFnType, creator)
341 return lambda rec: creator_fn()
342
343 if argspec.defaults is not None:
344 defaulted = len(argspec.defaults)
345 else:
346 defaulted = 0
347 positionals = len(argspec[0]) - defaulted
348
349 # look for the exact arg signature that DefaultStrategy
350 # sends us
351 if (argspec[0], argspec[3]) == (["connection_record"], (None,)):
352 return cast(_CreatorWRecFnType, creator)
353 # or just a single positional
354 elif positionals == 1:
355 return cast(_CreatorWRecFnType, creator)
356 # all other cases, just wrap and assume legacy "creator" callable
357 # thing
358 else:
359 creator_fn = cast(_CreatorFnType, creator)
360 return lambda rec: creator_fn()
361
362 def _close_connection(
363 self, connection: DBAPIConnection, *, terminate: bool = False
364 ) -> None:
365 self.logger.debug(
366 "%s connection %r",
367 "Hard-closing" if terminate else "Closing",
368 connection,
369 )
370 try:
371 if terminate:
372 self._dialect.do_terminate(connection)
373 else:
374 self._dialect.do_close(connection)
375 except BaseException as e:
376 self.logger.error(
377 f"Exception {'terminating' if terminate else 'closing'} "
378 f"connection %r",
379 connection,
380 exc_info=True,
381 )
382 if not isinstance(e, Exception):
383 raise
384
385 def _create_connection(self) -> ConnectionPoolEntry:
386 """Called by subclasses to create a new ConnectionRecord."""
387
388 return _ConnectionRecord(self)
389
390 def _invalidate(
391 self,
392 connection: PoolProxiedConnection,
393 exception: Optional[BaseException] = None,
394 _checkin: bool = True,
395 ) -> None:
396 """Mark all connections established within the generation
397 of the given connection as invalidated.
398
399 If this pool's last invalidate time is before when the given
400 connection was created, update the timestamp til now. Otherwise,
401 no action is performed.
402
403 Connections with a start time prior to this pool's invalidation
404 time will be recycled upon next checkout.
405 """
406 rec = getattr(connection, "_connection_record", None)
407 if not rec or self._invalidate_time < rec.starttime:
408 self._invalidate_time = time.time()
409 if _checkin and getattr(connection, "is_valid", False):
410 connection.invalidate(exception)
411
412 def recreate(self) -> Pool:
413 """Return a new :class:`_pool.Pool`, of the same class as this one
414 and configured with identical creation arguments.
415
416 This method is used in conjunction with :meth:`dispose`
417 to close out an entire :class:`_pool.Pool` and create a new one in
418 its place.
419
420 """
421
422 raise NotImplementedError()
423
424 def dispose(self) -> None:
425 """Dispose of this pool.
426
427 This method leaves the possibility of checked-out connections
428 remaining open, as it only affects connections that are
429 idle in the pool.
430
431 .. seealso::
432
433 :meth:`Pool.recreate`
434
435 """
436
437 raise NotImplementedError()
438
439 def connect(self) -> PoolProxiedConnection:
440 """Return a DBAPI connection from the pool.
441
442 The connection is instrumented such that when its
443 ``close()`` method is called, the connection will be returned to
444 the pool.
445
446 """
447 return _ConnectionFairy._checkout(self)
448
449 def _return_conn(self, record: ConnectionPoolEntry) -> None:
450 """Given a _ConnectionRecord, return it to the :class:`_pool.Pool`.
451
452 This method is called when an instrumented DBAPI connection
453 has its ``close()`` method called.
454
455 """
456 self._do_return_conn(record)
457
458 def _do_get(self) -> ConnectionPoolEntry:
459 """Implementation for :meth:`get`, supplied by subclasses."""
460
461 raise NotImplementedError()
462
463 def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
464 """Implementation for :meth:`return_conn`, supplied by subclasses."""
465
466 raise NotImplementedError()
467
468 def status(self) -> str:
469 """Returns a brief description of the state of this pool."""
470 raise NotImplementedError()
471
472
473class ManagesConnection:
474 """Common base for the two connection-management interfaces
475 :class:`.PoolProxiedConnection` and :class:`.ConnectionPoolEntry`.
476
477 These two objects are typically exposed in the public facing API
478 via the connection pool event hooks, documented at :class:`.PoolEvents`.
479
480 .. versionadded:: 2.0
481
482 """
483
484 __slots__ = ()
485
486 dbapi_connection: Optional[DBAPIConnection]
487 """A reference to the actual DBAPI connection being tracked.
488
489 This is a :pep:`249`-compliant object that for traditional sync-style
490 dialects is provided by the third-party
491 DBAPI implementation in use. For asyncio dialects, the implementation
492 is typically an adapter object provided by the SQLAlchemy dialect
493 itself; the underlying asyncio object is available via the
494 :attr:`.ManagesConnection.driver_connection` attribute.
495
496 SQLAlchemy's interface for the DBAPI connection is based on the
497 :class:`.DBAPIConnection` protocol object
498
499 .. seealso::
500
501 :attr:`.ManagesConnection.driver_connection`
502
503 :ref:`faq_dbapi_connection`
504
505 """
506
507 driver_connection: Optional[Any]
508 """The "driver level" connection object as used by the Python
509 DBAPI or database driver.
510
511 For traditional :pep:`249` DBAPI implementations, this object will
512 be the same object as that of
513 :attr:`.ManagesConnection.dbapi_connection`. For an asyncio database
514 driver, this will be the ultimate "connection" object used by that
515 driver, such as the ``asyncpg.Connection`` object which will not have
516 standard pep-249 methods.
517
518 .. versionadded:: 1.4.24
519
520 .. seealso::
521
522 :attr:`.ManagesConnection.dbapi_connection`
523
524 :ref:`faq_dbapi_connection`
525
526 """
527
528 @util.ro_memoized_property
529 def info(self) -> _InfoType:
530 """Info dictionary associated with the underlying DBAPI connection
531 referred to by this :class:`.ManagesConnection` instance, allowing
532 user-defined data to be associated with the connection.
533
534 The data in this dictionary is persistent for the lifespan
535 of the DBAPI connection itself, including across pool checkins
536 and checkouts. When the connection is invalidated
537 and replaced with a new one, this dictionary is cleared.
538
539 For a :class:`.PoolProxiedConnection` instance that's not associated
540 with a :class:`.ConnectionPoolEntry`, such as if it were detached, the
541 attribute returns a dictionary that is local to that
542 :class:`.ConnectionPoolEntry`. Therefore the
543 :attr:`.ManagesConnection.info` attribute will always provide a Python
544 dictionary.
545
546 .. seealso::
547
548 :attr:`.ManagesConnection.record_info`
549
550
551 """
552 raise NotImplementedError()
553
554 @util.ro_memoized_property
555 def record_info(self) -> Optional[_InfoType]:
556 """Persistent info dictionary associated with this
557 :class:`.ManagesConnection`.
558
559 Unlike the :attr:`.ManagesConnection.info` dictionary, the lifespan
560 of this dictionary is that of the :class:`.ConnectionPoolEntry`
561 which owns it; therefore this dictionary will persist across
562 reconnects and connection invalidation for a particular entry
563 in the connection pool.
564
565 For a :class:`.PoolProxiedConnection` instance that's not associated
566 with a :class:`.ConnectionPoolEntry`, such as if it were detached, the
567 attribute returns None. Contrast to the :attr:`.ManagesConnection.info`
568 dictionary which is never None.
569
570
571 .. seealso::
572
573 :attr:`.ManagesConnection.info`
574
575 """
576 raise NotImplementedError()
577
578 def invalidate(
579 self, e: Optional[BaseException] = None, soft: bool = False
580 ) -> None:
581 """Mark the managed connection as invalidated.
582
583 :param e: an exception object indicating a reason for the invalidation.
584
585 :param soft: if True, the connection isn't closed; instead, this
586 connection will be recycled on next checkout.
587
588 .. seealso::
589
590 :ref:`pool_connection_invalidation`
591
592
593 """
594 raise NotImplementedError()
595
596
597class ConnectionPoolEntry(ManagesConnection):
598 """Interface for the object that maintains an individual database
599 connection on behalf of a :class:`_pool.Pool` instance.
600
601 The :class:`.ConnectionPoolEntry` object represents the long term
602 maintainance of a particular connection for a pool, including expiring or
603 invalidating that connection to have it replaced with a new one, which will
604 continue to be maintained by that same :class:`.ConnectionPoolEntry`
605 instance. Compared to :class:`.PoolProxiedConnection`, which is the
606 short-term, per-checkout connection manager, this object lasts for the
607 lifespan of a particular "slot" within a connection pool.
608
609 The :class:`.ConnectionPoolEntry` object is mostly visible to public-facing
610 API code when it is delivered to connection pool event hooks, such as
611 :meth:`_events.PoolEvents.connect` and :meth:`_events.PoolEvents.checkout`.
612
613 .. versionadded:: 2.0 :class:`.ConnectionPoolEntry` provides the public
614 facing interface for the :class:`._ConnectionRecord` internal class.
615
616 """
617
618 __slots__ = ()
619
620 @property
621 def in_use(self) -> bool:
622 """Return True the connection is currently checked out"""
623
624 raise NotImplementedError()
625
626 def close(self) -> None:
627 """Close the DBAPI connection managed by this connection pool entry."""
628 raise NotImplementedError()
629
630
631class _ConnectionRecord(ConnectionPoolEntry):
632 """Maintains a position in a connection pool which references a pooled
633 connection.
634
635 This is an internal object used by the :class:`_pool.Pool` implementation
636 to provide context management to a DBAPI connection maintained by
637 that :class:`_pool.Pool`. The public facing interface for this class
638 is described by the :class:`.ConnectionPoolEntry` class. See that
639 class for public API details.
640
641 .. seealso::
642
643 :class:`.ConnectionPoolEntry`
644
645 :class:`.PoolProxiedConnection`
646
647 """
648
649 __slots__ = (
650 "__pool",
651 "fairy_ref",
652 "finalize_callback",
653 "fresh",
654 "starttime",
655 "dbapi_connection",
656 "__weakref__",
657 "__dict__",
658 )
659
660 finalize_callback: Deque[Callable[[DBAPIConnection], None]]
661 fresh: bool
662 fairy_ref: Optional[weakref.ref[_ConnectionFairy]]
663 starttime: float
664
665 def __init__(self, pool: Pool, connect: bool = True):
666 self.fresh = False
667 self.fairy_ref = None
668 self.starttime = 0
669 self.dbapi_connection = None
670
671 self.__pool = pool
672 if connect:
673 self.__connect()
674 self.finalize_callback = deque()
675
676 dbapi_connection: Optional[DBAPIConnection]
677
678 @property
679 def driver_connection(self) -> Optional[Any]: # type: ignore[override] # mypy#4125 # noqa: E501
680 if self.dbapi_connection is None:
681 return None
682 else:
683 return self.__pool._dialect.get_driver_connection(
684 self.dbapi_connection
685 )
686
687 @property
688 @util.deprecated(
689 "2.0",
690 "The _ConnectionRecord.connection attribute is deprecated; "
691 "please use 'driver_connection'",
692 )
693 def connection(self) -> Optional[DBAPIConnection]:
694 return self.dbapi_connection
695
696 _soft_invalidate_time: float = 0
697
698 @util.ro_memoized_property
699 def info(self) -> _InfoType:
700 return {}
701
702 @util.ro_memoized_property
703 def record_info(self) -> Optional[_InfoType]:
704 return {}
705
706 @classmethod
707 def checkout(cls, pool: Pool) -> _ConnectionFairy:
708 if TYPE_CHECKING:
709 rec = cast(_ConnectionRecord, pool._do_get())
710 else:
711 rec = pool._do_get()
712
713 try:
714 dbapi_connection = rec.get_connection()
715 except BaseException as err:
716 with util.safe_reraise():
717 rec._checkin_failed(err, _fairy_was_created=False)
718
719 # not reached, for code linters only
720 raise
721
722 echo = pool._should_log_debug()
723 fairy = _ConnectionFairy(pool, dbapi_connection, rec, echo)
724
725 rec.fairy_ref = ref = weakref.ref(
726 fairy,
727 lambda ref: (
728 _finalize_fairy(
729 None, rec, pool, ref, echo, transaction_was_reset=False
730 )
731 if _finalize_fairy is not None
732 else None
733 ),
734 )
735 _strong_ref_connection_records[ref] = rec
736 if echo:
737 pool.logger.debug(
738 "Connection %r checked out from pool", dbapi_connection
739 )
740 return fairy
741
742 def _checkin_failed(
743 self, err: BaseException, _fairy_was_created: bool = True
744 ) -> None:
745 self.invalidate(e=err)
746 self.checkin(
747 _fairy_was_created=_fairy_was_created,
748 )
749
750 def checkin(self, _fairy_was_created: bool = True) -> None:
751 if self.fairy_ref is None and _fairy_was_created:
752 # _fairy_was_created is False for the initial get connection phase;
753 # meaning there was no _ConnectionFairy and we must unconditionally
754 # do a checkin.
755 #
756 # otherwise, if fairy_was_created==True, if fairy_ref is None here
757 # that means we were checked in already, so this looks like
758 # a double checkin.
759 util.warn("Double checkin attempted on %s" % self)
760 return
761 self.fairy_ref = None
762 connection = self.dbapi_connection
763 pool = self.__pool
764 while self.finalize_callback:
765 finalizer = self.finalize_callback.pop()
766 if connection is not None:
767 finalizer(connection)
768 if pool.dispatch.checkin:
769 pool.dispatch.checkin(connection, self)
770
771 pool._return_conn(self)
772
773 @property
774 def in_use(self) -> bool:
775 return self.fairy_ref is not None
776
777 @property
778 def last_connect_time(self) -> float:
779 return self.starttime
780
781 def close(self) -> None:
782 if self.dbapi_connection is not None:
783 self.__close()
784
785 def invalidate(
786 self, e: Optional[BaseException] = None, soft: bool = False
787 ) -> None:
788 # already invalidated
789 if self.dbapi_connection is None:
790 return
791 if soft:
792 self.__pool.dispatch.soft_invalidate(
793 self.dbapi_connection, self, e
794 )
795 else:
796 self.__pool.dispatch.invalidate(self.dbapi_connection, self, e)
797 if e is not None:
798 self.__pool.logger.info(
799 "%sInvalidate connection %r (reason: %s:%s)",
800 "Soft " if soft else "",
801 self.dbapi_connection,
802 e.__class__.__name__,
803 e,
804 )
805 else:
806 self.__pool.logger.info(
807 "%sInvalidate connection %r",
808 "Soft " if soft else "",
809 self.dbapi_connection,
810 )
811
812 if soft:
813 self._soft_invalidate_time = time.time()
814 else:
815 self.__close(terminate=True)
816 self.dbapi_connection = None
817
818 def get_connection(self) -> DBAPIConnection:
819 recycle = False
820
821 # NOTE: the various comparisons here are assuming that measurable time
822 # passes between these state changes. however, time.time() is not
823 # guaranteed to have sub-second precision. comparisons of
824 # "invalidation time" to "starttime" should perhaps use >= so that the
825 # state change can take place assuming no measurable time has passed,
826 # however this does not guarantee correct behavior here as if time
827 # continues to not pass, it will try to reconnect repeatedly until
828 # these timestamps diverge, so in that sense using > is safer. Per
829 # https://stackoverflow.com/a/1938096/34549, Windows time.time() may be
830 # within 16 milliseconds accuracy, so unit tests for connection
831 # invalidation need a sleep of at least this long between initial start
832 # time and invalidation for the logic below to work reliably.
833
834 if self.dbapi_connection is None:
835 self.info.clear()
836 self.__connect()
837 elif (
838 self.__pool._recycle > -1
839 and time.time() - self.starttime > self.__pool._recycle
840 ):
841 self.__pool.logger.info(
842 "Connection %r exceeded timeout; recycling",
843 self.dbapi_connection,
844 )
845 recycle = True
846 elif self.__pool._invalidate_time > self.starttime:
847 self.__pool.logger.info(
848 "Connection %r invalidated due to pool invalidation; "
849 + "recycling",
850 self.dbapi_connection,
851 )
852 recycle = True
853 elif self._soft_invalidate_time > self.starttime:
854 self.__pool.logger.info(
855 "Connection %r invalidated due to local soft invalidation; "
856 + "recycling",
857 self.dbapi_connection,
858 )
859 recycle = True
860
861 if recycle:
862 self.__close(terminate=True)
863 self.info.clear()
864
865 self.__connect()
866
867 assert self.dbapi_connection is not None
868 return self.dbapi_connection
869
870 def _is_hard_or_soft_invalidated(self) -> bool:
871 return (
872 self.dbapi_connection is None
873 or self.__pool._invalidate_time > self.starttime
874 or (self._soft_invalidate_time > self.starttime)
875 )
876
877 def __close(self, *, terminate: bool = False) -> None:
878 self.finalize_callback.clear()
879 if self.__pool.dispatch.close:
880 self.__pool.dispatch.close(self.dbapi_connection, self)
881 assert self.dbapi_connection is not None
882 self.__pool._close_connection(
883 self.dbapi_connection, terminate=terminate
884 )
885 self.dbapi_connection = None
886
887 def __connect(self) -> None:
888 pool = self.__pool
889
890 # ensure any existing connection is removed, so that if
891 # creator fails, this attribute stays None
892 self.dbapi_connection = None
893 try:
894 self.starttime = time.time()
895 self.dbapi_connection = connection = pool._invoke_creator(self)
896 pool.logger.debug("Created new connection %r", connection)
897 self.fresh = True
898 except BaseException as e:
899 with util.safe_reraise():
900 pool.logger.debug("Error on connect(): %s", e)
901 else:
902 # in SQLAlchemy 1.4 the first_connect event is not used by
903 # the engine, so this will usually not be set
904 if pool.dispatch.first_connect:
905 pool.dispatch.first_connect.for_modify(
906 pool.dispatch
907 ).exec_once_unless_exception(self.dbapi_connection, self)
908
909 # init of the dialect now takes place within the connect
910 # event, so ensure a mutex is used on the first run
911 pool.dispatch.connect.for_modify(
912 pool.dispatch
913 )._exec_w_sync_on_first_run(self.dbapi_connection, self)
914
915
916def _finalize_fairy(
917 dbapi_connection: Optional[DBAPIConnection],
918 connection_record: Optional[_ConnectionRecord],
919 pool: Pool,
920 ref: Optional[
921 weakref.ref[_ConnectionFairy]
922 ], # this is None when called directly, not by the gc
923 echo: Optional[log._EchoFlagType],
924 transaction_was_reset: bool = False,
925 fairy: Optional[_ConnectionFairy] = None,
926) -> None:
927 """Cleanup for a :class:`._ConnectionFairy` whether or not it's already
928 been garbage collected.
929
930 When using an async dialect no IO can happen here (without using
931 a dedicated thread), since this is called outside the greenlet
932 context and with an already running loop. In this case function
933 will only log a message and raise a warning.
934 """
935
936 is_gc_cleanup = ref is not None
937
938 if is_gc_cleanup:
939 assert ref is not None
940 _strong_ref_connection_records.pop(ref, None)
941 assert connection_record is not None
942 if connection_record.fairy_ref is not ref:
943 return
944 assert dbapi_connection is None
945 dbapi_connection = connection_record.dbapi_connection
946
947 elif fairy:
948 _strong_ref_connection_records.pop(weakref.ref(fairy), None)
949
950 # null pool is not _is_asyncio but can be used also with async dialects
951 dont_restore_gced = pool._dialect.is_async
952
953 if dont_restore_gced:
954 detach = connection_record is None or is_gc_cleanup
955 can_manipulate_connection = not is_gc_cleanup
956 can_close_or_terminate_connection = (
957 not pool._dialect.is_async or pool._dialect.has_terminate
958 )
959 requires_terminate_for_close = (
960 pool._dialect.is_async and pool._dialect.has_terminate
961 )
962
963 else:
964 detach = connection_record is None
965 can_manipulate_connection = can_close_or_terminate_connection = True
966 requires_terminate_for_close = False
967
968 if dbapi_connection is not None:
969 if connection_record and echo:
970 pool.logger.debug(
971 "Connection %r being returned to pool", dbapi_connection
972 )
973
974 try:
975 if not fairy:
976 assert connection_record is not None
977 fairy = _ConnectionFairy(
978 pool,
979 dbapi_connection,
980 connection_record,
981 echo,
982 )
983 assert fairy.dbapi_connection is dbapi_connection
984
985 fairy._reset(
986 pool,
987 transaction_was_reset=transaction_was_reset,
988 terminate_only=detach,
989 asyncio_safe=can_manipulate_connection,
990 )
991
992 if detach:
993 if connection_record:
994 fairy._pool = pool
995 fairy.detach()
996
997 if can_close_or_terminate_connection:
998 if pool.dispatch.close_detached:
999 pool.dispatch.close_detached(dbapi_connection)
1000
1001 pool._close_connection(
1002 dbapi_connection,
1003 terminate=requires_terminate_for_close,
1004 )
1005
1006 except BaseException as e:
1007 pool.logger.error(
1008 "Exception during reset or similar", exc_info=True
1009 )
1010 if connection_record:
1011 connection_record.invalidate(e=e)
1012 if not isinstance(e, Exception):
1013 raise
1014 finally:
1015 if detach and is_gc_cleanup and dont_restore_gced:
1016 message = (
1017 "The garbage collector is trying to clean up "
1018 f"non-checked-in connection {dbapi_connection!r}, "
1019 f"""which will be {
1020 'dropped, as it cannot be safely terminated'
1021 if not can_close_or_terminate_connection
1022 else 'terminated'
1023 }. """
1024 "Please ensure that SQLAlchemy pooled connections are "
1025 "returned to "
1026 "the pool explicitly, either by calling ``close()`` "
1027 "or by using appropriate context managers to manage "
1028 "their lifecycle."
1029 )
1030 pool.logger.error(message)
1031 util.warn(message)
1032
1033 if connection_record and connection_record.fairy_ref is not None:
1034 connection_record.checkin()
1035
1036 # give gc some help. See
1037 # test/engine/test_pool.py::PoolEventsTest::test_checkin_event_gc[True]
1038 # which actually started failing when pytest warnings plugin was
1039 # turned on, due to util.warn() above
1040 if fairy is not None:
1041 fairy.dbapi_connection = None # type: ignore
1042 fairy._connection_record = None
1043 del dbapi_connection
1044 del connection_record
1045 del fairy
1046
1047
1048# a dictionary of the _ConnectionFairy weakrefs to _ConnectionRecord, so that
1049# GC under pypy will call ConnectionFairy finalizers. linked directly to the
1050# weakref that will empty itself when collected so that it should not create
1051# any unmanaged memory references.
1052_strong_ref_connection_records: Dict[
1053 weakref.ref[_ConnectionFairy], _ConnectionRecord
1054] = {}
1055
1056
1057class PoolProxiedConnection(ManagesConnection):
1058 """A connection-like adapter for a :pep:`249` DBAPI connection, which
1059 includes additional methods specific to the :class:`.Pool` implementation.
1060
1061 :class:`.PoolProxiedConnection` is the public-facing interface for the
1062 internal :class:`._ConnectionFairy` implementation object; users familiar
1063 with :class:`._ConnectionFairy` can consider this object to be equivalent.
1064
1065 .. versionadded:: 2.0 :class:`.PoolProxiedConnection` provides the public-
1066 facing interface for the :class:`._ConnectionFairy` internal class.
1067
1068 """
1069
1070 __slots__ = ()
1071
1072 if typing.TYPE_CHECKING:
1073
1074 def commit(self) -> None: ...
1075
1076 def cursor(self, *args: Any, **kwargs: Any) -> DBAPICursor: ...
1077
1078 def rollback(self) -> None: ...
1079
1080 def __getattr__(self, key: str) -> Any: ...
1081
1082 @property
1083 def is_valid(self) -> bool:
1084 """Return True if this :class:`.PoolProxiedConnection` still refers
1085 to an active DBAPI connection."""
1086
1087 raise NotImplementedError()
1088
1089 @property
1090 def is_detached(self) -> bool:
1091 """Return True if this :class:`.PoolProxiedConnection` is detached
1092 from its pool."""
1093
1094 raise NotImplementedError()
1095
1096 def detach(self) -> None:
1097 """Separate this connection from its Pool.
1098
1099 This means that the connection will no longer be returned to the
1100 pool when closed, and will instead be literally closed. The
1101 associated :class:`.ConnectionPoolEntry` is de-associated from this
1102 DBAPI connection.
1103
1104 Note that any overall connection limiting constraints imposed by a
1105 Pool implementation may be violated after a detach, as the detached
1106 connection is removed from the pool's knowledge and control.
1107
1108 """
1109
1110 raise NotImplementedError()
1111
1112 def close(self) -> None:
1113 """Release this connection back to the pool.
1114
1115 The :meth:`.PoolProxiedConnection.close` method shadows the
1116 :pep:`249` ``.close()`` method, altering its behavior to instead
1117 :term:`release` the proxied connection back to the connection pool.
1118
1119 Upon release to the pool, whether the connection stays "opened" and
1120 pooled in the Python process, versus actually closed out and removed
1121 from the Python process, is based on the pool implementation in use and
1122 its configuration and current state.
1123
1124 """
1125 raise NotImplementedError()
1126
1127
1128class _AdhocProxiedConnection(PoolProxiedConnection):
1129 """provides the :class:`.PoolProxiedConnection` interface for cases where
1130 the DBAPI connection is not actually proxied.
1131
1132 This is used by the engine internals to pass a consistent
1133 :class:`.PoolProxiedConnection` object to consuming dialects in response to
1134 pool events that may not always have the :class:`._ConnectionFairy`
1135 available.
1136
1137 """
1138
1139 __slots__ = ("dbapi_connection", "_connection_record", "_is_valid")
1140
1141 dbapi_connection: DBAPIConnection
1142 _connection_record: ConnectionPoolEntry
1143
1144 def __init__(
1145 self,
1146 dbapi_connection: DBAPIConnection,
1147 connection_record: ConnectionPoolEntry,
1148 ):
1149 self.dbapi_connection = dbapi_connection
1150 self._connection_record = connection_record
1151 self._is_valid = True
1152
1153 @property
1154 def driver_connection(self) -> Any: # type: ignore[override] # mypy#4125
1155 return self._connection_record.driver_connection
1156
1157 @property
1158 def connection(self) -> DBAPIConnection:
1159 return self.dbapi_connection
1160
1161 @property
1162 def is_valid(self) -> bool:
1163 """Implement is_valid state attribute.
1164
1165 for the adhoc proxied connection it's assumed the connection is valid
1166 as there is no "invalidate" routine.
1167
1168 """
1169 return self._is_valid
1170
1171 def invalidate(
1172 self, e: Optional[BaseException] = None, soft: bool = False
1173 ) -> None:
1174 self._is_valid = False
1175
1176 @util.ro_non_memoized_property
1177 def record_info(self) -> Optional[_InfoType]:
1178 return self._connection_record.record_info
1179
1180 def cursor(self, *args: Any, **kwargs: Any) -> DBAPICursor:
1181 return self.dbapi_connection.cursor(*args, **kwargs)
1182
1183 def __getattr__(self, key: Any) -> Any:
1184 return getattr(self.dbapi_connection, key)
1185
1186
1187class _ConnectionFairy(PoolProxiedConnection):
1188 """Proxies a DBAPI connection and provides return-on-dereference
1189 support.
1190
1191 This is an internal object used by the :class:`_pool.Pool` implementation
1192 to provide context management to a DBAPI connection delivered by
1193 that :class:`_pool.Pool`. The public facing interface for this class
1194 is described by the :class:`.PoolProxiedConnection` class. See that
1195 class for public API details.
1196
1197 The name "fairy" is inspired by the fact that the
1198 :class:`._ConnectionFairy` object's lifespan is transitory, as it lasts
1199 only for the length of a specific DBAPI connection being checked out from
1200 the pool, and additionally that as a transparent proxy, it is mostly
1201 invisible.
1202
1203 .. seealso::
1204
1205 :class:`.PoolProxiedConnection`
1206
1207 :class:`.ConnectionPoolEntry`
1208
1209
1210 """
1211
1212 __slots__ = (
1213 "dbapi_connection",
1214 "_connection_record",
1215 "_echo",
1216 "_pool",
1217 "_counter",
1218 "__weakref__",
1219 "__dict__",
1220 )
1221
1222 pool: Pool
1223 dbapi_connection: DBAPIConnection
1224 _echo: log._EchoFlagType
1225
1226 def __init__(
1227 self,
1228 pool: Pool,
1229 dbapi_connection: DBAPIConnection,
1230 connection_record: _ConnectionRecord,
1231 echo: log._EchoFlagType,
1232 ):
1233 self._pool = pool
1234 self._counter = 0
1235 self.dbapi_connection = dbapi_connection
1236 self._connection_record = connection_record
1237 self._echo = echo
1238
1239 _connection_record: Optional[_ConnectionRecord]
1240
1241 @property
1242 def driver_connection(self) -> Optional[Any]: # type: ignore[override] # mypy#4125 # noqa: E501
1243 if self._connection_record is None:
1244 return None
1245 return self._connection_record.driver_connection
1246
1247 @property
1248 @util.deprecated(
1249 "2.0",
1250 "The _ConnectionFairy.connection attribute is deprecated; "
1251 "please use 'driver_connection'",
1252 )
1253 def connection(self) -> DBAPIConnection:
1254 return self.dbapi_connection
1255
1256 @classmethod
1257 def _checkout(
1258 cls,
1259 pool: Pool,
1260 threadconns: Optional[threading.local] = None,
1261 fairy: Optional[_ConnectionFairy] = None,
1262 ) -> _ConnectionFairy:
1263 if not fairy:
1264 fairy = _ConnectionRecord.checkout(pool)
1265
1266 if threadconns is not None:
1267 threadconns.current = weakref.ref(fairy)
1268
1269 assert (
1270 fairy._connection_record is not None
1271 ), "can't 'checkout' a detached connection fairy"
1272 assert (
1273 fairy.dbapi_connection is not None
1274 ), "can't 'checkout' an invalidated connection fairy"
1275
1276 fairy._counter += 1
1277 if (
1278 not pool.dispatch.checkout and not pool._pre_ping
1279 ) or fairy._counter != 1:
1280 return fairy
1281
1282 # Pool listeners can trigger a reconnection on checkout, as well
1283 # as the pre-pinger.
1284 # there are three attempts made here, but note that if the database
1285 # is not accessible from a connection standpoint, those won't proceed
1286 # here.
1287
1288 attempts = 2
1289
1290 while attempts > 0:
1291 connection_is_fresh = fairy._connection_record.fresh
1292 fairy._connection_record.fresh = False
1293 try:
1294 if pool._pre_ping:
1295 if not connection_is_fresh:
1296 if fairy._echo:
1297 pool.logger.debug(
1298 "Pool pre-ping on connection %s",
1299 fairy.dbapi_connection,
1300 )
1301 result = pool._dialect._do_ping_w_event(
1302 fairy.dbapi_connection
1303 )
1304 if not result:
1305 if fairy._echo:
1306 pool.logger.debug(
1307 "Pool pre-ping on connection %s failed, "
1308 "will invalidate pool",
1309 fairy.dbapi_connection,
1310 )
1311 raise exc.InvalidatePoolError()
1312 elif fairy._echo:
1313 pool.logger.debug(
1314 "Connection %s is fresh, skipping pre-ping",
1315 fairy.dbapi_connection,
1316 )
1317
1318 pool.dispatch.checkout(
1319 fairy.dbapi_connection, fairy._connection_record, fairy
1320 )
1321 return fairy
1322 except exc.DisconnectionError as e:
1323 if e.invalidate_pool:
1324 pool.logger.info(
1325 "Disconnection detected on checkout, "
1326 "invalidating all pooled connections prior to "
1327 "current timestamp (reason: %r)",
1328 e,
1329 )
1330 fairy._connection_record.invalidate(e)
1331 pool._invalidate(fairy, e, _checkin=False)
1332 else:
1333 pool.logger.info(
1334 "Disconnection detected on checkout, "
1335 "invalidating individual connection %s (reason: %r)",
1336 fairy.dbapi_connection,
1337 e,
1338 )
1339 fairy._connection_record.invalidate(e)
1340 try:
1341 fairy.dbapi_connection = (
1342 fairy._connection_record.get_connection()
1343 )
1344 except BaseException as err:
1345 with util.safe_reraise():
1346 fairy._connection_record._checkin_failed(
1347 err,
1348 _fairy_was_created=True,
1349 )
1350
1351 # prevent _ConnectionFairy from being carried
1352 # in the stack trace. Do this after the
1353 # connection record has been checked in, so that
1354 # if the del triggers a finalize fairy, it won't
1355 # try to checkin a second time.
1356 del fairy
1357
1358 # never called, this is for code linters
1359 raise
1360
1361 attempts -= 1
1362 except BaseException as be_outer:
1363 with util.safe_reraise():
1364 rec = fairy._connection_record
1365 if rec is not None:
1366 rec._checkin_failed(
1367 be_outer,
1368 _fairy_was_created=True,
1369 )
1370
1371 # prevent _ConnectionFairy from being carried
1372 # in the stack trace, see above
1373 del fairy
1374
1375 # never called, this is for code linters
1376 raise
1377
1378 pool.logger.info("Reconnection attempts exhausted on checkout")
1379 fairy.invalidate()
1380 raise exc.InvalidRequestError("This connection is closed")
1381
1382 def _checkout_existing(self) -> _ConnectionFairy:
1383 return _ConnectionFairy._checkout(self._pool, fairy=self)
1384
1385 def _checkin(self, transaction_was_reset: bool = False) -> None:
1386 _finalize_fairy(
1387 self.dbapi_connection,
1388 self._connection_record,
1389 self._pool,
1390 None,
1391 self._echo,
1392 transaction_was_reset=transaction_was_reset,
1393 fairy=self,
1394 )
1395
1396 def _close(self) -> None:
1397 self._checkin()
1398
1399 def _reset(
1400 self,
1401 pool: Pool,
1402 transaction_was_reset: bool,
1403 terminate_only: bool,
1404 asyncio_safe: bool,
1405 ) -> None:
1406 if pool.dispatch.reset:
1407 pool.dispatch.reset(
1408 self.dbapi_connection,
1409 self._connection_record,
1410 PoolResetState(
1411 transaction_was_reset=transaction_was_reset,
1412 terminate_only=terminate_only,
1413 asyncio_safe=asyncio_safe,
1414 ),
1415 )
1416
1417 if not asyncio_safe:
1418 return
1419
1420 if pool._reset_on_return is reset_rollback:
1421 if transaction_was_reset:
1422 if self._echo:
1423 pool.logger.debug(
1424 "Connection %s reset, transaction already reset",
1425 self.dbapi_connection,
1426 )
1427 else:
1428 if self._echo:
1429 pool.logger.debug(
1430 "Connection %s rollback-on-return",
1431 self.dbapi_connection,
1432 )
1433 pool._dialect.do_rollback(self)
1434 elif pool._reset_on_return is reset_commit:
1435 if self._echo:
1436 pool.logger.debug(
1437 "Connection %s commit-on-return",
1438 self.dbapi_connection,
1439 )
1440 pool._dialect.do_commit(self)
1441
1442 @property
1443 def _logger(self) -> log._IdentifiedLoggerType:
1444 return self._pool.logger
1445
1446 @property
1447 def is_valid(self) -> bool:
1448 return self.dbapi_connection is not None
1449
1450 @property
1451 def is_detached(self) -> bool:
1452 return self._connection_record is None
1453
1454 @util.ro_memoized_property
1455 def info(self) -> _InfoType:
1456 if self._connection_record is None:
1457 return {}
1458 else:
1459 return self._connection_record.info
1460
1461 @util.ro_non_memoized_property
1462 def record_info(self) -> Optional[_InfoType]:
1463 if self._connection_record is None:
1464 return None
1465 else:
1466 return self._connection_record.record_info
1467
1468 def invalidate(
1469 self, e: Optional[BaseException] = None, soft: bool = False
1470 ) -> None:
1471 if self.dbapi_connection is None:
1472 util.warn("Can't invalidate an already-closed connection.")
1473 return
1474 if self._connection_record:
1475 self._connection_record.invalidate(e=e, soft=soft)
1476 if not soft:
1477 # prevent any rollback / reset actions etc. on
1478 # the connection
1479 self.dbapi_connection = None # type: ignore
1480
1481 # finalize
1482 self._checkin()
1483
1484 def cursor(self, *args: Any, **kwargs: Any) -> DBAPICursor:
1485 assert self.dbapi_connection is not None
1486 return self.dbapi_connection.cursor(*args, **kwargs)
1487
1488 def __getattr__(self, key: str) -> Any:
1489 return getattr(self.dbapi_connection, key)
1490
1491 def detach(self) -> None:
1492 if self._connection_record is not None:
1493 rec = self._connection_record
1494 rec.fairy_ref = None
1495 rec.dbapi_connection = None
1496 # TODO: should this be _return_conn?
1497 self._pool._do_return_conn(self._connection_record)
1498
1499 # can't get the descriptor assignment to work here
1500 # in pylance. mypy is OK w/ it
1501 self.info = self.info.copy() # type: ignore
1502
1503 self._connection_record = None
1504
1505 if self._pool.dispatch.detach:
1506 self._pool.dispatch.detach(self.dbapi_connection, rec)
1507
1508 def close(self) -> None:
1509 self._counter -= 1
1510 if self._counter == 0:
1511 self._checkin()
1512
1513 def _close_special(self, transaction_reset: bool = False) -> None:
1514 self._counter -= 1
1515 if self._counter == 0:
1516 self._checkin(transaction_was_reset=transaction_reset)