1# pool/base.py
2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8
9"""Base constructs for connection pools.
10
11"""
12
13from __future__ import annotations
14
15from collections import deque
16import dataclasses
17from enum import Enum
18import threading
19import time
20import typing
21from typing import Any
22from typing import Callable
23from typing import cast
24from typing import Deque
25from typing import Dict
26from typing import List
27from typing import Optional
28from typing import Protocol
29from typing import Tuple
30from typing import TYPE_CHECKING
31from typing import Union
32import weakref
33
34from .. import event
35from .. import exc
36from .. import log
37from .. import util
38from ..util.typing import Literal
39
40if TYPE_CHECKING:
41 from ..engine.interfaces import DBAPIConnection
42 from ..engine.interfaces import DBAPICursor
43 from ..engine.interfaces import Dialect
44 from ..event import _DispatchCommon
45 from ..event import _ListenerFnType
46 from ..event import dispatcher
47 from ..sql._typing import _InfoType
48
49
@dataclasses.dataclass(frozen=True)
class PoolResetState:
    """Describes the state of a DBAPI connection as it is being passed to
    the :meth:`.PoolEvents.reset` connection pool event.

    The object is an immutable (``frozen=True``) dataclass carrying three
    boolean flags: :attr:`.PoolResetState.transaction_was_reset`,
    :attr:`.PoolResetState.terminate_only` and
    :attr:`.PoolResetState.asyncio_safe`.

    .. versionadded:: 2.0.0b3

    """

    # slots are declared explicitly; the names match the three fields below
    __slots__ = ("transaction_was_reset", "terminate_only", "asyncio_safe")

    transaction_was_reset: bool
    """Indicates if the transaction on the DBAPI connection was already
    essentially "reset" back by the :class:`.Connection` object.

    This boolean is True if the :class:`.Connection` had transactional
    state present upon it, which was then not closed using the
    :meth:`.Connection.rollback` or :meth:`.Connection.commit` method;
    instead, the transaction was closed inline within the
    :meth:`.Connection.close` method so is guaranteed to remain non-present
    when this event is reached.

    """

    terminate_only: bool
    """indicates if the connection is to be immediately terminated and
    not checked in to the pool.

    This occurs for connections that were invalidated, as well as asyncio
    connections that were not cleanly handled by the calling code that
    are instead being garbage collected. In the latter case,
    operations can't be safely run on asyncio connections within garbage
    collection as there is not necessarily an event loop present.

    """

    asyncio_safe: bool
    """Indicates if the reset operation is occurring within a scope where
    an enclosing event loop is expected to be present for asyncio applications.

    Will be False in the case that the connection is being garbage collected.

    """
93
94
class ResetStyle(Enum):
    """Describe options for "reset on return" behaviors.

    The members correspond to the values accepted by the
    :paramref:`_pool.Pool.reset_on_return` parameter.

    """

    # selected by reset_on_return="rollback" or True
    reset_rollback = 0
    # selected by reset_on_return="commit"
    reset_commit = 1
    # selected by reset_on_return="none", None or False
    reset_none = 2
101
102
# The spellings accepted for the ``reset_on_return`` argument: a
# ResetStyle member, or one of the legacy boolean / None / string forms.
_ResetStyleArgType = Union[
    ResetStyle,
    Literal[True, None, False, "commit", "rollback"],
]

# Module-level aliases for the three ResetStyle members.
reset_rollback = ResetStyle.reset_rollback
reset_commit = ResetStyle.reset_commit
reset_none = ResetStyle.reset_none
108
109
110class _ConnDialect:
111 """partial implementation of :class:`.Dialect`
112 which provides DBAPI connection methods.
113
114 When a :class:`_pool.Pool` is combined with an :class:`_engine.Engine`,
115 the :class:`_engine.Engine` replaces this with its own
116 :class:`.Dialect`.
117
118 """
119
120 is_async = False
121 has_terminate = False
122
123 def do_rollback(self, dbapi_connection: PoolProxiedConnection) -> None:
124 dbapi_connection.rollback()
125
126 def do_commit(self, dbapi_connection: PoolProxiedConnection) -> None:
127 dbapi_connection.commit()
128
129 def do_terminate(self, dbapi_connection: DBAPIConnection) -> None:
130 dbapi_connection.close()
131
132 def do_close(self, dbapi_connection: DBAPIConnection) -> None:
133 dbapi_connection.close()
134
135 def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
136 raise NotImplementedError(
137 "The ping feature requires that a dialect is "
138 "passed to the connection pool."
139 )
140
141 def get_driver_connection(self, connection: DBAPIConnection) -> Any:
142 return connection
143
144
class _AsyncConnDialect(_ConnDialect):
    """Variant of :class:`._ConnDialect` whose ``is_async`` flag is True."""

    is_async = True
147
148
class _CreatorFnType(Protocol):
    """Call signature of a "creator" callable that accepts no arguments
    and returns a DBAPI connection.

    """

    def __call__(self) -> DBAPIConnection: ...
151
152
class _CreatorWRecFnType(Protocol):
    """Call signature of a "creator" callable that accepts a single
    :class:`.ConnectionPoolEntry` argument and returns a DBAPI connection.

    """

    def __call__(self, rec: ConnectionPoolEntry) -> DBAPIConnection: ...
