1# sqlalchemy/pool.py
2# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: http://www.opensource.org/licenses/mit-license.php
7
8
9"""Base constructs for connection pools.
10
11"""
12
13from collections import deque
14import time
15import weakref
16
17from .. import event
18from .. import exc
19from .. import interfaces
20from .. import log
21from .. import util
22from ..util import threading
23
24
25reset_rollback = util.symbol("reset_rollback")
26reset_commit = util.symbol("reset_commit")
27reset_none = util.symbol("reset_none")
28
29
30class _ConnDialect(object):
31
32 """partial implementation of :class:`.Dialect`
33 which provides DBAPI connection methods.
34
35 When a :class:`_pool.Pool` is combined with an :class:`_engine.Engine`,
36 the :class:`_engine.Engine` replaces this with its own
37 :class:`.Dialect`.
38
39 """
40
41 def do_rollback(self, dbapi_connection):
42 dbapi_connection.rollback()
43
44 def do_commit(self, dbapi_connection):
45 dbapi_connection.commit()
46
47 def do_close(self, dbapi_connection):
48 dbapi_connection.close()
49
50 def do_ping(self, dbapi_connection):
51 raise NotImplementedError(
52 "The ping feature requires that a dialect is "
53 "passed to the connection pool."
54 )
55
56
57class Pool(log.Identified):
58
59 """Abstract base class for connection pools."""
60
61 _dialect = _ConnDialect()
62
63 @util.deprecated_params(
64 use_threadlocal=(
65 "1.3",
66 "The :paramref:`_pool.Pool.use_threadlocal` parameter is "
67 "deprecated and will be removed in a future release.",
68 ),
69 listeners=(
70 "0.7",
71 ":class:`.PoolListener` is deprecated in favor of the "
72 ":class:`_events.PoolEvents` listener interface. The "
73 ":paramref:`_pool.Pool.listeners` parameter will be removed in a "
74 "future release.",
75 ),
76 )
77 def __init__(
78 self,
79 creator,
80 recycle=-1,
81 echo=None,
82 use_threadlocal=False,
83 logging_name=None,
84 reset_on_return=True,
85 listeners=None,
86 events=None,
87 dialect=None,
88 pre_ping=False,
89 _dispatch=None,
90 ):
91 """
92 Construct a Pool.
93
94 :param creator: a callable function that returns a DB-API
95 connection object. The function will be called with
96 parameters.
97
98 :param recycle: If set to a value other than -1, number of
99 seconds between connection recycling, which means upon
100 checkout, if this timeout is surpassed the connection will be
101 closed and replaced with a newly opened connection. Defaults to -1.
102
103 :param logging_name: String identifier which will be used within
104 the "name" field of logging records generated within the
105 "sqlalchemy.pool" logger. Defaults to a hexstring of the object's
106 id.
107
108 :param echo: if True, the connection pool will log
109 informational output such as when connections are invalidated
110 as well as when connections are recycled to the default log handler,
111 which defaults to ``sys.stdout`` for output.. If set to the string
112 ``"debug"``, the logging will include pool checkouts and checkins.
113
114 The :paramref:`_pool.Pool.echo` parameter can also be set from the
115 :func:`_sa.create_engine` call by using the
116 :paramref:`_sa.create_engine.echo_pool` parameter.
117
118 .. seealso::
119
120 :ref:`dbengine_logging` - further detail on how to configure
121 logging.
122
123 :param use_threadlocal: If set to True, repeated calls to
124 :meth:`connect` within the same application thread will be
125 guaranteed to return the same connection object that is already
126 checked out. This is a legacy use case and the flag has no
127 effect when using the pool with a :class:`_engine.Engine` object.
128
129 :param reset_on_return: Determine steps to take on
130 connections as they are returned to the pool.
131 reset_on_return can have any of these values:
132
133 * ``"rollback"`` - call rollback() on the connection,
134 to release locks and transaction resources.
135 This is the default value. The vast majority
136 of use cases should leave this value set.
137 * ``True`` - same as 'rollback', this is here for
138 backwards compatibility.
139 * ``"commit"`` - call commit() on the connection,
140 to release locks and transaction resources.
141 A commit here may be desirable for databases that
142 cache query plans if a commit is emitted,
143 such as Microsoft SQL Server. However, this
144 value is more dangerous than 'rollback' because
145 any data changes present on the transaction
146 are committed unconditionally.
147 * ``None`` - don't do anything on the connection.
148 This setting should generally only be made on a database
149 that has no transaction support at all,
150 namely MySQL MyISAM; when used on this backend, performance
151 can be improved as the "rollback" call is still expensive on
152 MySQL. It is **strongly recommended** that this setting not be
153 used for transaction-supporting databases in conjunction with
154 a persistent pool such as :class:`.QueuePool`, as it opens
155 the possibility for connections still in a transaction to be
156 idle in the pool. The setting may be appropriate in the
157 case of :class:`.NullPool` or special circumstances where
158 the connection pool in use is not being used to maintain connection
159 lifecycle.
160
161 * ``False`` - same as None, this is here for
162 backwards compatibility.
163
164 :param events: a list of 2-tuples, each of the form
165 ``(callable, target)`` which will be passed to :func:`.event.listen`
166 upon construction. Provided here so that event listeners
167 can be assigned via :func:`_sa.create_engine` before dialect-level
168 listeners are applied.
169
170 :param listeners: A list of :class:`.PoolListener`-like objects or
171 dictionaries of callables that receive events when DB-API
172 connections are created, checked out and checked in to the
173 pool.
174
175 :param dialect: a :class:`.Dialect` that will handle the job
176 of calling rollback(), close(), or commit() on DBAPI connections.
177 If omitted, a built-in "stub" dialect is used. Applications that
178 make use of :func:`_sa.create_engine` should not use this parameter
179 as it is handled by the engine creation strategy.
180
181 .. versionadded:: 1.1 - ``dialect`` is now a public parameter
182 to the :class:`_pool.Pool`.
183
184 :param pre_ping: if True, the pool will emit a "ping" (typically
185 "SELECT 1", but is dialect-specific) on the connection
186 upon checkout, to test if the connection is alive or not. If not,
187 the connection is transparently re-connected and upon success, all
188 other pooled connections established prior to that timestamp are
189 invalidated. Requires that a dialect is passed as well to
190 interpret the disconnection error.
191
192 .. versionadded:: 1.2
193
194 """
195 if logging_name:
196 self.logging_name = self._orig_logging_name = logging_name
197 else:
198 self._orig_logging_name = None
199
200 log.instance_logger(self, echoflag=echo)
201 self._threadconns = threading.local()
202 self._creator = creator
203 self._recycle = recycle
204 self._invalidate_time = 0
205 self._use_threadlocal = use_threadlocal
206 self._pre_ping = pre_ping
207 self._reset_on_return = util.symbol.parse_user_argument(
208 reset_on_return,
209 {
210 reset_rollback: ["rollback", True],
211 reset_none: ["none", None, False],
212 reset_commit: ["commit"],
213 },
214 "reset_on_return",
215 resolve_symbol_names=False,
216 )
217
218 self.echo = echo
219
220 if _dispatch:
221 self.dispatch._update(_dispatch, only_propagate=False)
222 if dialect:
223 self._dialect = dialect
224 if events:
225 for fn, target in events:
226 event.listen(self, target, fn)
227 if listeners:
228 for l in listeners:
229 self.add_listener(l)
230
231 @property
232 def _creator(self):
233 return self.__dict__["_creator"]
234
235 @_creator.setter
236 def _creator(self, creator):
237 self.__dict__["_creator"] = creator
238 self._invoke_creator = self._should_wrap_creator(creator)
239
240 def _should_wrap_creator(self, creator):
241 """Detect if creator accepts a single argument, or is sent
242 as a legacy style no-arg function.
243
244 """
245
246 try:
247 argspec = util.get_callable_argspec(self._creator, no_self=True)
248 except TypeError:
249 return lambda crec: creator()
250
251 defaulted = argspec[3] is not None and len(argspec[3]) or 0
252 positionals = len(argspec[0]) - defaulted
253
254 # look for the exact arg signature that DefaultStrategy
255 # sends us
256 if (argspec[0], argspec[3]) == (["connection_record"], (None,)):
257 return creator
258 # or just a single positional
259 elif positionals == 1:
260 return creator
261 # all other cases, just wrap and assume legacy "creator" callable
262 # thing
263 else:
264 return lambda crec: creator()
265
266 def _close_connection(self, connection):
267 self.logger.debug("Closing connection %r", connection)
268
269 try:
270 self._dialect.do_close(connection)
271 except Exception:
272 self.logger.error(
273 "Exception closing connection %r", connection, exc_info=True
274 )
275
276 @util.deprecated(
277 "0.7",
278 "The :meth:`_pool.Pool.add_listener` method is deprecated and "
279 "will be removed in a future release. Please use the "
280 ":class:`_events.PoolEvents` listener interface.",
281 )
282 def add_listener(self, listener):
283 """Add a :class:`.PoolListener`-like object to this pool.
284
285 ``listener`` may be an object that implements some or all of
286 PoolListener, or a dictionary of callables containing implementations
287 of some or all of the named methods in PoolListener.
288
289 """
290 interfaces.PoolListener._adapt_listener(self, listener)
291
292 def unique_connection(self):
293 """Produce a DBAPI connection that is not referenced by any
294 thread-local context.
295
296 This method is equivalent to :meth:`_pool.Pool.connect` when the
297 :paramref:`_pool.Pool.use_threadlocal` flag is not set to True.
298 When :paramref:`_pool.Pool.use_threadlocal` is True, the
299 :meth:`_pool.Pool.unique_connection`
300 method provides a means of bypassing
301 the threadlocal context.
302
303 """
304 return _ConnectionFairy._checkout(self)
305
306 def _create_connection(self):
307 """Called by subclasses to create a new ConnectionRecord."""
308
309 return _ConnectionRecord(self)
310
311 def _invalidate(self, connection, exception=None, _checkin=True):
312 """Mark all connections established within the generation
313 of the given connection as invalidated.
314
315 If this pool's last invalidate time is before when the given
316 connection was created, update the timestamp til now. Otherwise,
317 no action is performed.
318
319 Connections with a start time prior to this pool's invalidation
320 time will be recycled upon next checkout.
321 """
322 rec = getattr(connection, "_connection_record", None)
323 if not rec or self._invalidate_time < rec.starttime:
324 self._invalidate_time = time.time()
325 if _checkin and getattr(connection, "is_valid", False):
326 connection.invalidate(exception)
327
328 def recreate(self):
329 """Return a new :class:`_pool.Pool`, of the same class as this one
330 and configured with identical creation arguments.
331
332 This method is used in conjunction with :meth:`dispose`
333 to close out an entire :class:`_pool.Pool` and create a new one in
334 its place.
335
336 """
337
338 raise NotImplementedError()
339
340 def dispose(self):
341 """Dispose of this pool.
342
343 This method leaves the possibility of checked-out connections
344 remaining open, as it only affects connections that are
345 idle in the pool.
346
347 .. seealso::
348
349 :meth:`Pool.recreate`
350
351 """
352
353 raise NotImplementedError()
354
355 def connect(self):
356 """Return a DBAPI connection from the pool.
357
358 The connection is instrumented such that when its
359 ``close()`` method is called, the connection will be returned to
360 the pool.
361
362 """
363 if not self._use_threadlocal:
364 return _ConnectionFairy._checkout(self)
365
366 try:
367 rec = self._threadconns.current()
368 except AttributeError:
369 pass
370 else:
371 if rec is not None:
372 return rec._checkout_existing()
373
374 return _ConnectionFairy._checkout(self, self._threadconns)
375
376 def _return_conn(self, record):
377 """Given a _ConnectionRecord, return it to the :class:`_pool.Pool`.
378
379 This method is called when an instrumented DBAPI connection
380 has its ``close()`` method called.
381
382 """
383 if self._use_threadlocal:
384 try:
385 del self._threadconns.current
386 except AttributeError:
387 pass
388 self._do_return_conn(record)
389
390 def _do_get(self):
391 """Implementation for :meth:`get`, supplied by subclasses."""
392
393 raise NotImplementedError()
394
395 def _do_return_conn(self, conn):
396 """Implementation for :meth:`return_conn`, supplied by subclasses."""
397
398 raise NotImplementedError()
399
400 def status(self):
401 raise NotImplementedError()
402
403
404class _ConnectionRecord(object):
405
406 """Internal object which maintains an individual DBAPI connection
407 referenced by a :class:`_pool.Pool`.
408
409 The :class:`._ConnectionRecord` object always exists for any particular
410 DBAPI connection whether or not that DBAPI connection has been
411 "checked out". This is in contrast to the :class:`._ConnectionFairy`
412 which is only a public facade to the DBAPI connection while it is checked
413 out.
414
415 A :class:`._ConnectionRecord` may exist for a span longer than that
416 of a single DBAPI connection. For example, if the
417 :meth:`._ConnectionRecord.invalidate`
418 method is called, the DBAPI connection associated with this
419 :class:`._ConnectionRecord`
420 will be discarded, but the :class:`._ConnectionRecord` may be used again,
421 in which case a new DBAPI connection is produced when the
422 :class:`_pool.Pool`
423 next uses this record.
424
425 The :class:`._ConnectionRecord` is delivered along with connection
426 pool events, including :meth:`_events.PoolEvents.connect` and
427 :meth:`_events.PoolEvents.checkout`, however :class:`._ConnectionRecord`
428 still
429 remains an internal object whose API and internals may change.
430
431 .. seealso::
432
433 :class:`._ConnectionFairy`
434
435 """
436
437 def __init__(self, pool, connect=True):
438 self.__pool = pool
439 if connect:
440 self.__connect(first_connect_check=True)
441 self.finalize_callback = deque()
442
443 fairy_ref = None
444
445 starttime = None
446
447 connection = None
448 """A reference to the actual DBAPI connection being tracked.
449
450 May be ``None`` if this :class:`._ConnectionRecord` has been marked
451 as invalidated; a new DBAPI connection may replace it if the owning
452 pool calls upon this :class:`._ConnectionRecord` to reconnect.
453
454 """
455
456 _soft_invalidate_time = 0
457
458 @util.memoized_property
459 def info(self):
460 """The ``.info`` dictionary associated with the DBAPI connection.
461
462 This dictionary is shared among the :attr:`._ConnectionFairy.info`
463 and :attr:`_engine.Connection.info` accessors.
464
465 .. note::
466
467 The lifespan of this dictionary is linked to the
468 DBAPI connection itself, meaning that it is **discarded** each time
469 the DBAPI connection is closed and/or invalidated. The
470 :attr:`._ConnectionRecord.record_info` dictionary remains
471 persistent throughout the lifespan of the
472 :class:`._ConnectionRecord` container.
473
474 """
475 return {}
476
477 @util.memoized_property
478 def record_info(self):
479 """An "info' dictionary associated with the connection record
480 itself.
481
482 Unlike the :attr:`._ConnectionRecord.info` dictionary, which is linked
483 to the lifespan of the DBAPI connection, this dictionary is linked
484 to the lifespan of the :class:`._ConnectionRecord` container itself
485 and will remain persistent throughout the life of the
486 :class:`._ConnectionRecord`.
487
488 .. versionadded:: 1.1
489
490 """
491 return {}
492
493 @classmethod
494 def checkout(cls, pool):
495 rec = pool._do_get()
496 try:
497 dbapi_connection = rec.get_connection()
498 except Exception as err:
499 with util.safe_reraise():
500 rec._checkin_failed(err)
501 echo = pool._should_log_debug()
502 fairy = _ConnectionFairy(dbapi_connection, rec, echo)
503 rec.fairy_ref = weakref.ref(
504 fairy,
505 lambda ref: _finalize_fairy
506 and _finalize_fairy(None, rec, pool, ref, echo),
507 )
508 _refs.add(rec)
509 if echo:
510 pool.logger.debug(
511 "Connection %r checked out from pool", dbapi_connection
512 )
513 return fairy
514
515 def _checkin_failed(self, err):
516 self.invalidate(e=err)
517 self.checkin(_no_fairy_ref=True)
518
519 def checkin(self, _no_fairy_ref=False):
520 if self.fairy_ref is None and not _no_fairy_ref:
521 util.warn("Double checkin attempted on %s" % self)
522 return
523 self.fairy_ref = None
524 connection = self.connection
525 pool = self.__pool
526 while self.finalize_callback:
527 finalizer = self.finalize_callback.pop()
528 finalizer(connection)
529 if pool.dispatch.checkin:
530 pool.dispatch.checkin(connection, self)
531 pool._return_conn(self)
532
533 @property
534 def in_use(self):
535 return self.fairy_ref is not None
536
537 @property
538 def last_connect_time(self):
539 return self.starttime
540
541 def close(self):
542 if self.connection is not None:
543 self.__close()
544
545 def invalidate(self, e=None, soft=False):
546 """Invalidate the DBAPI connection held by this :class:`._ConnectionRecord`.
547
548 This method is called for all connection invalidations, including
549 when the :meth:`._ConnectionFairy.invalidate` or
550 :meth:`_engine.Connection.invalidate` methods are called,
551 as well as when any
552 so-called "automatic invalidation" condition occurs.
553
554 :param e: an exception object indicating a reason for the invalidation.
555
556 :param soft: if True, the connection isn't closed; instead, this
557 connection will be recycled on next checkout.
558
559 .. versionadded:: 1.0.3
560
561 .. seealso::
562
563 :ref:`pool_connection_invalidation`
564
565 """
566 # already invalidated
567 if self.connection is None:
568 return
569 if soft:
570 self.__pool.dispatch.soft_invalidate(self.connection, self, e)
571 else:
572 self.__pool.dispatch.invalidate(self.connection, self, e)
573 if e is not None:
574 self.__pool.logger.info(
575 "%sInvalidate connection %r (reason: %s:%s)",
576 "Soft " if soft else "",
577 self.connection,
578 e.__class__.__name__,
579 e,
580 )
581 else:
582 self.__pool.logger.info(
583 "%sInvalidate connection %r",
584 "Soft " if soft else "",
585 self.connection,
586 )
587 if soft:
588 self._soft_invalidate_time = time.time()
589 else:
590 self.__close()
591 self.connection = None
592
593 def get_connection(self):
594 recycle = False
595
596 # NOTE: the various comparisons here are assuming that measurable time
597 # passes between these state changes. however, time.time() is not
598 # guaranteed to have sub-second precision. comparisons of
599 # "invalidation time" to "starttime" should perhaps use >= so that the
600 # state change can take place assuming no measurable time has passed,
601 # however this does not guarantee correct behavior here as if time
602 # continues to not pass, it will try to reconnect repeatedly until
603 # these timestamps diverge, so in that sense using > is safer. Per
604 # https://stackoverflow.com/a/1938096/34549, Windows time.time() may be
605 # within 16 milliseconds accuracy, so unit tests for connection
606 # invalidation need a sleep of at least this long between initial start
607 # time and invalidation for the logic below to work reliably.
608 if self.connection is None:
609 self.info.clear()
610 self.__connect()
611 elif (
612 self.__pool._recycle > -1
613 and time.time() - self.starttime > self.__pool._recycle
614 ):
615 self.__pool.logger.info(
616 "Connection %r exceeded timeout; recycling", self.connection
617 )
618 recycle = True
619 elif self.__pool._invalidate_time > self.starttime:
620 self.__pool.logger.info(
621 "Connection %r invalidated due to pool invalidation; "
622 + "recycling",
623 self.connection,
624 )
625 recycle = True
626 elif self._soft_invalidate_time > self.starttime:
627 self.__pool.logger.info(
628 "Connection %r invalidated due to local soft invalidation; "
629 + "recycling",
630 self.connection,
631 )
632 recycle = True
633
634 if recycle:
635 self.__close()
636 self.info.clear()
637
638 self.__connect()
639 return self.connection
640
641 def __close(self):
642 self.finalize_callback.clear()
643 if self.__pool.dispatch.close:
644 self.__pool.dispatch.close(self.connection, self)
645 self.__pool._close_connection(self.connection)
646 self.connection = None
647
648 def __connect(self, first_connect_check=False):
649 pool = self.__pool
650
651 # ensure any existing connection is removed, so that if
652 # creator fails, this attribute stays None
653 self.connection = None
654 try:
655 self.starttime = time.time()
656 connection = pool._invoke_creator(self)
657 pool.logger.debug("Created new connection %r", connection)
658 self.connection = connection
659 except Exception as e:
660 with util.safe_reraise():
661 pool.logger.debug("Error on connect(): %s", e)
662 else:
663 if first_connect_check:
664 pool.dispatch.first_connect.for_modify(
665 pool.dispatch
666 ).exec_once_unless_exception(self.connection, self)
667 if pool.dispatch.connect:
668 pool.dispatch.connect(self.connection, self)
669
670
671def _finalize_fairy(
672 connection, connection_record, pool, ref, echo, fairy=None
673):
674 """Cleanup for a :class:`._ConnectionFairy` whether or not it's already
675 been garbage collected.
676
677 """
678 _refs.discard(connection_record)
679
680 if ref is not None:
681 if connection_record.fairy_ref is not ref:
682 return
683 assert connection is None
684 connection = connection_record.connection
685
686 if connection is not None:
687 if connection_record and echo:
688 pool.logger.debug(
689 "Connection %r being returned to pool", connection
690 )
691
692 try:
693 fairy = fairy or _ConnectionFairy(
694 connection, connection_record, echo
695 )
696 assert fairy.connection is connection
697 fairy._reset(pool)
698
699 # Immediately close detached instances
700 if not connection_record:
701 if pool.dispatch.close_detached:
702 pool.dispatch.close_detached(connection)
703 pool._close_connection(connection)
704 except BaseException as e:
705 pool.logger.error(
706 "Exception during reset or similar", exc_info=True
707 )
708 if connection_record:
709 connection_record.invalidate(e=e)
710 if not isinstance(e, Exception):
711 raise
712
713 if connection_record and connection_record.fairy_ref is not None:
714 connection_record.checkin()
715
716
717_refs = set()
718
719
720class _ConnectionFairy(object):
721
722 """Proxies a DBAPI connection and provides return-on-dereference
723 support.
724
725 This is an internal object used by the :class:`_pool.Pool` implementation
726 to provide context management to a DBAPI connection delivered by
727 that :class:`_pool.Pool`.
728
729 The name "fairy" is inspired by the fact that the
730 :class:`._ConnectionFairy` object's lifespan is transitory, as it lasts
731 only for the length of a specific DBAPI connection being checked out from
732 the pool, and additionally that as a transparent proxy, it is mostly
733 invisible.
734
735 .. seealso::
736
737 :class:`._ConnectionRecord`
738
739 """
740
741 def __init__(self, dbapi_connection, connection_record, echo):
742 self.connection = dbapi_connection
743 self._connection_record = connection_record
744 self._echo = echo
745
746 connection = None
747 """A reference to the actual DBAPI connection being tracked."""
748
749 _connection_record = None
750 """A reference to the :class:`._ConnectionRecord` object associated
751 with the DBAPI connection.
752
753 This is currently an internal accessor which is subject to change.
754
755 """
756
757 _reset_agent = None
758 """Refer to an object with a ``.commit()`` and ``.rollback()`` method;
759 if non-None, the "reset-on-return" feature will call upon this object
760 rather than directly against the dialect-level do_rollback() and
761 do_commit() methods.
762
763 In practice, a :class:`_engine.Connection` assigns a :class:`.Transaction`
764 object
765 to this variable when one is in scope so that the :class:`.Transaction`
766 takes the job of committing or rolling back on return if
767 :meth:`_engine.Connection.close` is called while the :class:`.Transaction`
768 still exists.
769
770 This is essentially an "event handler" of sorts but is simplified as an
771 instance variable both for performance/simplicity as well as that there
772 can only be one "reset agent" at a time.
773 """
774
775 @classmethod
776 def _checkout(cls, pool, threadconns=None, fairy=None):
777 if not fairy:
778 fairy = _ConnectionRecord.checkout(pool)
779
780 fairy._pool = pool
781 fairy._counter = 0
782
783 if threadconns is not None:
784 threadconns.current = weakref.ref(fairy)
785
786 if fairy.connection is None:
787 raise exc.InvalidRequestError("This connection is closed")
788 fairy._counter += 1
789
790 if (
791 not pool.dispatch.checkout and not pool._pre_ping
792 ) or fairy._counter != 1:
793 return fairy
794
795 # Pool listeners can trigger a reconnection on checkout, as well
796 # as the pre-pinger.
797 # there are three attempts made here, but note that if the database
798 # is not accessible from a connection standpoint, those won't proceed
799 # here.
800 attempts = 2
801 while attempts > 0:
802 try:
803 if pool._pre_ping:
804 if fairy._echo:
805 pool.logger.debug(
806 "Pool pre-ping on connection %s", fairy.connection
807 )
808
809 result = pool._dialect.do_ping(fairy.connection)
810 if not result:
811 if fairy._echo:
812 pool.logger.debug(
813 "Pool pre-ping on connection %s failed, "
814 "will invalidate pool",
815 fairy.connection,
816 )
817 raise exc.InvalidatePoolError()
818
819 pool.dispatch.checkout(
820 fairy.connection, fairy._connection_record, fairy
821 )
822 return fairy
823 except exc.DisconnectionError as e:
824 if e.invalidate_pool:
825 pool.logger.info(
826 "Disconnection detected on checkout, "
827 "invalidating all pooled connections prior to "
828 "current timestamp (reason: %r)",
829 e,
830 )
831 fairy._connection_record.invalidate(e)
832 pool._invalidate(fairy, e, _checkin=False)
833 else:
834 pool.logger.info(
835 "Disconnection detected on checkout, "
836 "invalidating individual connection %s (reason: %r)",
837 fairy.connection,
838 e,
839 )
840 fairy._connection_record.invalidate(e)
841 try:
842 fairy.connection = (
843 fairy._connection_record.get_connection()
844 )
845 except Exception as err:
846 with util.safe_reraise():
847 fairy._connection_record._checkin_failed(err)
848
849 attempts -= 1
850
851 pool.logger.info("Reconnection attempts exhausted on checkout")
852 fairy.invalidate()
853 raise exc.InvalidRequestError("This connection is closed")
854
855 def _checkout_existing(self):
856 return _ConnectionFairy._checkout(self._pool, fairy=self)
857
858 def _checkin(self):
859 _finalize_fairy(
860 self.connection,
861 self._connection_record,
862 self._pool,
863 None,
864 self._echo,
865 fairy=self,
866 )
867 self.connection = None
868 self._connection_record = None
869
870 _close = _checkin
871
872 def _reset(self, pool):
873 if pool.dispatch.reset:
874 pool.dispatch.reset(self, self._connection_record)
875 if pool._reset_on_return is reset_rollback:
876 if self._echo:
877 pool.logger.debug(
878 "Connection %s rollback-on-return%s",
879 self.connection,
880 ", via agent" if self._reset_agent else "",
881 )
882 if self._reset_agent:
883 if not self._reset_agent.is_active:
884 util.warn(
885 "Reset agent is not active. "
886 "This should not occur unless there was already "
887 "a connectivity error in progress."
888 )
889 pool._dialect.do_rollback(self)
890 else:
891 self._reset_agent.rollback()
892 else:
893 pool._dialect.do_rollback(self)
894 elif pool._reset_on_return is reset_commit:
895 if self._echo:
896 pool.logger.debug(
897 "Connection %s commit-on-return%s",
898 self.connection,
899 ", via agent" if self._reset_agent else "",
900 )
901 if self._reset_agent:
902 if not self._reset_agent.is_active:
903 util.warn(
904 "Reset agent is not active. "
905 "This should not occur unless there was already "
906 "a connectivity error in progress."
907 )
908 pool._dialect.do_commit(self)
909 else:
910 self._reset_agent.commit()
911 else:
912 pool._dialect.do_commit(self)
913
914 @property
915 def _logger(self):
916 return self._pool.logger
917
918 @property
919 def is_valid(self):
920 """Return True if this :class:`._ConnectionFairy` still refers
921 to an active DBAPI connection."""
922
923 return self.connection is not None
924
925 @util.memoized_property
926 def info(self):
927 """Info dictionary associated with the underlying DBAPI connection
928 referred to by this :class:`.ConnectionFairy`, allowing user-defined
929 data to be associated with the connection.
930
931 The data here will follow along with the DBAPI connection including
932 after it is returned to the connection pool and used again
933 in subsequent instances of :class:`._ConnectionFairy`. It is shared
934 with the :attr:`._ConnectionRecord.info` and
935 :attr:`_engine.Connection.info`
936 accessors.
937
938 The dictionary associated with a particular DBAPI connection is
939 discarded when the connection itself is discarded.
940
941 """
942 return self._connection_record.info
943
944 @property
945 def record_info(self):
946 """Info dictionary associated with the :class:`._ConnectionRecord
947 container referred to by this :class:`.ConnectionFairy`.
948
949 Unlike the :attr:`._ConnectionFairy.info` dictionary, the lifespan
950 of this dictionary is persistent across connections that are
951 disconnected and/or invalidated within the lifespan of a
952 :class:`._ConnectionRecord`.
953
954 .. versionadded:: 1.1
955
956 """
957 if self._connection_record:
958 return self._connection_record.record_info
959 else:
960 return None
961
962 def invalidate(self, e=None, soft=False):
963 """Mark this connection as invalidated.
964
965 This method can be called directly, and is also called as a result
966 of the :meth:`_engine.Connection.invalidate` method. When invoked,
967 the DBAPI connection is immediately closed and discarded from
968 further use by the pool. The invalidation mechanism proceeds
969 via the :meth:`._ConnectionRecord.invalidate` internal method.
970
971 :param e: an exception object indicating a reason for the invalidation.
972
973 :param soft: if True, the connection isn't closed; instead, this
974 connection will be recycled on next checkout.
975
976 .. versionadded:: 1.0.3
977
978 .. seealso::
979
980 :ref:`pool_connection_invalidation`
981
982 """
983
984 if self.connection is None:
985 util.warn("Can't invalidate an already-closed connection.")
986 return
987 if self._connection_record:
988 self._connection_record.invalidate(e=e, soft=soft)
989 if not soft:
990 self.connection = None
991 self._checkin()
992
993 def cursor(self, *args, **kwargs):
994 """Return a new DBAPI cursor for the underlying connection.
995
996 This method is a proxy for the ``connection.cursor()`` DBAPI
997 method.
998
999 """
1000 return self.connection.cursor(*args, **kwargs)
1001
1002 def __getattr__(self, key):
1003 return getattr(self.connection, key)
1004
1005 def detach(self):
1006 """Separate this connection from its Pool.
1007
1008 This means that the connection will no longer be returned to the
1009 pool when closed, and will instead be literally closed. The
1010 containing ConnectionRecord is separated from the DB-API connection,
1011 and will create a new connection when next used.
1012
1013 Note that any overall connection limiting constraints imposed by a
1014 Pool implementation may be violated after a detach, as the detached
1015 connection is removed from the pool's knowledge and control.
1016 """
1017
1018 if self._connection_record is not None:
1019 rec = self._connection_record
1020 _refs.remove(rec)
1021 rec.fairy_ref = None
1022 rec.connection = None
1023 # TODO: should this be _return_conn?
1024 self._pool._do_return_conn(self._connection_record)
1025 self.info = self.info.copy()
1026 self._connection_record = None
1027
1028 if self._pool.dispatch.detach:
1029 self._pool.dispatch.detach(self.connection, rec)
1030
1031 def close(self):
1032 self._counter -= 1
1033 if self._counter == 0:
1034 self._checkin()