155
156
class Pool(log.Identified, event.EventTarget):
    """Abstract base class for connection pools."""

    dispatch: dispatcher[Pool]
    echo: log._EchoFlagType

    _orig_logging_name: Optional[str]
    # class-level default; replaced on the instance when a dialect is
    # passed to the constructor
    _dialect: Union[_ConnDialect, Dialect] = _ConnDialect()
    _creator_arg: Union[_CreatorFnType, _CreatorWRecFnType]
    _invoke_creator: _CreatorWRecFnType
    _invalidate_time: float

    def __init__(
        self,
        creator: Union[_CreatorFnType, _CreatorWRecFnType],
        recycle: int = -1,
        echo: log._EchoFlagType = None,
        logging_name: Optional[str] = None,
        reset_on_return: _ResetStyleArgType = True,
        events: Optional[List[Tuple[_ListenerFnType, str]]] = None,
        dialect: Optional[Union[_ConnDialect, Dialect]] = None,
        pre_ping: bool = False,
        _dispatch: Optional[_DispatchCommon[Pool]] = None,
    ):
        """
        Construct a Pool.

        :param creator: a callable function that returns a DB-API
         connection object.  The callable may either accept no arguments,
         or accept a single positional argument, in which case it is
         passed the :class:`.ConnectionPoolEntry` for which the connection
         is being created.

        :param recycle: If set to a value other than -1, number of
         seconds between connection recycling, which means upon
         checkout, if this timeout is surpassed the connection will be
         closed and replaced with a newly opened connection. Defaults to -1.

        :param logging_name: String identifier which will be used within
         the "name" field of logging records generated within the
         "sqlalchemy.pool" logger. Defaults to a hexstring of the object's
         id.

        :param echo: if True, the connection pool will log
         informational output such as when connections are invalidated
         as well as when connections are recycled to the default log handler,
         which defaults to ``sys.stdout`` for output. If set to the string
         ``"debug"``, the logging will include pool checkouts and checkins.

         The :paramref:`_pool.Pool.echo` parameter can also be set from the
         :func:`_sa.create_engine` call by using the
         :paramref:`_sa.create_engine.echo_pool` parameter.

         .. seealso::

             :ref:`dbengine_logging` - further detail on how to configure
             logging.

        :param reset_on_return: Determine steps to take on
         connections as they are returned to the pool, which were
         not otherwise handled by a :class:`_engine.Connection`.
         Available from :func:`_sa.create_engine` via the
         :paramref:`_sa.create_engine.pool_reset_on_return` parameter.

         :paramref:`_pool.Pool.reset_on_return` can have any of these values:

         * ``"rollback"`` - call rollback() on the connection,
           to release locks and transaction resources.
           This is the default value.  The vast majority
           of use cases should leave this value set.
         * ``"commit"`` - call commit() on the connection,
           to release locks and transaction resources.
           A commit here may be desirable for databases that
           cache query plans if a commit is emitted,
           such as Microsoft SQL Server.  However, this
           value is more dangerous than 'rollback' because
           any data changes present on the transaction
           are committed unconditionally.
         * ``None`` - don't do anything on the connection.
           This setting may be appropriate if the database / DBAPI
           works in pure "autocommit" mode at all times, or if
           a custom reset handler is established using the
           :meth:`.PoolEvents.reset` event handler.

         * ``True`` - same as 'rollback', this is here for
           backwards compatibility.
         * ``False`` - same as None, this is here for
           backwards compatibility.

         For further customization of reset on return, the
         :meth:`.PoolEvents.reset` event hook may be used which can perform
         any connection activity desired on reset.

         .. seealso::

            :ref:`pool_reset_on_return`

            :meth:`.PoolEvents.reset`

        :param events: a list of 2-tuples, each of the form
         ``(callable, target)`` which will be passed to :func:`.event.listen`
         upon construction.   Provided here so that event listeners
         can be assigned via :func:`_sa.create_engine` before dialect-level
         listeners are applied.

        :param dialect: a :class:`.Dialect` that will handle the job
         of calling rollback(), close(), or commit() on DBAPI connections.
         If omitted, a built-in "stub" dialect is used.   Applications that
         make use of :func:`_sa.create_engine` should not use this parameter
         as it is handled by the engine creation strategy.

        :param pre_ping: if True, the pool will emit a "ping" (typically
         "SELECT 1", but is dialect-specific) on the connection
         upon checkout, to test if the connection is alive or not.   If not,
         the connection is transparently re-connected and upon success, all
         other pooled connections established prior to that timestamp are
         invalidated.     Requires that a dialect is passed as well to
         interpret the disconnection error.

         .. versionadded:: 1.2

        """
        if logging_name:
            self.logging_name = self._orig_logging_name = logging_name
        else:
            self._orig_logging_name = None

        log.instance_logger(self, echoflag=echo)
        # assignment goes through the ``_creator`` property setter, which
        # also establishes ``_invoke_creator``
        self._creator = creator
        self._recycle = recycle
        self._invalidate_time = 0
        self._pre_ping = pre_ping
        self._reset_on_return = util.parse_user_argument_for_enum(
            reset_on_return,
            {
                ResetStyle.reset_rollback: ["rollback", True],
                ResetStyle.reset_none: ["none", None, False],
                ResetStyle.reset_commit: ["commit"],
            },
            "reset_on_return",
        )

        self.echo = echo

        if _dispatch:
            self.dispatch._update(_dispatch, only_propagate=False)
        if dialect:
            self._dialect = dialect
        if events:
            for fn, target in events:
                event.listen(self, target, fn)

    @util.hybridproperty
    def _is_asyncio(self) -> bool:
        """Return the ``is_async`` flag of the dialect in use."""
        return self._dialect.is_async

    @property
    def _creator(self) -> Union[_CreatorFnType, _CreatorWRecFnType]:
        """The creator callable as it was originally given."""
        return self._creator_arg

    @_creator.setter
    def _creator(
        self, creator: Union[_CreatorFnType, _CreatorWRecFnType]
    ) -> None:
        self._creator_arg = creator

        # mypy seems to get super confused assigning functions to
        # attributes
        self._invoke_creator = self._should_wrap_creator(creator)

    @_creator.deleter
    def _creator(self) -> None:
        # needed for mock testing
        del self._creator_arg
        del self._invoke_creator

    def _should_wrap_creator(
        self, creator: Union[_CreatorFnType, _CreatorWRecFnType]
    ) -> _CreatorWRecFnType:
        """Detect if creator accepts a single argument, or is sent
        as a legacy style no-arg function.

        :param creator: the user-supplied creator callable.

        :return: a callable accepting one "record" argument; this is
         either ``creator`` itself, or a wrapper which discards the
         argument and calls ``creator()`` with no arguments.

        """

        try:
            # inspect the callable we were handed, rather than reading it
            # back from self._creator
            argspec = util.get_callable_argspec(creator, no_self=True)
        except TypeError:
            # signature can't be introspected; assume a no-arg callable
            creator_fn = cast(_CreatorFnType, creator)
            return lambda rec: creator_fn()

        if argspec.defaults is not None:
            defaulted = len(argspec.defaults)
        else:
            defaulted = 0
        # count of positional arguments that have no default value
        positionals = len(argspec[0]) - defaulted

        # look for the exact arg signature that DefaultStrategy
        # sends us
        if (argspec[0], argspec[3]) == (["connection_record"], (None,)):
            return cast(_CreatorWRecFnType, creator)
        # or just a single positional
        elif positionals == 1:
            return cast(_CreatorWRecFnType, creator)
        # all other cases, just wrap and assume legacy "creator" callable
        # thing
        else:
            creator_fn = cast(_CreatorFnType, creator)
            return lambda rec: creator_fn()

    def _close_connection(
        self, connection: DBAPIConnection, *, terminate: bool = False
    ) -> None:
        """Close or terminate the given DBAPI connection via the dialect.

        :param connection: the DBAPI connection to be closed.

        :param terminate: if True, the dialect's ``do_terminate()`` is
         used rather than ``do_close()``.

        Exceptions raised while closing are logged.  Those which are
        subclasses of ``Exception`` are then suppressed; any other
        ``BaseException`` is re-raised.

        """
        self.logger.debug(
            "%s connection %r",
            "Hard-closing" if terminate else "Closing",
            connection,
        )
        try:
            if terminate:
                self._dialect.do_terminate(connection)
            else:
                self._dialect.do_close(connection)
        except BaseException as e:
            self.logger.error(
                f"Exception {'terminating' if terminate else 'closing'} "
                f"connection %r",
                connection,
                exc_info=True,
            )
            if not isinstance(e, Exception):
                raise

    def _create_connection(self) -> ConnectionPoolEntry:
        """Called by subclasses to create a new ConnectionRecord."""

        return _ConnectionRecord(self)

    def _invalidate(
        self,
        connection: PoolProxiedConnection,
        exception: Optional[BaseException] = None,
        _checkin: bool = True,
    ) -> None:
        """Mark all connections established within the generation
        of the given connection as invalidated.

        If this pool's last invalidate time is before when the given
        connection was created, update the timestamp to the current time.
        Otherwise, no action is performed.

        Connections with a start time prior to this pool's invalidation
        time will be recycled upon next checkout.

        :param connection: the pooled connection that triggered the
         invalidation.

        :param exception: optional exception passed along to
         ``connection.invalidate()``.

        :param _checkin: if True, and the given connection reports a
         true ``is_valid`` attribute, ``connection.invalidate()`` is
         called as well.

        """
        rec = getattr(connection, "_connection_record", None)
        if not rec or self._invalidate_time < rec.starttime:
            self._invalidate_time = time.time()
        if _checkin and getattr(connection, "is_valid", False):
            connection.invalidate(exception)

    def recreate(self) -> Pool:
        """Return a new :class:`_pool.Pool`, of the same class as this one
        and configured with identical creation arguments.

        This method is used in conjunction with :meth:`dispose`
        to close out an entire :class:`_pool.Pool` and create a new one in
        its place.

        """

        raise NotImplementedError()

    def dispose(self) -> None:
        """Dispose of this pool.

        This method leaves the possibility of checked-out connections
        remaining open, as it only affects connections that are
        idle in the pool.

        .. seealso::

            :meth:`Pool.recreate`

        """

        raise NotImplementedError()

    def connect(self) -> PoolProxiedConnection:
        """Return a DBAPI connection from the pool.

        The connection is instrumented such that when its
        ``close()`` method is called, the connection will be returned to
        the pool.

        """
        return _ConnectionFairy._checkout(self)

    def _return_conn(self, record: ConnectionPoolEntry) -> None:
        """Given a _ConnectionRecord, return it to the :class:`_pool.Pool`.

        This method is called when an instrumented DBAPI connection
        has its ``close()`` method called.

        """
        self._do_return_conn(record)

    def _do_get(self) -> ConnectionPoolEntry:
        """Produce a :class:`.ConnectionPoolEntry` for checkout; supplied
        by subclasses.

        """

        raise NotImplementedError()

    def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
        """Implementation for :meth:`_return_conn`, supplied by
        subclasses.

        """

        raise NotImplementedError()

    def status(self) -> str:
        """Return a string describing this pool; supplied by subclasses."""
        raise NotImplementedError()
472
473
class ManagesConnection:
    """Common base for the two connection-management interfaces
    :class:`.PoolProxiedConnection` and :class:`.ConnectionPoolEntry`.

    These two objects are typically exposed in the public facing API
    via the connection pool event hooks, documented at :class:`.PoolEvents`.

    .. versionadded:: 2.0

    """

    __slots__ = ()

    dbapi_connection: Optional[DBAPIConnection]
    """A reference to the actual DBAPI connection being tracked.

    This is a :pep:`249`-compliant object that for traditional sync-style
    dialects is provided by the third-party
    DBAPI implementation in use. For asyncio dialects, the implementation
    is typically an adapter object provided by the SQLAlchemy dialect
    itself; the underlying asyncio object is available via the
    :attr:`.ManagesConnection.driver_connection` attribute.

    SQLAlchemy's interface for the DBAPI connection is based on the
    :class:`.DBAPIConnection` protocol object

    .. seealso::

    :attr:`.ManagesConnection.driver_connection`

    :ref:`faq_dbapi_connection`

    """

    driver_connection: Optional[Any]
    """The "driver level" connection object as used by the Python
    DBAPI or database driver.

    For traditional :pep:`249` DBAPI implementations, this object will
    be the same object as that of
    :attr:`.ManagesConnection.dbapi_connection`. For an asyncio database
    driver, this will be the ultimate "connection" object used by that
    driver, such as the ``asyncpg.Connection`` object which will not have
    standard pep-249 methods.

    .. versionadded:: 1.4.24

    .. seealso::

    :attr:`.ManagesConnection.dbapi_connection`

    :ref:`faq_dbapi_connection`

    """

    @util.ro_memoized_property
    def info(self) -> _InfoType:
        """Info dictionary associated with the underlying DBAPI connection
        referred to by this :class:`.ManagesConnection` instance, allowing
        user-defined data to be associated with the connection.

        The data in this dictionary is persistent for the lifespan
        of the DBAPI connection itself, including across pool checkins
        and checkouts.  When the connection is invalidated
        and replaced with a new one, this dictionary is cleared.

        For a :class:`.PoolProxiedConnection` instance that's not associated
        with a :class:`.ConnectionPoolEntry`, such as if it were detached, the
        attribute returns a dictionary that is local to that
        :class:`.ConnectionPoolEntry`. Therefore the
        :attr:`.ManagesConnection.info` attribute will always provide a Python
        dictionary.

        This base implementation raises ``NotImplementedError``; it is
        supplied by implementing classes.

        .. seealso::

            :attr:`.ManagesConnection.record_info`

        """
        raise NotImplementedError()

    @util.ro_memoized_property
    def record_info(self) -> Optional[_InfoType]:
        """Persistent info dictionary associated with this
        :class:`.ManagesConnection`.

        Unlike the :attr:`.ManagesConnection.info` dictionary, the lifespan
        of this dictionary is that of the :class:`.ConnectionPoolEntry`
        which owns it; therefore this dictionary will persist across
        reconnects and connection invalidation for a particular entry
        in the connection pool.

        For a :class:`.PoolProxiedConnection` instance that's not associated
        with a :class:`.ConnectionPoolEntry`, such as if it were detached, the
        attribute returns None. Contrast to the :attr:`.ManagesConnection.info`
        dictionary which is never None.

        This base implementation raises ``NotImplementedError``; it is
        supplied by implementing classes.

        .. seealso::

            :attr:`.ManagesConnection.info`

        """
        raise NotImplementedError()

    def invalidate(
        self, e: Optional[BaseException] = None, soft: bool = False
    ) -> None:
        """Mark the managed connection as invalidated.

        :param e: an exception object indicating a reason for the invalidation.

        :param soft: if True, the connection isn't closed; instead, this
         connection will be recycled on next checkout.

        This base implementation raises ``NotImplementedError``; it is
        supplied by implementing classes.

        .. seealso::

            :ref:`pool_connection_invalidation`

        """
        raise NotImplementedError()
596
597
class ConnectionPoolEntry(ManagesConnection):
    """Interface for the object that maintains an individual database
    connection on behalf of a :class:`_pool.Pool` instance.

    The :class:`.ConnectionPoolEntry` object represents the long term
    maintenance of a particular connection for a pool, including expiring or
    invalidating that connection to have it replaced with a new one, which will
    continue to be maintained by that same :class:`.ConnectionPoolEntry`
    instance. Compared to :class:`.PoolProxiedConnection`, which is the
    short-term, per-checkout connection manager, this object lasts for the
    lifespan of a particular "slot" within a connection pool.

    The :class:`.ConnectionPoolEntry` object is mostly visible to public-facing
    API code when it is delivered to connection pool event hooks, such as
    :meth:`_events.PoolEvents.connect` and :meth:`_events.PoolEvents.checkout`.

    .. versionadded:: 2.0  :class:`.ConnectionPoolEntry` provides the public
       facing interface for the :class:`._ConnectionRecord` internal class.

    """

    __slots__ = ()

    @property
    def in_use(self) -> bool:
        """Return True if the connection is currently checked out.

        This base implementation raises ``NotImplementedError``.

        """

        raise NotImplementedError()

    def close(self) -> None:
        """Close the DBAPI connection managed by this connection pool entry.

        This base implementation raises ``NotImplementedError``.

        """
        raise NotImplementedError()
630
631
class _ConnectionRecord(ConnectionPoolEntry):
    """Maintains a position in a connection pool which references a pooled
    connection.

    This is an internal object used by the :class:`_pool.Pool` implementation
    to provide context management to a DBAPI connection maintained by
    that :class:`_pool.Pool`.   The public facing interface for this class
    is described by the :class:`.ConnectionPoolEntry` class.  See that
    class for public API details.

    .. seealso::

        :class:`.ConnectionPoolEntry`

        :class:`.PoolProxiedConnection`

    """

    __slots__ = (
        "__pool",
        "fairy_ref",
        "finalize_callback",
        "fresh",
        "starttime",
        "dbapi_connection",
        "__weakref__",
        "__dict__",
    )

    finalize_callback: Deque[Callable[[DBAPIConnection], None]]
    fresh: bool
    fairy_ref: Optional[weakref.ref[_ConnectionFairy]]
    starttime: float

    def __init__(self, pool: Pool, connect: bool = True):
        """Construct a record for the given pool.

        :param pool: the :class:`_pool.Pool` that owns this record.

        :param connect: if True, a DBAPI connection is established
         immediately.

        """
        self.fresh = False
        self.fairy_ref = None
        self.starttime = 0
        self.dbapi_connection = None
        # established before connecting: the connect step hands this
        # record to event listeners, and invalidate() / close() on the
        # record clear this collection
        self.finalize_callback = deque()

        self.__pool = pool
        if connect:
            self.__connect()

    dbapi_connection: Optional[DBAPIConnection]

    @property
    def driver_connection(self) -> Optional[Any]:  # type: ignore[override]  # mypy#4125  # noqa: E501
        """The driver-level connection as reported by the pool's dialect,
        or None if there is no current DBAPI connection.

        """
        if self.dbapi_connection is None:
            return None
        else:
            return self.__pool._dialect.get_driver_connection(
                self.dbapi_connection
            )

    @property
    @util.deprecated(
        "2.0",
        "The _ConnectionRecord.connection attribute is deprecated; "
        "please use 'driver_connection'",
    )
    def connection(self) -> Optional[DBAPIConnection]:
        return self.dbapi_connection

    # class-level default; set per instance by a soft invalidate()
    _soft_invalidate_time: float = 0

    @util.ro_memoized_property
    def info(self) -> _InfoType:
        return {}

    @util.ro_memoized_property
    def record_info(self) -> Optional[_InfoType]:
        return {}

    @classmethod
    def checkout(cls, pool: Pool) -> _ConnectionFairy:
        """Obtain a record from the pool and return a
        :class:`._ConnectionFairy` wrapping its DBAPI connection.

        If obtaining the DBAPI connection fails, the record is invalidated
        and checked back in, and the exception is re-raised.

        """
        if TYPE_CHECKING:
            rec = cast(_ConnectionRecord, pool._do_get())
        else:
            rec = pool._do_get()

        try:
            dbapi_connection = rec.get_connection()
        except BaseException as err:
            with util.safe_reraise():
                rec._checkin_failed(err, _fairy_was_created=False)

            # not reached, for code linters only
            raise

        echo = pool._should_log_debug()
        fairy = _ConnectionFairy(pool, dbapi_connection, rec, echo)

        # the weakref callback runs _finalize_fairy() when the fairy is
        # garbage collected; the module-level name is checked for None
        # before it is called
        rec.fairy_ref = ref = weakref.ref(
            fairy,
            lambda ref: (
                _finalize_fairy(
                    None, rec, pool, ref, echo, transaction_was_reset=False
                )
                if _finalize_fairy is not None
                else None
            ),
        )
        _strong_ref_connection_records[ref] = rec
        if echo:
            pool.logger.debug(
                "Connection %r checked out from pool", dbapi_connection
            )
        return fairy

    def _checkin_failed(
        self, err: BaseException, _fairy_was_created: bool = True
    ) -> None:
        """Invalidate this record with the given error, then check it in."""
        self.invalidate(e=err)
        self.checkin(
            _fairy_was_created=_fairy_was_created,
        )

    def checkin(self, _fairy_was_created: bool = True) -> None:
        """Return this record to the pool.

        Pending finalize callbacks are run against the current DBAPI
        connection, if there is one, and the ``checkin`` event is
        dispatched before the record is handed back to the pool.

        """
        if self.fairy_ref is None and _fairy_was_created:
            # _fairy_was_created is False for the initial get connection phase;
            # meaning there was no _ConnectionFairy and we must unconditionally
            # do a checkin.
            #
            # otherwise, if fairy_was_created==True, if fairy_ref is None here
            # that means we were checked in already, so this looks like
            # a double checkin.
            util.warn("Double checkin attempted on %s" % self)
            return
        self.fairy_ref = None
        connection = self.dbapi_connection
        pool = self.__pool
        # callbacks are consumed from the right-hand end of the deque
        while self.finalize_callback:
            finalizer = self.finalize_callback.pop()
            if connection is not None:
                finalizer(connection)
        if pool.dispatch.checkin:
            pool.dispatch.checkin(connection, self)

        pool._return_conn(self)

    @property
    def in_use(self) -> bool:
        """True if a fairy reference is currently set on this record."""
        return self.fairy_ref is not None

    @property
    def last_connect_time(self) -> float:
        """The timestamp recorded when the current connection was begun."""
        return self.starttime

    def close(self) -> None:
        """Close the current DBAPI connection, if there is one."""
        if self.dbapi_connection is not None:
            self.__close()

    def invalidate(
        self, e: Optional[BaseException] = None, soft: bool = False
    ) -> None:
        """Invalidate the DBAPI connection held by this record.

        :param e: optional exception indicating the reason; it is passed
         to the event hook and included in the log message.

        :param soft: if True, the connection is left open and the soft
         invalidation timestamp is set, so that :meth:`get_connection`
         recycles it later.  Otherwise the connection is terminated
         right away.

        Does nothing if there is no current DBAPI connection.

        """
        # already invalidated
        if self.dbapi_connection is None:
            return
        if soft:
            self.__pool.dispatch.soft_invalidate(
                self.dbapi_connection, self, e
            )
        else:
            self.__pool.dispatch.invalidate(self.dbapi_connection, self, e)
        if e is not None:
            self.__pool.logger.info(
                "%sInvalidate connection %r (reason: %s:%s)",
                "Soft " if soft else "",
                self.dbapi_connection,
                e.__class__.__name__,
                e,
            )
        else:
            self.__pool.logger.info(
                "%sInvalidate connection %r",
                "Soft " if soft else "",
                self.dbapi_connection,
            )

        if soft:
            self._soft_invalidate_time = time.time()
        else:
            self.__close(terminate=True)
            self.dbapi_connection = None

    def get_connection(self) -> DBAPIConnection:
        """Return the DBAPI connection for this record, connecting or
        reconnecting first where required.

        A new connection is made if there is no current connection, if
        the pool's recycle timeout has been exceeded, or if the pool-wide
        or record-local soft invalidation timestamp is later than the
        time at which the current connection was started.

        """
        recycle = False

        # NOTE: the various comparisons here are assuming that measurable time
        # passes between these state changes.  however, time.time() is not
        # guaranteed to have sub-second precision.  comparisons of
        # "invalidation time" to "starttime" should perhaps use >= so that the
        # state change can take place assuming no measurable time has passed,
        # however this does not guarantee correct behavior here as if time
        # continues to not pass, it will try to reconnect repeatedly until
        # these timestamps diverge, so in that sense using > is safer.  Per
        # https://stackoverflow.com/a/1938096/34549, Windows time.time() may be
        # within 16 milliseconds accuracy, so unit tests for connection
        # invalidation need a sleep of at least this long between initial start
        # time and invalidation for the logic below to work reliably.

        if self.dbapi_connection is None:
            self.info.clear()
            self.__connect()
        elif (
            self.__pool._recycle > -1
            and time.time() - self.starttime > self.__pool._recycle
        ):
            self.__pool.logger.info(
                "Connection %r exceeded timeout; recycling",
                self.dbapi_connection,
            )
            recycle = True
        elif self.__pool._invalidate_time > self.starttime:
            self.__pool.logger.info(
                "Connection %r invalidated due to pool invalidation; "
                + "recycling",
                self.dbapi_connection,
            )
            recycle = True
        elif self._soft_invalidate_time > self.starttime:
            self.__pool.logger.info(
                "Connection %r invalidated due to local soft invalidation; "
                + "recycling",
                self.dbapi_connection,
            )
            recycle = True

        if recycle:
            self.__close(terminate=True)
            self.info.clear()

            self.__connect()

        assert self.dbapi_connection is not None
        return self.dbapi_connection

    def _is_hard_or_soft_invalidated(self) -> bool:
        """True if there is no connection, or if the pool-wide or local
        soft invalidation timestamp is later than the connection's start
        time.

        """
        return (
            self.dbapi_connection is None
            or self.__pool._invalidate_time > self.starttime
            or (self._soft_invalidate_time > self.starttime)
        )

    def __close(self, *, terminate: bool = False) -> None:
        """Discard pending finalize callbacks, dispatch the ``close``
        event, and close the current connection via the pool.

        """
        self.finalize_callback.clear()
        if self.__pool.dispatch.close:
            self.__pool.dispatch.close(self.dbapi_connection, self)
        assert self.dbapi_connection is not None
        self.__pool._close_connection(
            self.dbapi_connection, terminate=terminate
        )
        self.dbapi_connection = None

    def __connect(self) -> None:
        """Create a new DBAPI connection using the pool's creator, then
        dispatch the ``first_connect`` and ``connect`` events.

        """
        pool = self.__pool

        # ensure any existing connection is removed, so that if
        # creator fails, this attribute stays None
        self.dbapi_connection = None
        try:
            self.starttime = time.time()
            self.dbapi_connection = connection = pool._invoke_creator(self)
            pool.logger.debug("Created new connection %r", connection)
            self.fresh = True
        except BaseException as e:
            with util.safe_reraise():
                pool.logger.debug("Error on connect(): %s", e)
        else:
            # in SQLAlchemy 1.4 the first_connect event is not used by
            # the engine, so this will usually not be set
            if pool.dispatch.first_connect:
                pool.dispatch.first_connect.for_modify(
                    pool.dispatch
                ).exec_once_unless_exception(self.dbapi_connection, self)

            # init of the dialect now takes place within the connect
            # event, so ensure a mutex is used on the first run
            pool.dispatch.connect.for_modify(
                pool.dispatch
            )._exec_w_sync_on_first_run(self.dbapi_connection, self)
915
916
def _finalize_fairy(
    dbapi_connection: Optional[DBAPIConnection],
    connection_record: Optional[_ConnectionRecord],
    pool: Pool,
    ref: Optional[
        weakref.ref[_ConnectionFairy]
    ],  # this is None when called directly, not by the gc
    echo: Optional[log._EchoFlagType],
    transaction_was_reset: bool = False,
    fairy: Optional[_ConnectionFairy] = None,
) -> None:
    """Cleanup for a :class:`._ConnectionFairy` whether or not it's already
    been garbage collected.

    When using an async dialect no IO can happen here (without using
    a dedicated thread), since this is called outside the greenlet
    context and with an already running loop. In this case function
    will only log a message and raise a warning.

    :param dbapi_connection: the DBAPI connection to clean up.  Must be
     None when called from the weakref callback, in which case it is
     taken from ``connection_record``.

    :param connection_record: the record owning the connection; when
     None, the connection is treated as detached.

    :param pool: the owning :class:`_pool.Pool`.

    :param ref: the weakref being finalized when invoked by garbage
     collection; None when called directly.

    :param echo: when true and a record is present, a debug message is
     logged; also passed to a newly constructed fairy.

    :param transaction_was_reset: passed through to the fairy's
     ``_reset()`` method.

    :param fairy: the existing :class:`._ConnectionFairy`, if there is
     one; otherwise a temporary one is constructed in order to run the
     reset.

    """

    is_gc_cleanup = ref is not None

    if is_gc_cleanup:
        assert ref is not None
        _strong_ref_connection_records.pop(ref, None)
        assert connection_record is not None
        # the record has since been associated with a different fairy
        # (or none at all); this callback is stale, nothing to do
        if connection_record.fairy_ref is not ref:
            return
        assert dbapi_connection is None
        dbapi_connection = connection_record.dbapi_connection

    elif fairy:
        _strong_ref_connection_records.pop(weakref.ref(fairy), None)

    # null pool is not _is_asyncio but can be used also with async dialects
    dont_restore_gced = pool._dialect.is_async

    if dont_restore_gced:
        # async dialect: a connection reached via gc is detached rather
        # than being returned to the pool
        detach = connection_record is None or is_gc_cleanup
        can_manipulate_connection = not is_gc_cleanup
        can_close_or_terminate_connection = (
            not pool._dialect.is_async or pool._dialect.has_terminate
        )
        requires_terminate_for_close = (
            pool._dialect.is_async and pool._dialect.has_terminate
        )

    else:
        detach = connection_record is None
        can_manipulate_connection = can_close_or_terminate_connection = True
        requires_terminate_for_close = False

    if dbapi_connection is not None:
        if connection_record and echo:
            pool.logger.debug(
                "Connection %r being returned to pool", dbapi_connection
            )

        try:
            if not fairy:
                assert connection_record is not None
                fairy = _ConnectionFairy(
                    pool,
                    dbapi_connection,
                    connection_record,
                    echo,
                )
            assert fairy.dbapi_connection is dbapi_connection

            fairy._reset(
                pool,
                transaction_was_reset=transaction_was_reset,
                terminate_only=detach,
                asyncio_safe=can_manipulate_connection,
            )

            if detach:
                if connection_record:
                    fairy._pool = pool
                    fairy.detach()

                if can_close_or_terminate_connection:
                    if pool.dispatch.close_detached:
                        pool.dispatch.close_detached(dbapi_connection)

                    pool._close_connection(
                        dbapi_connection,
                        terminate=requires_terminate_for_close,
                    )

        except BaseException as e:
            pool.logger.error(
                "Exception during reset or similar", exc_info=True
            )
            if connection_record:
                connection_record.invalidate(e=e)
            # only Exception subclasses are suppressed; any other
            # BaseException propagates
            if not isinstance(e, Exception):
                raise
        finally:
            # an async connection was reached by gc without having been
            # checked in; log an error and emit a warning
            if detach and is_gc_cleanup and dont_restore_gced:
                message = (
                    "The garbage collector is trying to clean up "
                    f"non-checked-in connection {dbapi_connection!r}, "
                    f"""which will be {
                        'dropped, as it cannot be safely terminated'
                        if not can_close_or_terminate_connection
                        else 'terminated'
                    }. """
                    "Please ensure that SQLAlchemy pooled connections are "
                    "returned to "
                    "the pool explicitly, either by calling ``close()`` "
                    "or by using appropriate context managers to manage "
                    "their lifecycle."
                )
                pool.logger.error(message)
                util.warn(message)

    # the record is checked in only if it still refers to a fairy, i.e.
    # it has not been checked in already
    if connection_record and connection_record.fairy_ref is not None:
        connection_record.checkin()

    # give gc some help.  See
    # test/engine/test_pool.py::PoolEventsTest::test_checkin_event_gc[True]
    # which actually started failing when pytest warnings plugin was
    # turned on, due to util.warn() above
    if fairy is not None:
        fairy.dbapi_connection = None  # type: ignore
        fairy._connection_record = None
    del dbapi_connection
    del connection_record
    del fairy
1047
1048
# A dictionary mapping _ConnectionFairy weakrefs to their _ConnectionRecord,
# kept so that GC under pypy will call _ConnectionFairy finalizers.  Entries
# are keyed directly on the weakref, which is expected to empty itself when
# the fairy is collected, so this should not create any unmanaged memory
# references.
_strong_ref_connection_records: Dict[
    weakref.ref[_ConnectionFairy], _ConnectionRecord
] = {}
1056
1057
class PoolProxiedConnection(ManagesConnection):
    """A connection-like adapter for a :pep:`249` DBAPI connection, adding
    methods that are specific to the :class:`.Pool` implementation.

    :class:`.PoolProxiedConnection` is the public-facing interface of the
    internal :class:`._ConnectionFairy` implementation object; users who
    already know :class:`._ConnectionFairy` can treat the two as equivalent.

    .. versionadded:: 2.0  :class:`.PoolProxiedConnection` provides the public-
       facing interface for the :class:`._ConnectionFairy` internal class.

    """

    __slots__ = ()

    if TYPE_CHECKING:
        # Typing-only declarations of proxied :pep:`249` methods.  This
        # branch is not executed at runtime, so it defines nothing on the
        # class itself.

        def commit(self) -> None: ...

        def cursor(self) -> DBAPICursor: ...

        def rollback(self) -> None: ...

    @property
    def is_valid(self) -> bool:
        """Return True if this :class:`.PoolProxiedConnection` still refers
        to an active DBAPI connection.

        Must be implemented by subclasses.

        """

        raise NotImplementedError()

    @property
    def is_detached(self) -> bool:
        """Return True if this :class:`.PoolProxiedConnection` is detached
        from its pool.

        Must be implemented by subclasses.

        """

        raise NotImplementedError()

    def detach(self) -> None:
        """Separate this connection from its Pool.

        After detaching, closing the connection no longer returns it to the
        pool; the connection is literally closed instead.  The associated
        :class:`.ConnectionPoolEntry` is de-associated from this DBAPI
        connection.

        Note that any overall connection limiting constraints imposed by a
        Pool implementation may be violated after a detach, because the
        detached connection is removed from the pool's knowledge and
        control.

        """

        raise NotImplementedError()

    def close(self) -> None:
        """Release this connection back to the pool.

        The :meth:`.PoolProxiedConnection.close` method shadows the
        :pep:`249` ``.close()`` method, altering its behavior to instead
        :term:`release` the proxied connection back to the connection pool.

        Whether the released connection stays "opened" and pooled in the
        Python process, or is actually closed out and removed from the
        Python process, depends on the pool implementation in use and on
        its configuration and current state.

        """
        raise NotImplementedError()
1125
1126
class _AdhocProxiedConnection(PoolProxiedConnection):
    """provides the :class:`.PoolProxiedConnection` interface for cases where
    the DBAPI connection is not actually proxied.

    This is used by the engine internals to pass a consistent
    :class:`.PoolProxiedConnection` object to consuming dialects in response to
    pool events that may not always have the :class:`._ConnectionFairy`
    available.

    """

    __slots__ = ("dbapi_connection", "_connection_record", "_is_valid")

    dbapi_connection: DBAPIConnection
    _connection_record: ConnectionPoolEntry

    def __init__(
        self,
        dbapi_connection: DBAPIConnection,
        connection_record: ConnectionPoolEntry,
    ):
        """Wrap a DBAPI connection together with its pool entry.

        :param dbapi_connection: the DBAPI connection that attribute access
         and ``cursor()`` are forwarded to.
        :param connection_record: the pool entry consulted for
         ``driver_connection`` and ``record_info``.

        """
        self.dbapi_connection = dbapi_connection
        self._connection_record = connection_record
        # starts out valid; only invalidate() changes this
        self._is_valid = True

    @property
    def driver_connection(self) -> Any:  # type: ignore[override]  # mypy#4125
        """The ``driver_connection`` of the associated pool entry."""
        return self._connection_record.driver_connection

    @property
    def connection(self) -> DBAPIConnection:
        """The wrapped DBAPI connection; same object as
        ``dbapi_connection``."""
        return self.dbapi_connection

    @property
    def is_valid(self) -> bool:
        """Implement is_valid state attribute.

        The adhoc proxied connection does not inspect the DBAPI connection
        itself: it reports True from construction until
        :meth:`.invalidate` is called on this object, and False afterwards.

        """
        return self._is_valid

    def invalidate(
        self, e: Optional[BaseException] = None, soft: bool = False
    ) -> None:
        """Mark this adhoc proxy as no longer valid.

        Only the local validity flag is changed.  The ``e`` and ``soft``
        arguments are accepted but not used by this implementation.

        """
        self._is_valid = False

    @util.ro_non_memoized_property
    def record_info(self) -> Optional[_InfoType]:
        """The ``record_info`` of the associated pool entry."""
        return self._connection_record.record_info

    def cursor(self, *args: Any, **kwargs: Any) -> DBAPICursor:
        """Return ``dbapi_connection.cursor(*args, **kwargs)``."""
        return self.dbapi_connection.cursor(*args, **kwargs)

    def __getattr__(self, key: Any) -> Any:
        """Proxy otherwise-unknown attributes to the DBAPI connection.

        :raises AttributeError: if the DBAPI connection has no such
         attribute, or if ``dbapi_connection`` itself has not been assigned
         on this object.

        """
        if key == "dbapi_connection":
            # ``dbapi_connection`` is a slot; reaching this point means the
            # slot is unset (e.g. an instance created through ``__new__()``
            # by ``copy`` or ``pickle``).  Reading ``self.dbapi_connection``
            # below would re-enter ``__getattr__`` and recurse until
            # RecursionError, so report the missing attribute directly.
            raise AttributeError(key)
        return getattr(self.dbapi_connection, key)
1184
1185
1186class _ConnectionFairy(PoolProxiedConnection):
1187 """Proxies a DBAPI connection and provides return-on-dereference
1188 support.
1189
1190 This is an internal object used by the :class:`_pool.Pool` implementation
1191 to provide context management to a DBAPI connection delivered by
1192 that :class:`_pool.Pool`. The public facing interface for this class
1193 is described by the :class:`.PoolProxiedConnection` class. See that
1194 class for public API details.
1195
1196 The name "fairy" is inspired by the fact that the
1197 :class:`._ConnectionFairy` object's lifespan is transitory, as it lasts
1198 only for the length of a specific DBAPI connection being checked out from
1199 the pool, and additionally that as a transparent proxy, it is mostly
1200 invisible.
1201
1202 .. seealso::
1203
1204 :class:`.PoolProxiedConnection`
1205
1206 :class:`.ConnectionPoolEntry`
1207
1208
1209 """
1210
1211 __slots__ = (
1212 "dbapi_connection",
1213 "_connection_record",
1214 "_echo",
1215 "_pool",
1216 "_counter",
1217 "__weakref__",
1218 "__dict__",
1219 )
1220
1221 pool: Pool
1222 dbapi_connection: DBAPIConnection
1223 _echo: log._EchoFlagType
1224
1225 def __init__(
1226 self,
1227 pool: Pool,
1228 dbapi_connection: DBAPIConnection,
1229 connection_record: _ConnectionRecord,
1230 echo: log._EchoFlagType,
1231 ):
1232 self._pool = pool
1233 self._counter = 0
1234 self.dbapi_connection = dbapi_connection
1235 self._connection_record = connection_record
1236 self._echo = echo
1237
1238 _connection_record: Optional[_ConnectionRecord]
1239
1240 @property
1241 def driver_connection(self) -> Optional[Any]: # type: ignore[override] # mypy#4125 # noqa: E501
1242 if self._connection_record is None:
1243 return None
1244 return self._connection_record.driver_connection
1245
    @property
    @util.deprecated(
        "2.0",
        "The _ConnectionFairy.connection attribute is deprecated; "
        "please use 'driver_connection'",
    )
    def connection(self) -> DBAPIConnection:
        # Deprecated accessor; returns the same object as the
        # ``dbapi_connection`` attribute.
        return self.dbapi_connection
1254
    @classmethod
    def _checkout(
        cls,
        pool: Pool,
        threadconns: Optional[threading.local] = None,
        fairy: Optional[_ConnectionFairy] = None,
    ) -> _ConnectionFairy:
        """Check out a connection fairy from ``pool``, or re-enter an
        existing one.

        When ``fairy`` is not given, a new one is obtained from
        ``_ConnectionRecord.checkout(pool)`` and, if ``threadconns`` is
        given, a weak reference to it is stored on ``threadconns.current``.
        The fairy's checkout counter is then incremented.

        The ``checkout`` event and the optional pre-ping only run for the
        first checkout of a fairy (counter equal to 1), and only when the
        pool has ``checkout`` listeners or pre-ping enabled; otherwise the
        fairy is returned right away.

        :param pool: the pool being checked out from.
        :param threadconns: optional thread-local object that receives a
         weak reference to a newly created fairy.
        :param fairy: an existing fairy to check out again.
        :return: the checked-out fairy.
        :raises exc.InvalidRequestError: if every pass of the reconnect loop
         ended in a ``DisconnectionError``.

        """
        if not fairy:
            fairy = _ConnectionRecord.checkout(pool)

            if threadconns is not None:
                threadconns.current = weakref.ref(fairy)

        assert (
            fairy._connection_record is not None
        ), "can't 'checkout' a detached connection fairy"
        assert (
            fairy.dbapi_connection is not None
        ), "can't 'checkout' an invalidated connection fairy"

        # nested checkouts of the same fairy only bump the counter; the
        # event / pre-ping handling below runs for the first checkout only
        fairy._counter += 1
        if (
            not pool.dispatch.checkout and not pool._pre_ping
        ) or fairy._counter != 1:
            return fairy

        # Pool listeners can trigger a reconnection on checkout, as well
        # as the pre-pinger.
        # the loop below makes at most two passes (``attempts = 2``), but
        # note that if the database is not accessible from a connection
        # standpoint, those won't proceed here.

        attempts = 2

        while attempts > 0:
            # remember whether the record was flagged as fresh, then clear
            # the flag so that a later pass will not skip the ping
            connection_is_fresh = fairy._connection_record.fresh
            fairy._connection_record.fresh = False
            try:
                if pool._pre_ping:
                    if not connection_is_fresh:
                        if fairy._echo:
                            pool.logger.debug(
                                "Pool pre-ping on connection %s",
                                fairy.dbapi_connection,
                            )
                        result = pool._dialect._do_ping_w_event(
                            fairy.dbapi_connection
                        )
                        if not result:
                            if fairy._echo:
                                pool.logger.debug(
                                    "Pool pre-ping on connection %s failed, "
                                    "will invalidate pool",
                                    fairy.dbapi_connection,
                                )
                            # handled by the DisconnectionError clause below
                            # NOTE(review): assumes InvalidatePoolError is a
                            # DisconnectionError subclass -- confirm in exc
                            raise exc.InvalidatePoolError()
                    elif fairy._echo:
                        pool.logger.debug(
                            "Connection %s is fresh, skipping pre-ping",
                            fairy.dbapi_connection,
                        )

                pool.dispatch.checkout(
                    fairy.dbapi_connection, fairy._connection_record, fairy
                )
                return fairy
            except exc.DisconnectionError as e:
                if e.invalidate_pool:
                    pool.logger.info(
                        "Disconnection detected on checkout, "
                        "invalidating all pooled connections prior to "
                        "current timestamp (reason: %r)",
                        e,
                    )
                    fairy._connection_record.invalidate(e)
                    pool._invalidate(fairy, e, _checkin=False)
                else:
                    pool.logger.info(
                        "Disconnection detected on checkout, "
                        "invalidating individual connection %s (reason: %r)",
                        fairy.dbapi_connection,
                        e,
                    )
                    fairy._connection_record.invalidate(e)
                # replace the invalidated DBAPI connection before the next
                # pass of the loop
                try:
                    fairy.dbapi_connection = (
                        fairy._connection_record.get_connection()
                    )
                except BaseException as err:
                    with util.safe_reraise():
                        fairy._connection_record._checkin_failed(
                            err,
                            _fairy_was_created=True,
                        )

                        # prevent _ConnectionFairy from being carried
                        # in the stack trace.  Do this after the
                        # connection record has been checked in, so that
                        # if the del triggers a finalize fairy, it won't
                        # try to checkin a second time.
                        del fairy

                    # never called, this is for code linters
                    raise

                attempts -= 1
            except BaseException as be_outer:
                # any other failure: hand the record back as a failed
                # checkin (if there still is a record) and let the
                # exception propagate
                with util.safe_reraise():
                    rec = fairy._connection_record
                    if rec is not None:
                        rec._checkin_failed(
                            be_outer,
                            _fairy_was_created=True,
                        )

                    # prevent _ConnectionFairy from being carried
                    # in the stack trace, see above
                    del fairy

                # never called, this is for code linters
                raise

        # every pass ended in a DisconnectionError
        pool.logger.info("Reconnection attempts exhausted on checkout")
        fairy.invalidate()
        raise exc.InvalidRequestError("This connection is closed")
1380
    def _checkout_existing(self) -> _ConnectionFairy:
        """Check out this same fairy again by passing it to ``_checkout()``
        together with its own pool."""
        return _ConnectionFairy._checkout(self._pool, fairy=self)
1383
1384 def _checkin(self, transaction_was_reset: bool = False) -> None:
1385 _finalize_fairy(
1386 self.dbapi_connection,
1387 self._connection_record,
1388 self._pool,
1389 None,
1390 self._echo,
1391 transaction_was_reset=transaction_was_reset,
1392 fairy=self,
1393 )
1394
    def _close(self) -> None:
        """Check in unconditionally by calling ``_checkin()`` with its
        default arguments; the checkout counter is not consulted here."""
        self._checkin()
1397
1398 def _reset(
1399 self,
1400 pool: Pool,
1401 transaction_was_reset: bool,
1402 terminate_only: bool,
1403 asyncio_safe: bool,
1404 ) -> None:
1405 if pool.dispatch.reset:
1406 pool.dispatch.reset(
1407 self.dbapi_connection,
1408 self._connection_record,
1409 PoolResetState(
1410 transaction_was_reset=transaction_was_reset,
1411 terminate_only=terminate_only,
1412 asyncio_safe=asyncio_safe,
1413 ),
1414 )
1415
1416 if not asyncio_safe:
1417 return
1418
1419 if pool._reset_on_return is reset_rollback:
1420 if transaction_was_reset:
1421 if self._echo:
1422 pool.logger.debug(
1423 "Connection %s reset, transaction already reset",
1424 self.dbapi_connection,
1425 )
1426 else:
1427 if self._echo:
1428 pool.logger.debug(
1429 "Connection %s rollback-on-return",
1430 self.dbapi_connection,
1431 )
1432 pool._dialect.do_rollback(self)
1433 elif pool._reset_on_return is reset_commit:
1434 if self._echo:
1435 pool.logger.debug(
1436 "Connection %s commit-on-return",
1437 self.dbapi_connection,
1438 )
1439 pool._dialect.do_commit(self)
1440
    @property
    def _logger(self) -> log._IdentifiedLoggerType:
        """The ``logger`` of the pool this fairy belongs to."""
        return self._pool.logger
1444
    @property
    def is_valid(self) -> bool:
        """True as long as ``dbapi_connection`` is not None."""
        return self.dbapi_connection is not None
1448
    @property
    def is_detached(self) -> bool:
        """True when this fairy no longer has a connection record, as is
        the case after :meth:`.detach`."""
        return self._connection_record is None
1452
1453 @util.ro_memoized_property
1454 def info(self) -> _InfoType:
1455 if self._connection_record is None:
1456 return {}
1457 else:
1458 return self._connection_record.info
1459
1460 @util.ro_non_memoized_property
1461 def record_info(self) -> Optional[_InfoType]:
1462 if self._connection_record is None:
1463 return None
1464 else:
1465 return self._connection_record.record_info
1466
1467 def invalidate(
1468 self, e: Optional[BaseException] = None, soft: bool = False
1469 ) -> None:
1470 if self.dbapi_connection is None:
1471 util.warn("Can't invalidate an already-closed connection.")
1472 return
1473 if self._connection_record:
1474 self._connection_record.invalidate(e=e, soft=soft)
1475 if not soft:
1476 # prevent any rollback / reset actions etc. on
1477 # the connection
1478 self.dbapi_connection = None # type: ignore
1479
1480 # finalize
1481 self._checkin()
1482
1483 def cursor(self, *args: Any, **kwargs: Any) -> DBAPICursor:
1484 assert self.dbapi_connection is not None
1485 return self.dbapi_connection.cursor(*args, **kwargs)
1486
1487 def __getattr__(self, key: str) -> Any:
1488 return getattr(self.dbapi_connection, key)
1489
1490 def detach(self) -> None:
1491 if self._connection_record is not None:
1492 rec = self._connection_record
1493 rec.fairy_ref = None
1494 rec.dbapi_connection = None
1495 # TODO: should this be _return_conn?
1496 self._pool._do_return_conn(self._connection_record)
1497
1498 # can't get the descriptor assignment to work here
1499 # in pylance. mypy is OK w/ it
1500 self.info = self.info.copy() # type: ignore
1501
1502 self._connection_record = None
1503
1504 if self._pool.dispatch.detach:
1505 self._pool.dispatch.detach(self.dbapi_connection, rec)
1506
1507 def close(self) -> None:
1508 self._counter -= 1
1509 if self._counter == 0:
1510 self._checkin()
1511
1512 def _close_special(self, transaction_reset: bool = False) -> None:
1513 self._counter -= 1
1514 if self._counter == 0:
1515 self._checkin(transaction_was_reset=transaction_reset)