1# engine/base.py 
    2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 
    3# <see AUTHORS file> 
    4# 
    5# This module is part of SQLAlchemy and is released under 
    6# the MIT License: https://www.opensource.org/licenses/mit-license.php 
    7"""Defines :class:`_engine.Connection` and :class:`_engine.Engine`.""" 
    8from __future__ import annotations 
    9 
    10import contextlib 
    11import sys 
    12import typing 
    13from typing import Any 
    14from typing import Callable 
    15from typing import cast 
    16from typing import Iterable 
    17from typing import Iterator 
    18from typing import List 
    19from typing import Mapping 
    20from typing import NoReturn 
    21from typing import Optional 
    22from typing import overload 
    23from typing import Tuple 
    24from typing import Type 
    25from typing import TypeVar 
    26from typing import Union 
    27 
    28from .interfaces import BindTyping 
    29from .interfaces import ConnectionEventsTarget 
    30from .interfaces import DBAPICursor 
    31from .interfaces import ExceptionContext 
    32from .interfaces import ExecuteStyle 
    33from .interfaces import ExecutionContext 
    34from .interfaces import IsolationLevel 
    35from .util import _distill_params_20 
    36from .util import _distill_raw_params 
    37from .util import TransactionalContext 
    38from .. import exc 
    39from .. import inspection 
    40from .. import log 
    41from .. import util 
    42from ..sql import compiler 
    43from ..sql import util as sql_util 
    44from ..util.typing import TupleAny 
    45from ..util.typing import TypeVarTuple 
    46from ..util.typing import Unpack 
    47 
    48if typing.TYPE_CHECKING: 
    49    from . import CursorResult 
    50    from . import ScalarResult 
    51    from .interfaces import _AnyExecuteParams 
    52    from .interfaces import _AnyMultiExecuteParams 
    53    from .interfaces import _CoreAnyExecuteParams 
    54    from .interfaces import _CoreMultiExecuteParams 
    55    from .interfaces import _CoreSingleExecuteParams 
    56    from .interfaces import _DBAPIAnyExecuteParams 
    57    from .interfaces import _DBAPISingleExecuteParams 
    58    from .interfaces import _ExecuteOptions 
    59    from .interfaces import CompiledCacheType 
    60    from .interfaces import CoreExecuteOptionsParameter 
    61    from .interfaces import Dialect 
    62    from .interfaces import SchemaTranslateMapType 
    63    from .reflection import Inspector  # noqa 
    64    from .url import URL 
    65    from ..event import dispatcher 
    66    from ..log import _EchoFlagType 
    67    from ..pool import _ConnectionFairy 
    68    from ..pool import Pool 
    69    from ..pool import PoolProxiedConnection 
    70    from ..sql import Executable 
    71    from ..sql._typing import _InfoType 
    72    from ..sql.compiler import Compiled 
    73    from ..sql.ddl import ExecutableDDLElement 
    74    from ..sql.ddl import InvokeDDLBase 
    75    from ..sql.functions import FunctionElement 
    76    from ..sql.schema import DefaultGenerator 
    77    from ..sql.schema import HasSchemaAttr 
    78    from ..sql.schema import SchemaVisitable 
    79    from ..sql.selectable import TypedReturnsRows 
    80 
    81 
    82_T = TypeVar("_T", bound=Any) 
    83_Ts = TypeVarTuple("_Ts") 
    84_EMPTY_EXECUTION_OPTS: _ExecuteOptions = util.EMPTY_DICT 
    85NO_OPTIONS: Mapping[str, Any] = util.EMPTY_DICT 
    86 
    87 
    88class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): 
    89    """Provides high-level functionality for a wrapped DB-API connection. 
    90 
    91    The :class:`_engine.Connection` object is procured by calling the 
    92    :meth:`_engine.Engine.connect` method of the :class:`_engine.Engine` 
    93    object, and provides services for execution of SQL statements as well 
    94    as transaction control. 
    95 
    96    The Connection object is **not** thread-safe. While a Connection can be 
    97    shared among threads using properly synchronized access, it is still 
    98    possible that the underlying DBAPI connection may not support shared 
    99    access between threads. Check the DBAPI documentation for details. 
    100 
    101    The Connection object represents a single DBAPI connection checked out 
    102    from the connection pool. In this state, the connection pool has no 
    103    affect upon the connection, including its expiration or timeout state. 
    104    For the connection pool to properly manage connections, connections 
    105    should be returned to the connection pool (i.e. ``connection.close()``) 
    106    whenever the connection is not in use. 
    107 
    108    .. index:: 
    109      single: thread safety; Connection 
    110 
    111    """ 
    112 
    113    dialect: Dialect 
    114    dispatch: dispatcher[ConnectionEventsTarget] 
    115 
    116    _sqla_logger_namespace = "sqlalchemy.engine.Connection" 
    117 
    118    # used by sqlalchemy.engine.util.TransactionalContext 
    119    _trans_context_manager: Optional[TransactionalContext] = None 
    120 
    121    # legacy as of 2.0, should be eventually deprecated and 
    122    # removed.  was used in the "pre_ping" recipe that's been in the docs 
    123    # a long time 
    124    should_close_with_result = False 
    125 
    126    _dbapi_connection: Optional[PoolProxiedConnection] 
    127 
    128    _execution_options: _ExecuteOptions 
    129 
    130    _transaction: Optional[RootTransaction] 
    131    _nested_transaction: Optional[NestedTransaction] 
    132 
    133    def __init__( 
    134        self, 
    135        engine: Engine, 
    136        connection: Optional[PoolProxiedConnection] = None, 
    137        _has_events: Optional[bool] = None, 
    138        _allow_revalidate: bool = True, 
    139        _allow_autobegin: bool = True, 
    140    ): 
    141        """Construct a new Connection.""" 
    142        self.engine = engine 
    143        self.dialect = dialect = engine.dialect 
    144 
    145        if connection is None: 
    146            try: 
    147                self._dbapi_connection = engine.raw_connection() 
    148            except dialect.loaded_dbapi.Error as err: 
    149                Connection._handle_dbapi_exception_noconnection( 
    150                    err, dialect, engine 
    151                ) 
    152                raise 
    153        else: 
    154            self._dbapi_connection = connection 
    155 
    156        self._transaction = self._nested_transaction = None 
    157        self.__savepoint_seq = 0 
    158        self.__in_begin = False 
    159 
    160        self.__can_reconnect = _allow_revalidate 
    161        self._allow_autobegin = _allow_autobegin 
    162        self._echo = self.engine._should_log_info() 
    163 
    164        if _has_events is None: 
    165            # if _has_events is sent explicitly as False, 
    166            # then don't join the dispatch of the engine; we don't 
    167            # want to handle any of the engine's events in that case. 
    168            self.dispatch = self.dispatch._join(engine.dispatch) 
    169        self._has_events = _has_events or ( 
    170            _has_events is None and engine._has_events 
    171        ) 
    172 
    173        self._execution_options = engine._execution_options 
    174 
    175        if self._has_events or self.engine._has_events: 
    176            self.dispatch.engine_connect(self) 
    177 
    178    # this can be assigned differently via 
    179    # characteristics.LoggingTokenCharacteristic 
    180    _message_formatter: Any = None 
    181 
    182    def _log_info(self, message: str, *arg: Any, **kw: Any) -> None: 
    183        fmt = self._message_formatter 
    184 
    185        if fmt: 
    186            message = fmt(message) 
    187 
    188        if log.STACKLEVEL: 
    189            kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET 
    190 
    191        self.engine.logger.info(message, *arg, **kw) 
    192 
    193    def _log_debug(self, message: str, *arg: Any, **kw: Any) -> None: 
    194        fmt = self._message_formatter 
    195 
    196        if fmt: 
    197            message = fmt(message) 
    198 
    199        if log.STACKLEVEL: 
    200            kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET 
    201 
    202        self.engine.logger.debug(message, *arg, **kw) 
    203 
    204    @property 
    205    def _schema_translate_map(self) -> Optional[SchemaTranslateMapType]: 
    206        schema_translate_map: Optional[SchemaTranslateMapType] = ( 
    207            self._execution_options.get("schema_translate_map", None) 
    208        ) 
    209 
    210        return schema_translate_map 
    211 
    212    def schema_for_object(self, obj: HasSchemaAttr) -> Optional[str]: 
    213        """Return the schema name for the given schema item taking into 
    214        account current schema translate map. 
    215 
    216        """ 
    217 
    218        name = obj.schema 
    219        schema_translate_map: Optional[SchemaTranslateMapType] = ( 
    220            self._execution_options.get("schema_translate_map", None) 
    221        ) 
    222 
    223        if ( 
    224            schema_translate_map 
    225            and name in schema_translate_map 
    226            and obj._use_schema_map 
    227        ): 
    228            return schema_translate_map[name] 
    229        else: 
    230            return name 
    231 
    232    def __enter__(self) -> Connection: 
    233        return self 
    234 
    235    def __exit__(self, type_: Any, value: Any, traceback: Any) -> None: 
    236        self.close() 
    237 
    238    @overload 
    239    def execution_options( 
    240        self, 
    241        *, 
    242        compiled_cache: Optional[CompiledCacheType] = ..., 
    243        logging_token: str = ..., 
    244        isolation_level: IsolationLevel = ..., 
    245        no_parameters: bool = False, 
    246        stream_results: bool = False, 
    247        max_row_buffer: int = ..., 
    248        yield_per: int = ..., 
    249        insertmanyvalues_page_size: int = ..., 
    250        schema_translate_map: Optional[SchemaTranslateMapType] = ..., 
    251        preserve_rowcount: bool = False, 
    252        driver_column_names: bool = False, 
    253        **opt: Any, 
    254    ) -> Connection: ... 
    255 
    256    @overload 
    257    def execution_options(self, **opt: Any) -> Connection: ... 
    258 
    259    def execution_options(self, **opt: Any) -> Connection: 
    260        r"""Set non-SQL options for the connection which take effect 
    261        during execution. 
    262 
    263        This method modifies this :class:`_engine.Connection` **in-place**; 
    264        the return value is the same :class:`_engine.Connection` object 
    265        upon which the method is called.   Note that this is in contrast 
    266        to the behavior of the ``execution_options`` methods on other 
    267        objects such as :meth:`_engine.Engine.execution_options` and 
    268        :meth:`_sql.Executable.execution_options`.  The rationale is that many 
    269        such execution options necessarily modify the state of the base 
    270        DBAPI connection in any case so there is no feasible means of 
    271        keeping the effect of such an option localized to a "sub" connection. 
    272 
    273        .. versionchanged:: 2.0  The :meth:`_engine.Connection.execution_options` 
    274           method, in contrast to other objects with this method, modifies 
    275           the connection in-place without creating copy of it. 
    276 
    277        As discussed elsewhere, the :meth:`_engine.Connection.execution_options` 
    278        method accepts any arbitrary parameters including user defined names. 
    279        All parameters given are consumable in a number of ways including 
    280        by using the :meth:`_engine.Connection.get_execution_options` method. 
    281        See the examples at :meth:`_sql.Executable.execution_options` 
    282        and :meth:`_engine.Engine.execution_options`. 
    283 
    284        The keywords that are currently recognized by SQLAlchemy itself 
    285        include all those listed under :meth:`.Executable.execution_options`, 
    286        as well as others that are specific to :class:`_engine.Connection`. 
    287 
    288        :param compiled_cache: Available on: :class:`_engine.Connection`, 
    289          :class:`_engine.Engine`. 
    290 
    291          A dictionary where :class:`.Compiled` objects 
    292          will be cached when the :class:`_engine.Connection` 
    293          compiles a clause 
    294          expression into a :class:`.Compiled` object.  This dictionary will 
    295          supersede the statement cache that may be configured on the 
    296          :class:`_engine.Engine` itself.   If set to None, caching 
    297          is disabled, even if the engine has a configured cache size. 
    298 
    299          Note that the ORM makes use of its own "compiled" caches for 
    300          some operations, including flush operations.  The caching 
    301          used by the ORM internally supersedes a cache dictionary 
    302          specified here. 
    303 
    304        :param logging_token: Available on: :class:`_engine.Connection`, 
    305          :class:`_engine.Engine`, :class:`_sql.Executable`. 
    306 
    307          Adds the specified string token surrounded by brackets in log 
    308          messages logged by the connection, i.e. the logging that's enabled 
    309          either via the :paramref:`_sa.create_engine.echo` flag or via the 
    310          ``logging.getLogger("sqlalchemy.engine")`` logger. This allows a 
    311          per-connection or per-sub-engine token to be available which is 
    312          useful for debugging concurrent connection scenarios. 
    313 
    314          .. versionadded:: 1.4.0b2 
    315 
    316          .. seealso:: 
    317 
    318            :ref:`dbengine_logging_tokens` - usage example 
    319 
    320            :paramref:`_sa.create_engine.logging_name` - adds a name to the 
    321            name used by the Python logger object itself. 
    322 
    323        :param isolation_level: Available on: :class:`_engine.Connection`, 
    324          :class:`_engine.Engine`. 
    325 
    326          Set the transaction isolation level for the lifespan of this 
    327          :class:`_engine.Connection` object. 
    328          Valid values include those string 
    329          values accepted by the :paramref:`_sa.create_engine.isolation_level` 
    330          parameter passed to :func:`_sa.create_engine`.  These levels are 
    331          semi-database specific; see individual dialect documentation for 
    332          valid levels. 
    333 
    334          The isolation level option applies the isolation level by emitting 
    335          statements on the DBAPI connection, and **necessarily affects the 
    336          original Connection object overall**. The isolation level will remain 
    337          at the given setting until explicitly changed, or when the DBAPI 
    338          connection itself is :term:`released` to the connection pool, i.e. the 
    339          :meth:`_engine.Connection.close` method is called, at which time an 
    340          event handler will emit additional statements on the DBAPI connection 
    341          in order to revert the isolation level change. 
    342 
    343          .. note:: The ``isolation_level`` execution option may only be 
    344             established before the :meth:`_engine.Connection.begin` method is 
    345             called, as well as before any SQL statements are emitted which 
    346             would otherwise trigger "autobegin", or directly after a call to 
    347             :meth:`_engine.Connection.commit` or 
    348             :meth:`_engine.Connection.rollback`. A database cannot change the 
    349             isolation level on a transaction in progress. 
    350 
    351          .. note:: The ``isolation_level`` execution option is implicitly 
    352             reset if the :class:`_engine.Connection` is invalidated, e.g. via 
    353             the :meth:`_engine.Connection.invalidate` method, or if a 
    354             disconnection error occurs. The new connection produced after the 
    355             invalidation will **not** have the selected isolation level 
    356             re-applied to it automatically. 
    357 
    358          .. seealso:: 
    359 
    360                :ref:`dbapi_autocommit` 
    361 
    362                :meth:`_engine.Connection.get_isolation_level` 
    363                - view current actual level 
    364 
    365        :param no_parameters: Available on: :class:`_engine.Connection`, 
    366          :class:`_sql.Executable`. 
    367 
    368          When ``True``, if the final parameter 
    369          list or dictionary is totally empty, will invoke the 
    370          statement on the cursor as ``cursor.execute(statement)``, 
    371          not passing the parameter collection at all. 
    372          Some DBAPIs such as psycopg2 and mysql-python consider 
    373          percent signs as significant only when parameters are 
    374          present; this option allows code to generate SQL 
    375          containing percent signs (and possibly other characters) 
    376          that is neutral regarding whether it's executed by the DBAPI 
    377          or piped into a script that's later invoked by 
    378          command line tools. 
    379 
    380        :param stream_results: Available on: :class:`_engine.Connection`, 
    381          :class:`_sql.Executable`. 
    382 
    383          Indicate to the dialect that results should be "streamed" and not 
    384          pre-buffered, if possible.  For backends such as PostgreSQL, MySQL 
    385          and MariaDB, this indicates the use of a "server side cursor" as 
    386          opposed to a client side cursor.  Other backends such as that of 
    387          Oracle Database may already use server side cursors by default. 
    388 
    389          The usage of 
    390          :paramref:`_engine.Connection.execution_options.stream_results` is 
    391          usually combined with setting a fixed number of rows to to be fetched 
    392          in batches, to allow for efficient iteration of database rows while 
    393          at the same time not loading all result rows into memory at once; 
    394          this can be configured on a :class:`_engine.Result` object using the 
    395          :meth:`_engine.Result.yield_per` method, after execution has 
    396          returned a new :class:`_engine.Result`.   If 
    397          :meth:`_engine.Result.yield_per` is not used, 
    398          the :paramref:`_engine.Connection.execution_options.stream_results` 
    399          mode of operation will instead use a dynamically sized buffer 
    400          which buffers sets of rows at a time, growing on each batch 
    401          based on a fixed growth size up until a limit which may 
    402          be configured using the 
    403          :paramref:`_engine.Connection.execution_options.max_row_buffer` 
    404          parameter. 
    405 
    406          When using the ORM to fetch ORM mapped objects from a result, 
    407          :meth:`_engine.Result.yield_per` should always be used with 
    408          :paramref:`_engine.Connection.execution_options.stream_results`, 
    409          so that the ORM does not fetch all rows into new ORM objects at once. 
    410 
    411          For typical use, the 
    412          :paramref:`_engine.Connection.execution_options.yield_per` execution 
    413          option should be preferred, which sets up both 
    414          :paramref:`_engine.Connection.execution_options.stream_results` and 
    415          :meth:`_engine.Result.yield_per` at once. This option is supported 
    416          both at a core level by :class:`_engine.Connection` as well as by the 
    417          ORM :class:`_engine.Session`; the latter is described at 
    418          :ref:`orm_queryguide_yield_per`. 
    419 
    420          .. seealso:: 
    421 
    422            :ref:`engine_stream_results` - background on 
    423            :paramref:`_engine.Connection.execution_options.stream_results` 
    424 
    425            :paramref:`_engine.Connection.execution_options.max_row_buffer` 
    426 
    427            :paramref:`_engine.Connection.execution_options.yield_per` 
    428 
    429            :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel` 
    430            describing the ORM version of ``yield_per`` 
    431 
    432        :param max_row_buffer: Available on: :class:`_engine.Connection`, 
    433          :class:`_sql.Executable`.  Sets a maximum 
    434          buffer size to use when the 
    435          :paramref:`_engine.Connection.execution_options.stream_results` 
    436          execution option is used on a backend that supports server side 
    437          cursors.  The default value if not specified is 1000. 
    438 
    439          .. seealso:: 
    440 
    441            :paramref:`_engine.Connection.execution_options.stream_results` 
    442 
    443            :ref:`engine_stream_results` 
    444 
    445 
    446        :param yield_per: Available on: :class:`_engine.Connection`, 
    447          :class:`_sql.Executable`.  Integer value applied which will 
    448          set the :paramref:`_engine.Connection.execution_options.stream_results` 
    449          execution option and invoke :meth:`_engine.Result.yield_per` 
    450          automatically at once.  Allows equivalent functionality as 
    451          is present when using this parameter with the ORM. 
    452 
    453          .. versionadded:: 1.4.40 
    454 
    455          .. seealso:: 
    456 
    457            :ref:`engine_stream_results` - background and examples 
    458            on using server side cursors with Core. 
    459 
    460            :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel` 
    461            describing the ORM version of ``yield_per`` 
    462 
    463        :param insertmanyvalues_page_size: Available on: :class:`_engine.Connection`, 
    464            :class:`_engine.Engine`. Number of rows to format into an 
    465            INSERT statement when the statement uses "insertmanyvalues" mode, 
    466            which is a paged form of bulk insert that is used for many backends 
    467            when using :term:`executemany` execution typically in conjunction 
    468            with RETURNING. Defaults to 1000. May also be modified on a 
    469            per-engine basis using the 
    470            :paramref:`_sa.create_engine.insertmanyvalues_page_size` parameter. 
    471 
    472            .. versionadded:: 2.0 
    473 
    474            .. seealso:: 
    475 
    476                :ref:`engine_insertmanyvalues` 
    477 
    478        :param schema_translate_map: Available on: :class:`_engine.Connection`, 
    479          :class:`_engine.Engine`, :class:`_sql.Executable`. 
    480 
    481          A dictionary mapping schema names to schema names, that will be 
    482          applied to the :paramref:`_schema.Table.schema` element of each 
    483          :class:`_schema.Table` 
    484          encountered when SQL or DDL expression elements 
    485          are compiled into strings; the resulting schema name will be 
    486          converted based on presence in the map of the original name. 
    487 
    488          .. seealso:: 
    489 
    490            :ref:`schema_translating` 
    491 
    492        :param preserve_rowcount: Boolean; when True, the ``cursor.rowcount`` 
    493          attribute will be unconditionally memoized within the result and 
    494          made available via the :attr:`.CursorResult.rowcount` attribute. 
    495          Normally, this attribute is only preserved for UPDATE and DELETE 
    496          statements.  Using this option, the DBAPIs rowcount value can 
    497          be accessed for other kinds of statements such as INSERT and SELECT, 
    498          to the degree that the DBAPI supports these statements.  See 
    499          :attr:`.CursorResult.rowcount` for notes regarding the behavior 
    500          of this attribute. 
    501 
    502          .. versionadded:: 2.0.28 
    503 
    504        .. seealso:: 
    505 
    506            :meth:`_engine.Engine.execution_options` 
    507 
    508            :meth:`.Executable.execution_options` 
    509 
    510            :meth:`_engine.Connection.get_execution_options` 
    511 
    512            :ref:`orm_queryguide_execution_options` - documentation on all 
    513            ORM-specific execution options 
    514 
    515        :param driver_column_names: When True, the returned 
    516         :class:`_engine.CursorResult` will use the column names as written in 
    517         ``cursor.description`` to set up the keys for the result set, 
    518         including the names of columns for the :class:`_engine.Row` object as 
    519         well as the dictionary keys when using :attr:`_engine.Row._mapping`. 
    520         On backends that use "name normalization" such as Oracle Database to 
    521         correct for lower case names being converted to all uppercase, this 
    522         behavior is turned off and the raw UPPERCASE names in 
    523         cursor.description will be present. 
    524 
    525         .. versionadded:: 2.1 
    526 
    527        """  # noqa 
    528        if self._has_events or self.engine._has_events: 
    529            self.dispatch.set_connection_execution_options(self, opt) 
    530        self._execution_options = self._execution_options.union(opt) 
    531        self.dialect.set_connection_execution_options(self, opt) 
    532        return self 
    533 
    534    def get_execution_options(self) -> _ExecuteOptions: 
    535        """Get the non-SQL options which will take effect during execution. 
    536 
    537        .. seealso:: 
    538 
    539            :meth:`_engine.Connection.execution_options` 
    540        """ 
    541        return self._execution_options 
    542 
    543    @property 
    544    def _still_open_and_dbapi_connection_is_valid(self) -> bool: 
    545        pool_proxied_connection = self._dbapi_connection 
    546        return ( 
    547            pool_proxied_connection is not None 
    548            and pool_proxied_connection.is_valid 
    549        ) 
    550 
    551    @property 
    552    def closed(self) -> bool: 
    553        """Return True if this connection is closed.""" 
    554 
    555        return self._dbapi_connection is None and not self.__can_reconnect 
    556 
    557    @property 
    558    def invalidated(self) -> bool: 
    559        """Return True if this connection was invalidated. 
    560 
    561        This does not indicate whether or not the connection was 
    562        invalidated at the pool level, however 
    563 
    564        """ 
    565 
    566        # prior to 1.4, "invalid" was stored as a state independent of 
    567        # "closed", meaning an invalidated connection could be "closed", 
    568        # the _dbapi_connection would be None and closed=True, yet the 
    569        # "invalid" flag would stay True.  This meant that there were 
    570        # three separate states (open/valid, closed/valid, closed/invalid) 
    571        # when there is really no reason for that; a connection that's 
    572        # "closed" does not need to be "invalid".  So the state is now 
    573        # represented by the two facts alone. 
    574 
    575        pool_proxied_connection = self._dbapi_connection 
    576        return pool_proxied_connection is None and self.__can_reconnect 
    577 
    578    @property 
    579    def connection(self) -> PoolProxiedConnection: 
    580        """The underlying DB-API connection managed by this Connection. 
    581 
    582        This is a SQLAlchemy connection-pool proxied connection 
    583        which then has the attribute 
    584        :attr:`_pool._ConnectionFairy.dbapi_connection` that refers to the 
    585        actual driver connection. 
    586 
    587        .. seealso:: 
    588 
    589 
    590            :ref:`dbapi_connections` 
    591 
    592        """ 
    593 
    594        if self._dbapi_connection is None: 
    595            try: 
    596                return self._revalidate_connection() 
    597            except (exc.PendingRollbackError, exc.ResourceClosedError): 
    598                raise 
    599            except BaseException as e: 
    600                self._handle_dbapi_exception(e, None, None, None, None) 
    601        else: 
    602            return self._dbapi_connection 
    603 
    604    def get_isolation_level(self) -> IsolationLevel: 
    605        """Return the current **actual** isolation level that's present on 
    606        the database within the scope of this connection. 
    607 
    608        This attribute will perform a live SQL operation against the database 
    609        in order to procure the current isolation level, so the value returned 
    610        is the actual level on the underlying DBAPI connection regardless of 
    611        how this state was set. This will be one of the four actual isolation 
    612        modes ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``, 
    613        ``SERIALIZABLE``. It will **not** include the ``AUTOCOMMIT`` isolation 
    614        level setting. Third party dialects may also feature additional 
    615        isolation level settings. 
    616 
    617        .. note::  This method **will not report** on the ``AUTOCOMMIT`` 
    618          isolation level, which is a separate :term:`dbapi` setting that's 
    619          independent of **actual** isolation level.  When ``AUTOCOMMIT`` is 
    620          in use, the database connection still has a "traditional" isolation 
    621          mode in effect, that is typically one of the four values 
    622          ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``, 
    623          ``SERIALIZABLE``. 
    624 
    625        Compare to the :attr:`_engine.Connection.default_isolation_level` 
    626        accessor which returns the isolation level that is present on the 
    627        database at initial connection time. 
    628 
    629        .. seealso:: 
    630 
    631            :attr:`_engine.Connection.default_isolation_level` 
    632            - view default level 
    633 
    634            :paramref:`_sa.create_engine.isolation_level` 
    635            - set per :class:`_engine.Engine` isolation level 
    636 
    637            :paramref:`.Connection.execution_options.isolation_level` 
    638            - set per :class:`_engine.Connection` isolation level 
    639 
    640        """ 
    641        dbapi_connection = self.connection.dbapi_connection 
    642        assert dbapi_connection is not None 
    643        try: 
    644            return self.dialect.get_isolation_level(dbapi_connection) 
    645        except BaseException as e: 
    646            self._handle_dbapi_exception(e, None, None, None, None) 
    647 
    648    @property 
    649    def default_isolation_level(self) -> Optional[IsolationLevel]: 
    650        """The initial-connection time isolation level associated with the 
    651        :class:`_engine.Dialect` in use. 
    652 
    653        This value is independent of the 
    654        :paramref:`.Connection.execution_options.isolation_level` and 
    655        :paramref:`.Engine.execution_options.isolation_level` execution 
    656        options, and is determined by the :class:`_engine.Dialect` when the 
    657        first connection is created, by performing a SQL query against the 
    658        database for the current isolation level before any additional commands 
    659        have been emitted. 
    660 
    661        Calling this accessor does not invoke any new SQL queries. 
    662 
    663        .. seealso:: 
    664 
    665            :meth:`_engine.Connection.get_isolation_level` 
    666            - view current actual isolation level 
    667 
    668            :paramref:`_sa.create_engine.isolation_level` 
    669            - set per :class:`_engine.Engine` isolation level 
    670 
    671            :paramref:`.Connection.execution_options.isolation_level` 
    672            - set per :class:`_engine.Connection` isolation level 
    673 
    674        """ 
    675        return self.dialect.default_isolation_level 
    676 
    677    def _invalid_transaction(self) -> NoReturn: 
    678        raise exc.PendingRollbackError( 
    679            "Can't reconnect until invalid %stransaction is rolled " 
    680            "back.  Please rollback() fully before proceeding" 
    681            % ("savepoint " if self._nested_transaction is not None else ""), 
    682            code="8s2b", 
    683        ) 
    684 
    685    def _revalidate_connection(self) -> PoolProxiedConnection: 
    686        if self.__can_reconnect and self.invalidated: 
    687            if self._transaction is not None: 
    688                self._invalid_transaction() 
    689            self._dbapi_connection = self.engine.raw_connection() 
    690            return self._dbapi_connection 
    691        raise exc.ResourceClosedError("This Connection is closed") 
    692 
    693    @property 
    694    def info(self) -> _InfoType: 
    695        """Info dictionary associated with the underlying DBAPI connection 
    696        referred to by this :class:`_engine.Connection`, allowing user-defined 
    697        data to be associated with the connection. 
    698 
    699        The data here will follow along with the DBAPI connection including 
    700        after it is returned to the connection pool and used again 
    701        in subsequent instances of :class:`_engine.Connection`. 
    702 
    703        """ 
    704 
    705        return self.connection.info 
    706 
    707    def invalidate(self, exception: Optional[BaseException] = None) -> None: 
    708        """Invalidate the underlying DBAPI connection associated with 
    709        this :class:`_engine.Connection`. 
    710 
    711        An attempt will be made to close the underlying DBAPI connection 
    712        immediately; however if this operation fails, the error is logged 
    713        but not raised.  The connection is then discarded whether or not 
    714        close() succeeded. 
    715 
    716        Upon the next use (where "use" typically means using the 
    717        :meth:`_engine.Connection.execute` method or similar), 
    718        this :class:`_engine.Connection` will attempt to 
    719        procure a new DBAPI connection using the services of the 
    720        :class:`_pool.Pool` as a source of connectivity (e.g. 
    721        a "reconnection"). 
    722 
    723        If a transaction was in progress (e.g. the 
    724        :meth:`_engine.Connection.begin` method has been called) when 
    725        :meth:`_engine.Connection.invalidate` method is called, at the DBAPI 
    726        level all state associated with this transaction is lost, as 
    727        the DBAPI connection is closed.  The :class:`_engine.Connection` 
    728        will not allow a reconnection to proceed until the 
    729        :class:`.Transaction` object is ended, by calling the 
    730        :meth:`.Transaction.rollback` method; until that point, any attempt at 
    731        continuing to use the :class:`_engine.Connection` will raise an 
    732        :class:`~sqlalchemy.exc.InvalidRequestError`. 
    733        This is to prevent applications from accidentally 
    734        continuing an ongoing transactional operations despite the 
    735        fact that the transaction has been lost due to an 
    736        invalidation. 
    737 
    738        The :meth:`_engine.Connection.invalidate` method, 
    739        just like auto-invalidation, 
    740        will at the connection pool level invoke the 
    741        :meth:`_events.PoolEvents.invalidate` event. 
    742 
    743        :param exception: an optional ``Exception`` instance that's the 
    744         reason for the invalidation.  is passed along to event handlers 
    745         and logging functions. 
    746 
    747        .. seealso:: 
    748 
    749            :ref:`pool_connection_invalidation` 
    750 
    751        """ 
    752 
    753        if self.invalidated: 
    754            return 
    755 
    756        if self.closed: 
    757            raise exc.ResourceClosedError("This Connection is closed") 
    758 
    759        if self._still_open_and_dbapi_connection_is_valid: 
    760            pool_proxied_connection = self._dbapi_connection 
    761            assert pool_proxied_connection is not None 
    762            pool_proxied_connection.invalidate(exception) 
    763 
    764        self._dbapi_connection = None 
    765 
    766    def detach(self) -> None: 
    767        """Detach the underlying DB-API connection from its connection pool. 
    768 
    769        E.g.:: 
    770 
    771            with engine.connect() as conn: 
    772                conn.detach() 
    773                conn.execute(text("SET search_path TO schema1, schema2")) 
    774 
    775                # work with connection 
    776 
    777            # connection is fully closed (since we used "with:", can 
    778            # also call .close()) 
    779 
    780        This :class:`_engine.Connection` instance will remain usable. 
    781        When closed 
    782        (or exited from a context manager context as above), 
    783        the DB-API connection will be literally closed and not 
    784        returned to its originating pool. 
    785 
    786        This method can be used to insulate the rest of an application 
    787        from a modified state on a connection (such as a transaction 
    788        isolation level or similar). 
    789 
    790        """ 
    791 
    792        if self.closed: 
    793            raise exc.ResourceClosedError("This Connection is closed") 
    794 
    795        pool_proxied_connection = self._dbapi_connection 
    796        if pool_proxied_connection is None: 
    797            raise exc.InvalidRequestError( 
    798                "Can't detach an invalidated Connection" 
    799            ) 
    800        pool_proxied_connection.detach() 
    801 
    802    def _autobegin(self) -> None: 
    803        if self._allow_autobegin and not self.__in_begin: 
    804            self.begin() 
    805 
    806    def begin(self) -> RootTransaction: 
    807        """Begin a transaction prior to autobegin occurring. 
    808 
    809        E.g.:: 
    810 
    811            with engine.connect() as conn: 
    812                with conn.begin() as trans: 
    813                    conn.execute(table.insert(), {"username": "sandy"}) 
    814 
    815        The returned object is an instance of :class:`_engine.RootTransaction`. 
    816        This object represents the "scope" of the transaction, 
    817        which completes when either the :meth:`_engine.Transaction.rollback` 
    818        or :meth:`_engine.Transaction.commit` method is called; the object 
    819        also works as a context manager as illustrated above. 
    820 
    821        The :meth:`_engine.Connection.begin` method begins a 
    822        transaction that normally will be begun in any case when the connection 
    823        is first used to execute a statement.  The reason this method might be 
    824        used would be to invoke the :meth:`_events.ConnectionEvents.begin` 
    825        event at a specific time, or to organize code within the scope of a 
    826        connection checkout in terms of context managed blocks, such as:: 
    827 
    828            with engine.connect() as conn: 
    829                with conn.begin(): 
    830                    conn.execute(...) 
    831                    conn.execute(...) 
    832 
    833                with conn.begin(): 
    834                    conn.execute(...) 
    835                    conn.execute(...) 
    836 
    837        The above code is not  fundamentally any different in its behavior than 
    838        the following code  which does not use 
    839        :meth:`_engine.Connection.begin`; the below style is known 
    840        as "commit as you go" style:: 
    841 
    842            with engine.connect() as conn: 
    843                conn.execute(...) 
    844                conn.execute(...) 
    845                conn.commit() 
    846 
    847                conn.execute(...) 
    848                conn.execute(...) 
    849                conn.commit() 
    850 
    851        From a database point of view, the :meth:`_engine.Connection.begin` 
    852        method does not emit any SQL or change the state of the underlying 
    853        DBAPI connection in any way; the Python DBAPI does not have any 
    854        concept of explicit transaction begin. 
    855 
    856        .. seealso:: 
    857 
    858            :ref:`tutorial_working_with_transactions` - in the 
    859            :ref:`unified_tutorial` 
    860 
    861            :meth:`_engine.Connection.begin_nested` - use a SAVEPOINT 
    862 
    863            :meth:`_engine.Connection.begin_twophase` - 
    864            use a two phase /XID transaction 
    865 
    866            :meth:`_engine.Engine.begin` - context manager available from 
    867            :class:`_engine.Engine` 
    868 
    869        """ 
    870        if self._transaction is None: 
    871            self._transaction = RootTransaction(self) 
    872            return self._transaction 
    873        else: 
    874            raise exc.InvalidRequestError( 
    875                "This connection has already initialized a SQLAlchemy " 
    876                "Transaction() object via begin() or autobegin; can't " 
    877                "call begin() here unless rollback() or commit() " 
    878                "is called first." 
    879            ) 
    880 
    881    def begin_nested(self) -> NestedTransaction: 
    882        """Begin a nested transaction (i.e. SAVEPOINT) and return a transaction 
    883        handle that controls the scope of the SAVEPOINT. 
    884 
    885        E.g.:: 
    886 
    887            with engine.begin() as connection: 
    888                with connection.begin_nested(): 
    889                    connection.execute(table.insert(), {"username": "sandy"}) 
    890 
    891        The returned object is an instance of 
    892        :class:`_engine.NestedTransaction`, which includes transactional 
    893        methods :meth:`_engine.NestedTransaction.commit` and 
    894        :meth:`_engine.NestedTransaction.rollback`; for a nested transaction, 
    895        these methods correspond to the operations "RELEASE SAVEPOINT <name>" 
    896        and "ROLLBACK TO SAVEPOINT <name>". The name of the savepoint is local 
    897        to the :class:`_engine.NestedTransaction` object and is generated 
    898        automatically. Like any other :class:`_engine.Transaction`, the 
    899        :class:`_engine.NestedTransaction` may be used as a context manager as 
    900        illustrated above which will "release" or "rollback" corresponding to 
    901        if the operation within the block were successful or raised an 
    902        exception. 
    903 
    904        Nested transactions require SAVEPOINT support in the underlying 
    905        database, else the behavior is undefined. SAVEPOINT is commonly used to 
    906        run operations within a transaction that may fail, while continuing the 
    907        outer transaction. E.g.:: 
    908 
    909            from sqlalchemy import exc 
    910 
    911            with engine.begin() as connection: 
    912                trans = connection.begin_nested() 
    913                try: 
    914                    connection.execute(table.insert(), {"username": "sandy"}) 
    915                    trans.commit() 
    916                except exc.IntegrityError:  # catch for duplicate username 
    917                    trans.rollback()  # rollback to savepoint 
    918 
    919                # outer transaction continues 
    920                connection.execute(...) 
    921 
    922        If :meth:`_engine.Connection.begin_nested` is called without first 
    923        calling :meth:`_engine.Connection.begin` or 
    924        :meth:`_engine.Engine.begin`, the :class:`_engine.Connection` object 
    925        will "autobegin" the outer transaction first. This outer transaction 
    926        may be committed using "commit-as-you-go" style, e.g.:: 
    927 
    928            with engine.connect() as connection:  # begin() wasn't called 
    929 
    930                with connection.begin_nested():  # will auto-"begin()" first 
    931                    connection.execute(...) 
    932                # savepoint is released 
    933 
    934                connection.execute(...) 
    935 
    936                # explicitly commit outer transaction 
    937                connection.commit() 
    938 
    939                # can continue working with connection here 
    940 
    941        .. versionchanged:: 2.0 
    942 
    943            :meth:`_engine.Connection.begin_nested` will now participate 
    944            in the connection "autobegin" behavior that is new as of 
    945            2.0 / "future" style connections in 1.4. 
    946 
    947        .. seealso:: 
    948 
    949            :meth:`_engine.Connection.begin` 
    950 
    951            :ref:`session_begin_nested` - ORM support for SAVEPOINT 
    952 
    953        """ 
    954        if self._transaction is None: 
    955            self._autobegin() 
    956 
    957        return NestedTransaction(self) 
    958 
    959    def begin_twophase(self, xid: Optional[Any] = None) -> TwoPhaseTransaction: 
    960        """Begin a two-phase or XA transaction and return a transaction 
    961        handle. 
    962 
    963        The returned object is an instance of :class:`.TwoPhaseTransaction`, 
    964        which in addition to the methods provided by 
    965        :class:`.Transaction`, also provides a 
    966        :meth:`~.TwoPhaseTransaction.prepare` method. 
    967 
    968        :param xid: the two phase transaction id.  If not supplied, a 
    969          random id will be generated. 
    970 
    971        .. seealso:: 
    972 
    973            :meth:`_engine.Connection.begin` 
    974 
    975            :meth:`_engine.Connection.begin_twophase` 
    976 
    977        """ 
    978 
    979        if self._transaction is not None: 
    980            raise exc.InvalidRequestError( 
    981                "Cannot start a two phase transaction when a transaction " 
    982                "is already in progress." 
    983            ) 
    984        if xid is None: 
    985            xid = self.engine.dialect.create_xid() 
    986        return TwoPhaseTransaction(self, xid) 
    987 
    988    def commit(self) -> None: 
    989        """Commit the transaction that is currently in progress. 
    990 
    991        This method commits the current transaction if one has been started. 
    992        If no transaction was started, the method has no effect, assuming 
    993        the connection is in a non-invalidated state. 
    994 
    995        A transaction is begun on a :class:`_engine.Connection` automatically 
    996        whenever a statement is first executed, or when the 
    997        :meth:`_engine.Connection.begin` method is called. 
    998 
    999        .. note:: The :meth:`_engine.Connection.commit` method only acts upon 
    1000          the primary database transaction that is linked to the 
    1001          :class:`_engine.Connection` object.  It does not operate upon a 
    1002          SAVEPOINT that would have been invoked from the 
    1003          :meth:`_engine.Connection.begin_nested` method; for control of a 
    1004          SAVEPOINT, call :meth:`_engine.NestedTransaction.commit` on the 
    1005          :class:`_engine.NestedTransaction` that is returned by the 
    1006          :meth:`_engine.Connection.begin_nested` method itself. 
    1007 
    1008 
    1009        """ 
    1010        if self._transaction: 
    1011            self._transaction.commit() 
    1012 
    1013    def rollback(self) -> None: 
    1014        """Roll back the transaction that is currently in progress. 
    1015 
    1016        This method rolls back the current transaction if one has been started. 
    1017        If no transaction was started, the method has no effect.  If a 
    1018        transaction was started and the connection is in an invalidated state, 
    1019        the transaction is cleared using this method. 
    1020 
    1021        A transaction is begun on a :class:`_engine.Connection` automatically 
    1022        whenever a statement is first executed, or when the 
    1023        :meth:`_engine.Connection.begin` method is called. 
    1024 
    1025        .. note:: The :meth:`_engine.Connection.rollback` method only acts 
    1026          upon the primary database transaction that is linked to the 
    1027          :class:`_engine.Connection` object.  It does not operate upon a 
    1028          SAVEPOINT that would have been invoked from the 
    1029          :meth:`_engine.Connection.begin_nested` method; for control of a 
    1030          SAVEPOINT, call :meth:`_engine.NestedTransaction.rollback` on the 
    1031          :class:`_engine.NestedTransaction` that is returned by the 
    1032          :meth:`_engine.Connection.begin_nested` method itself. 
    1033 
    1034 
    1035        """ 
    1036        if self._transaction: 
    1037            self._transaction.rollback() 
    1038 
    1039    def recover_twophase(self) -> List[Any]: 
    1040        return self.engine.dialect.do_recover_twophase(self) 
    1041 
    1042    def rollback_prepared(self, xid: Any, recover: bool = False) -> None: 
    1043        self.engine.dialect.do_rollback_twophase(self, xid, recover=recover) 
    1044 
    1045    def commit_prepared(self, xid: Any, recover: bool = False) -> None: 
    1046        self.engine.dialect.do_commit_twophase(self, xid, recover=recover) 
    1047 
    1048    def in_transaction(self) -> bool: 
    1049        """Return True if a transaction is in progress.""" 
    1050        return self._transaction is not None and self._transaction.is_active 
    1051 
    1052    def in_nested_transaction(self) -> bool: 
    1053        """Return True if a transaction is in progress.""" 
    1054        return ( 
    1055            self._nested_transaction is not None 
    1056            and self._nested_transaction.is_active 
    1057        ) 
    1058 
    1059    def _is_autocommit_isolation(self) -> bool: 
    1060        opt_iso = self._execution_options.get("isolation_level", None) 
    1061        return bool( 
    1062            opt_iso == "AUTOCOMMIT" 
    1063            or ( 
    1064                opt_iso is None 
    1065                and self.engine.dialect._on_connect_isolation_level 
    1066                == "AUTOCOMMIT" 
    1067            ) 
    1068        ) 
    1069 
    1070    def _get_required_transaction(self) -> RootTransaction: 
    1071        trans = self._transaction 
    1072        if trans is None: 
    1073            raise exc.InvalidRequestError("connection is not in a transaction") 
    1074        return trans 
    1075 
    1076    def _get_required_nested_transaction(self) -> NestedTransaction: 
    1077        trans = self._nested_transaction 
    1078        if trans is None: 
    1079            raise exc.InvalidRequestError( 
    1080                "connection is not in a nested transaction" 
    1081            ) 
    1082        return trans 
    1083 
    1084    def get_transaction(self) -> Optional[RootTransaction]: 
    1085        """Return the current root transaction in progress, if any. 
    1086 
    1087        .. versionadded:: 1.4 
    1088 
    1089        """ 
    1090 
    1091        return self._transaction 
    1092 
    1093    def get_nested_transaction(self) -> Optional[NestedTransaction]: 
    1094        """Return the current nested transaction in progress, if any. 
    1095 
    1096        .. versionadded:: 1.4 
    1097 
    1098        """ 
    1099        return self._nested_transaction 
    1100 
    1101    def _begin_impl(self, transaction: RootTransaction) -> None: 
    1102        if self._echo: 
    1103            if self._is_autocommit_isolation(): 
    1104                self._log_info( 
    1105                    "BEGIN (implicit; DBAPI should not BEGIN due to " 
    1106                    "autocommit mode)" 
    1107                ) 
    1108            else: 
    1109                self._log_info("BEGIN (implicit)") 
    1110 
    1111        self.__in_begin = True 
    1112 
    1113        if self._has_events or self.engine._has_events: 
    1114            self.dispatch.begin(self) 
    1115 
    1116        try: 
    1117            self.engine.dialect.do_begin(self.connection) 
    1118        except BaseException as e: 
    1119            self._handle_dbapi_exception(e, None, None, None, None) 
    1120        finally: 
    1121            self.__in_begin = False 
    1122 
    1123    def _rollback_impl(self) -> None: 
    1124        if self._has_events or self.engine._has_events: 
    1125            self.dispatch.rollback(self) 
    1126 
    1127        if self._still_open_and_dbapi_connection_is_valid: 
    1128            if self._echo: 
    1129                if self._is_autocommit_isolation(): 
    1130                    if self.dialect.skip_autocommit_rollback: 
    1131                        self._log_info( 
    1132                            "ROLLBACK will be skipped by " 
    1133                            "skip_autocommit_rollback" 
    1134                        ) 
    1135                    else: 
    1136                        self._log_info( 
    1137                            "ROLLBACK using DBAPI connection.rollback(); " 
    1138                            "set skip_autocommit_rollback to prevent fully" 
    1139                        ) 
    1140                else: 
    1141                    self._log_info("ROLLBACK") 
    1142            try: 
    1143                self.engine.dialect.do_rollback(self.connection) 
    1144            except BaseException as e: 
    1145                self._handle_dbapi_exception(e, None, None, None, None) 
    1146 
    1147    def _commit_impl(self) -> None: 
    1148        if self._has_events or self.engine._has_events: 
    1149            self.dispatch.commit(self) 
    1150 
    1151        if self._echo: 
    1152            if self._is_autocommit_isolation(): 
    1153                self._log_info( 
    1154                    "COMMIT using DBAPI connection.commit(), " 
    1155                    "has no effect due to autocommit mode" 
    1156                ) 
    1157            else: 
    1158                self._log_info("COMMIT") 
    1159        try: 
    1160            self.engine.dialect.do_commit(self.connection) 
    1161        except BaseException as e: 
    1162            self._handle_dbapi_exception(e, None, None, None, None) 
    1163 
    1164    def _savepoint_impl(self, name: Optional[str] = None) -> str: 
    1165        if self._has_events or self.engine._has_events: 
    1166            self.dispatch.savepoint(self, name) 
    1167 
    1168        if name is None: 
    1169            self.__savepoint_seq += 1 
    1170            name = "sa_savepoint_%s" % self.__savepoint_seq 
    1171        self.engine.dialect.do_savepoint(self, name) 
    1172        return name 
    1173 
    1174    def _rollback_to_savepoint_impl(self, name: str) -> None: 
    1175        if self._has_events or self.engine._has_events: 
    1176            self.dispatch.rollback_savepoint(self, name, None) 
    1177 
    1178        if self._still_open_and_dbapi_connection_is_valid: 
    1179            self.engine.dialect.do_rollback_to_savepoint(self, name) 
    1180 
    1181    def _release_savepoint_impl(self, name: str) -> None: 
    1182        if self._has_events or self.engine._has_events: 
    1183            self.dispatch.release_savepoint(self, name, None) 
    1184 
    1185        self.engine.dialect.do_release_savepoint(self, name) 
    1186 
    1187    def _begin_twophase_impl(self, transaction: TwoPhaseTransaction) -> None: 
    1188        if self._echo: 
    1189            self._log_info("BEGIN TWOPHASE (implicit)") 
    1190        if self._has_events or self.engine._has_events: 
    1191            self.dispatch.begin_twophase(self, transaction.xid) 
    1192 
    1193        self.__in_begin = True 
    1194        try: 
    1195            self.engine.dialect.do_begin_twophase(self, transaction.xid) 
    1196        except BaseException as e: 
    1197            self._handle_dbapi_exception(e, None, None, None, None) 
    1198        finally: 
    1199            self.__in_begin = False 
    1200 
    1201    def _prepare_twophase_impl(self, xid: Any) -> None: 
    1202        if self._has_events or self.engine._has_events: 
    1203            self.dispatch.prepare_twophase(self, xid) 
    1204 
    1205        assert isinstance(self._transaction, TwoPhaseTransaction) 
    1206        try: 
    1207            self.engine.dialect.do_prepare_twophase(self, xid) 
    1208        except BaseException as e: 
    1209            self._handle_dbapi_exception(e, None, None, None, None) 
    1210 
    1211    def _rollback_twophase_impl(self, xid: Any, is_prepared: bool) -> None: 
    1212        if self._has_events or self.engine._has_events: 
    1213            self.dispatch.rollback_twophase(self, xid, is_prepared) 
    1214 
    1215        if self._still_open_and_dbapi_connection_is_valid: 
    1216            assert isinstance(self._transaction, TwoPhaseTransaction) 
    1217            try: 
    1218                self.engine.dialect.do_rollback_twophase( 
    1219                    self, xid, is_prepared 
    1220                ) 
    1221            except BaseException as e: 
    1222                self._handle_dbapi_exception(e, None, None, None, None) 
    1223 
    1224    def _commit_twophase_impl(self, xid: Any, is_prepared: bool) -> None: 
    1225        if self._has_events or self.engine._has_events: 
    1226            self.dispatch.commit_twophase(self, xid, is_prepared) 
    1227 
    1228        assert isinstance(self._transaction, TwoPhaseTransaction) 
    1229        try: 
    1230            self.engine.dialect.do_commit_twophase(self, xid, is_prepared) 
    1231        except BaseException as e: 
    1232            self._handle_dbapi_exception(e, None, None, None, None) 
    1233 
    1234    def close(self) -> None: 
    1235        """Close this :class:`_engine.Connection`. 
    1236 
    1237        This results in a release of the underlying database 
    1238        resources, that is, the DBAPI connection referenced 
    1239        internally. The DBAPI connection is typically restored 
    1240        back to the connection-holding :class:`_pool.Pool` referenced 
    1241        by the :class:`_engine.Engine` that produced this 
    1242        :class:`_engine.Connection`. Any transactional state present on 
    1243        the DBAPI connection is also unconditionally released via 
    1244        the DBAPI connection's ``rollback()`` method, regardless 
    1245        of any :class:`.Transaction` object that may be 
    1246        outstanding with regards to this :class:`_engine.Connection`. 
    1247 
    1248        This has the effect of also calling :meth:`_engine.Connection.rollback` 
    1249        if any transaction is in place. 
    1250 
    1251        After :meth:`_engine.Connection.close` is called, the 
    1252        :class:`_engine.Connection` is permanently in a closed state, 
    1253        and will allow no further operations. 
    1254 
    1255        """ 
    1256 
    1257        if self._transaction: 
    1258            self._transaction.close() 
    1259            skip_reset = True 
    1260        else: 
    1261            skip_reset = False 
    1262 
    1263        if self._dbapi_connection is not None: 
    1264            conn = self._dbapi_connection 
    1265 
    1266            # as we just closed the transaction, close the connection 
    1267            # pool connection without doing an additional reset 
    1268            if skip_reset: 
    1269                cast("_ConnectionFairy", conn)._close_special( 
    1270                    transaction_reset=True 
    1271                ) 
    1272            else: 
    1273                conn.close() 
    1274 
    1275            # There is a slight chance that conn.close() may have 
    1276            # triggered an invalidation here in which case 
    1277            # _dbapi_connection would already be None, however usually 
    1278            # it will be non-None here and in a "closed" state. 
    1279            self._dbapi_connection = None 
    1280        self.__can_reconnect = False 
    1281 
    1282    @overload 
    1283    def scalar( 
    1284        self, 
    1285        statement: TypedReturnsRows[_T], 
    1286        parameters: Optional[_CoreSingleExecuteParams] = None, 
    1287        *, 
    1288        execution_options: Optional[CoreExecuteOptionsParameter] = None, 
    1289    ) -> Optional[_T]: ... 
    1290 
    1291    @overload 
    1292    def scalar( 
    1293        self, 
    1294        statement: Executable, 
    1295        parameters: Optional[_CoreSingleExecuteParams] = None, 
    1296        *, 
    1297        execution_options: Optional[CoreExecuteOptionsParameter] = None, 
    1298    ) -> Any: ... 
    1299 
    1300    def scalar( 
    1301        self, 
    1302        statement: Executable, 
    1303        parameters: Optional[_CoreSingleExecuteParams] = None, 
    1304        *, 
    1305        execution_options: Optional[CoreExecuteOptionsParameter] = None, 
    1306    ) -> Any: 
    1307        r"""Executes a SQL statement construct and returns a scalar object. 
    1308 
    1309        This method is shorthand for invoking the 
    1310        :meth:`_engine.Result.scalar` method after invoking the 
    1311        :meth:`_engine.Connection.execute` method.  Parameters are equivalent. 
    1312 
    1313        :return: a scalar Python value representing the first column of the 
    1314         first row returned. 
    1315 
    1316        """ 
    1317        distilled_parameters = _distill_params_20(parameters) 
    1318        try: 
    1319            meth = statement._execute_on_scalar 
    1320        except AttributeError as err: 
    1321            raise exc.ObjectNotExecutableError(statement) from err 
    1322        else: 
    1323            return meth( 
    1324                self, 
    1325                distilled_parameters, 
    1326                execution_options or NO_OPTIONS, 
    1327            ) 
    1328 
    1329    @overload 
    1330    def scalars( 
    1331        self, 
    1332        statement: TypedReturnsRows[_T], 
    1333        parameters: Optional[_CoreAnyExecuteParams] = None, 
    1334        *, 
    1335        execution_options: Optional[CoreExecuteOptionsParameter] = None, 
    1336    ) -> ScalarResult[_T]: ... 
    1337 
    1338    @overload 
    1339    def scalars( 
    1340        self, 
    1341        statement: Executable, 
    1342        parameters: Optional[_CoreAnyExecuteParams] = None, 
    1343        *, 
    1344        execution_options: Optional[CoreExecuteOptionsParameter] = None, 
    1345    ) -> ScalarResult[Any]: ... 
    1346 
    1347    def scalars( 
    1348        self, 
    1349        statement: Executable, 
    1350        parameters: Optional[_CoreAnyExecuteParams] = None, 
    1351        *, 
    1352        execution_options: Optional[CoreExecuteOptionsParameter] = None, 
    1353    ) -> ScalarResult[Any]: 
    1354        """Executes and returns a scalar result set, which yields scalar values 
    1355        from the first column of each row. 
    1356 
    1357        This method is equivalent to calling :meth:`_engine.Connection.execute` 
    1358        to receive a :class:`_result.Result` object, then invoking the 
    1359        :meth:`_result.Result.scalars` method to produce a 
    1360        :class:`_result.ScalarResult` instance. 
    1361 
    1362        :return: a :class:`_result.ScalarResult` 
    1363 
    1364        .. versionadded:: 1.4.24 
    1365 
    1366        """ 
    1367 
    1368        return self.execute( 
    1369            statement, parameters, execution_options=execution_options 
    1370        ).scalars() 
    1371 
    1372    @overload 
    1373    def execute( 
    1374        self, 
    1375        statement: TypedReturnsRows[Unpack[_Ts]], 
    1376        parameters: Optional[_CoreAnyExecuteParams] = None, 
    1377        *, 
    1378        execution_options: Optional[CoreExecuteOptionsParameter] = None, 
    1379    ) -> CursorResult[Unpack[_Ts]]: ... 
    1380 
    1381    @overload 
    1382    def execute( 
    1383        self, 
    1384        statement: Executable, 
    1385        parameters: Optional[_CoreAnyExecuteParams] = None, 
    1386        *, 
    1387        execution_options: Optional[CoreExecuteOptionsParameter] = None, 
    1388    ) -> CursorResult[Unpack[TupleAny]]: ... 
    1389 
    1390    def execute( 
    1391        self, 
    1392        statement: Executable, 
    1393        parameters: Optional[_CoreAnyExecuteParams] = None, 
    1394        *, 
    1395        execution_options: Optional[CoreExecuteOptionsParameter] = None, 
    1396    ) -> CursorResult[Unpack[TupleAny]]: 
    1397        r"""Executes a SQL statement construct and returns a 
    1398        :class:`_engine.CursorResult`. 
    1399 
    1400        :param statement: The statement to be executed.  This is always 
    1401         an object that is in both the :class:`_expression.ClauseElement` and 
    1402         :class:`_expression.Executable` hierarchies, including: 
    1403 
    1404         * :class:`_expression.Select` 
    1405         * :class:`_expression.Insert`, :class:`_expression.Update`, 
    1406           :class:`_expression.Delete` 
    1407         * :class:`_expression.TextClause` and 
    1408           :class:`_expression.TextualSelect` 
    1409         * :class:`_schema.DDL` and objects which inherit from 
    1410           :class:`_schema.ExecutableDDLElement` 
    1411 
    1412        :param parameters: parameters which will be bound into the statement. 
    1413         This may be either a dictionary of parameter names to values, 
    1414         or a mutable sequence (e.g. a list) of dictionaries.  When a 
    1415         list of dictionaries is passed, the underlying statement execution 
    1416         will make use of the DBAPI ``cursor.executemany()`` method. 
    1417         When a single dictionary is passed, the DBAPI ``cursor.execute()`` 
    1418         method will be used. 
    1419 
    1420        :param execution_options: optional dictionary of execution options, 
    1421         which will be associated with the statement execution.  This 
    1422         dictionary can provide a subset of the options that are accepted 
    1423         by :meth:`_engine.Connection.execution_options`. 
    1424 
    1425        :return: a :class:`_engine.Result` object. 
    1426 
    1427        """ 
    1428        distilled_parameters = _distill_params_20(parameters) 
    1429        try: 
    1430            meth = statement._execute_on_connection 
    1431        except AttributeError as err: 
    1432            raise exc.ObjectNotExecutableError(statement) from err 
    1433        else: 
    1434            return meth( 
    1435                self, 
    1436                distilled_parameters, 
    1437                execution_options or NO_OPTIONS, 
    1438            ) 
    1439 
    1440    def _execute_function( 
    1441        self, 
    1442        func: FunctionElement[Any], 
    1443        distilled_parameters: _CoreMultiExecuteParams, 
    1444        execution_options: CoreExecuteOptionsParameter, 
    1445    ) -> CursorResult[Unpack[TupleAny]]: 
    1446        """Execute a sql.FunctionElement object.""" 
    1447 
    1448        return self._execute_clauseelement( 
    1449            func.select(), distilled_parameters, execution_options 
    1450        ) 
    1451 
    1452    def _execute_default( 
    1453        self, 
    1454        default: DefaultGenerator, 
    1455        distilled_parameters: _CoreMultiExecuteParams, 
    1456        execution_options: CoreExecuteOptionsParameter, 
    1457    ) -> Any: 
    1458        """Execute a schema.ColumnDefault object.""" 
    1459 
    1460        exec_opts = self._execution_options.merge_with(execution_options) 
    1461 
    1462        event_multiparams: Optional[_CoreMultiExecuteParams] 
    1463        event_params: Optional[_CoreAnyExecuteParams] 
    1464 
    1465        # note for event handlers, the "distilled parameters" which is always 
    1466        # a list of dicts is broken out into separate "multiparams" and 
    1467        # "params" collections, which allows the handler to distinguish 
    1468        # between an executemany and execute style set of parameters. 
    1469        if self._has_events or self.engine._has_events: 
    1470            ( 
    1471                default, 
    1472                distilled_parameters, 
    1473                event_multiparams, 
    1474                event_params, 
    1475            ) = self._invoke_before_exec_event( 
    1476                default, distilled_parameters, exec_opts 
    1477            ) 
    1478        else: 
    1479            event_multiparams = event_params = None 
    1480 
    1481        try: 
    1482            conn = self._dbapi_connection 
    1483            if conn is None: 
    1484                conn = self._revalidate_connection() 
    1485 
    1486            dialect = self.dialect 
    1487            ctx = dialect.execution_ctx_cls._init_default( 
    1488                dialect, self, conn, exec_opts 
    1489            ) 
    1490        except (exc.PendingRollbackError, exc.ResourceClosedError): 
    1491            raise 
    1492        except BaseException as e: 
    1493            self._handle_dbapi_exception(e, None, None, None, None) 
    1494 
    1495        ret = ctx._exec_default(None, default, None) 
    1496 
    1497        if self._has_events or self.engine._has_events: 
    1498            self.dispatch.after_execute( 
    1499                self, 
    1500                default, 
    1501                event_multiparams, 
    1502                event_params, 
    1503                exec_opts, 
    1504                ret, 
    1505            ) 
    1506 
    1507        return ret 
    1508 
    1509    def _execute_ddl( 
    1510        self, 
    1511        ddl: ExecutableDDLElement, 
    1512        distilled_parameters: _CoreMultiExecuteParams, 
    1513        execution_options: CoreExecuteOptionsParameter, 
    1514    ) -> CursorResult[Unpack[TupleAny]]: 
    1515        """Execute a schema.DDL object.""" 
    1516 
    1517        exec_opts = ddl._execution_options.merge_with( 
    1518            self._execution_options, execution_options 
    1519        ) 
    1520 
    1521        event_multiparams: Optional[_CoreMultiExecuteParams] 
    1522        event_params: Optional[_CoreSingleExecuteParams] 
    1523 
    1524        if self._has_events or self.engine._has_events: 
    1525            ( 
    1526                ddl, 
    1527                distilled_parameters, 
    1528                event_multiparams, 
    1529                event_params, 
    1530            ) = self._invoke_before_exec_event( 
    1531                ddl, distilled_parameters, exec_opts 
    1532            ) 
    1533        else: 
    1534            event_multiparams = event_params = None 
    1535 
    1536        schema_translate_map = exec_opts.get("schema_translate_map", None) 
    1537 
    1538        dialect = self.dialect 
    1539 
    1540        compiled = ddl.compile( 
    1541            dialect=dialect, schema_translate_map=schema_translate_map 
    1542        ) 
    1543        ret = self._execute_context( 
    1544            dialect, 
    1545            dialect.execution_ctx_cls._init_ddl, 
    1546            compiled, 
    1547            None, 
    1548            exec_opts, 
    1549            compiled, 
    1550        ) 
    1551        if self._has_events or self.engine._has_events: 
    1552            self.dispatch.after_execute( 
    1553                self, 
    1554                ddl, 
    1555                event_multiparams, 
    1556                event_params, 
    1557                exec_opts, 
    1558                ret, 
    1559            ) 
    1560        return ret 
    1561 
    1562    def _invoke_before_exec_event( 
    1563        self, 
    1564        elem: Any, 
    1565        distilled_params: _CoreMultiExecuteParams, 
    1566        execution_options: _ExecuteOptions, 
    1567    ) -> Tuple[ 
    1568        Any, 
    1569        _CoreMultiExecuteParams, 
    1570        _CoreMultiExecuteParams, 
    1571        _CoreSingleExecuteParams, 
    1572    ]: 
    1573        event_multiparams: _CoreMultiExecuteParams 
    1574        event_params: _CoreSingleExecuteParams 
    1575 
    1576        if len(distilled_params) == 1: 
    1577            event_multiparams, event_params = [], distilled_params[0] 
    1578        else: 
    1579            event_multiparams, event_params = distilled_params, {} 
    1580 
    1581        for fn in self.dispatch.before_execute: 
    1582            elem, event_multiparams, event_params = fn( 
    1583                self, 
    1584                elem, 
    1585                event_multiparams, 
    1586                event_params, 
    1587                execution_options, 
    1588            ) 
    1589 
    1590        if event_multiparams: 
    1591            distilled_params = list(event_multiparams) 
    1592            if event_params: 
    1593                raise exc.InvalidRequestError( 
    1594                    "Event handler can't return non-empty multiparams " 
    1595                    "and params at the same time" 
    1596                ) 
    1597        elif event_params: 
    1598            distilled_params = [event_params] 
    1599        else: 
    1600            distilled_params = [] 
    1601 
    1602        return elem, distilled_params, event_multiparams, event_params 
    1603 
    1604    def _execute_clauseelement( 
    1605        self, 
    1606        elem: Executable, 
    1607        distilled_parameters: _CoreMultiExecuteParams, 
    1608        execution_options: CoreExecuteOptionsParameter, 
    1609    ) -> CursorResult[Unpack[TupleAny]]: 
    1610        """Execute a sql.ClauseElement object.""" 
    1611 
    1612        exec_opts = elem._execution_options.merge_with( 
    1613            self._execution_options, execution_options 
    1614        ) 
    1615 
    1616        has_events = self._has_events or self.engine._has_events 
    1617        if has_events: 
    1618            ( 
    1619                elem, 
    1620                distilled_parameters, 
    1621                event_multiparams, 
    1622                event_params, 
    1623            ) = self._invoke_before_exec_event( 
    1624                elem, distilled_parameters, exec_opts 
    1625            ) 
    1626 
    1627        if distilled_parameters: 
    1628            # ensure we don't retain a link to the view object for keys() 
    1629            # which links to the values, which we don't want to cache 
    1630            keys = sorted(distilled_parameters[0]) 
    1631            for_executemany = len(distilled_parameters) > 1 
    1632        else: 
    1633            keys = [] 
    1634            for_executemany = False 
    1635 
    1636        dialect = self.dialect 
    1637 
    1638        schema_translate_map = exec_opts.get("schema_translate_map", None) 
    1639 
    1640        compiled_cache: Optional[CompiledCacheType] = exec_opts.get( 
    1641            "compiled_cache", self.engine._compiled_cache 
    1642        ) 
    1643 
    1644        compiled_sql, extracted_params, cache_hit = elem._compile_w_cache( 
    1645            dialect=dialect, 
    1646            compiled_cache=compiled_cache, 
    1647            column_keys=keys, 
    1648            for_executemany=for_executemany, 
    1649            schema_translate_map=schema_translate_map, 
    1650            linting=self.dialect.compiler_linting | compiler.WARN_LINTING, 
    1651        ) 
    1652        ret = self._execute_context( 
    1653            dialect, 
    1654            dialect.execution_ctx_cls._init_compiled, 
    1655            compiled_sql, 
    1656            distilled_parameters, 
    1657            exec_opts, 
    1658            compiled_sql, 
    1659            distilled_parameters, 
    1660            elem, 
    1661            extracted_params, 
    1662            cache_hit=cache_hit, 
    1663        ) 
    1664        if has_events: 
    1665            self.dispatch.after_execute( 
    1666                self, 
    1667                elem, 
    1668                event_multiparams, 
    1669                event_params, 
    1670                exec_opts, 
    1671                ret, 
    1672            ) 
    1673        return ret 
    1674 
    1675    def exec_driver_sql( 
    1676        self, 
    1677        statement: str, 
    1678        parameters: Optional[_DBAPIAnyExecuteParams] = None, 
    1679        execution_options: Optional[CoreExecuteOptionsParameter] = None, 
    1680    ) -> CursorResult[Unpack[TupleAny]]: 
    1681        r"""Executes a string SQL statement on the DBAPI cursor directly, 
    1682        without any SQL compilation steps. 
    1683 
    1684        This can be used to pass any string directly to the 
    1685        ``cursor.execute()`` method of the DBAPI in use. 
    1686 
    1687        :param statement: The statement str to be executed.   Bound parameters 
    1688         must use the underlying DBAPI's paramstyle, such as "qmark", 
    1689         "pyformat", "format", etc. 
    1690 
    1691        :param parameters: represent bound parameter values to be used in the 
    1692         execution.  The format is one of:   a dictionary of named parameters, 
    1693         a tuple of positional parameters, or a list containing either 
    1694         dictionaries or tuples for multiple-execute support. 
    1695 
    1696        :return: a :class:`_engine.CursorResult`. 
    1697 
    1698         E.g. multiple dictionaries:: 
    1699 
    1700 
    1701             conn.exec_driver_sql( 
    1702                 "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)", 
    1703                 [{"id": 1, "value": "v1"}, {"id": 2, "value": "v2"}], 
    1704             ) 
    1705 
    1706         Single dictionary:: 
    1707 
    1708             conn.exec_driver_sql( 
    1709                 "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)", 
    1710                 dict(id=1, value="v1"), 
    1711             ) 
    1712 
    1713         Single tuple:: 
    1714 
    1715             conn.exec_driver_sql( 
    1716                 "INSERT INTO table (id, value) VALUES (?, ?)", (1, "v1") 
    1717             ) 
    1718 
    1719         .. note:: The :meth:`_engine.Connection.exec_driver_sql` method does 
    1720             not participate in the 
    1721             :meth:`_events.ConnectionEvents.before_execute` and 
    1722             :meth:`_events.ConnectionEvents.after_execute` events.   To 
    1723             intercept calls to :meth:`_engine.Connection.exec_driver_sql`, use 
    1724             :meth:`_events.ConnectionEvents.before_cursor_execute` and 
    1725             :meth:`_events.ConnectionEvents.after_cursor_execute`. 
    1726 
    1727         .. seealso:: 
    1728 
    1729            :pep:`249` 
    1730 
    1731        """ 
    1732 
    1733        distilled_parameters = _distill_raw_params(parameters) 
    1734 
    1735        exec_opts = self._execution_options.merge_with(execution_options) 
    1736 
    1737        dialect = self.dialect 
    1738        ret = self._execute_context( 
    1739            dialect, 
    1740            dialect.execution_ctx_cls._init_statement, 
    1741            statement, 
    1742            None, 
    1743            exec_opts, 
    1744            statement, 
    1745            distilled_parameters, 
    1746        ) 
    1747 
    1748        return ret 
    1749 
    1750    def _execute_context( 
    1751        self, 
    1752        dialect: Dialect, 
    1753        constructor: Callable[..., ExecutionContext], 
    1754        statement: Union[str, Compiled], 
    1755        parameters: Optional[_AnyMultiExecuteParams], 
    1756        execution_options: _ExecuteOptions, 
    1757        *args: Any, 
    1758        **kw: Any, 
    1759    ) -> CursorResult[Unpack[TupleAny]]: 
    1760        """Create an :class:`.ExecutionContext` and execute, returning 
    1761        a :class:`_engine.CursorResult`.""" 
    1762 
    1763        if execution_options: 
    1764            yp = execution_options.get("yield_per", None) 
    1765            if yp: 
    1766                execution_options = execution_options.union( 
    1767                    {"stream_results": True, "max_row_buffer": yp} 
    1768                ) 
    1769        try: 
    1770            conn = self._dbapi_connection 
    1771            if conn is None: 
    1772                conn = self._revalidate_connection() 
    1773 
    1774            context = constructor( 
    1775                dialect, self, conn, execution_options, *args, **kw 
    1776            ) 
    1777        except (exc.PendingRollbackError, exc.ResourceClosedError): 
    1778            raise 
    1779        except BaseException as e: 
    1780            self._handle_dbapi_exception( 
    1781                e, str(statement), parameters, None, None 
    1782            ) 
    1783 
    1784        if ( 
    1785            self._transaction 
    1786            and not self._transaction.is_active 
    1787            or ( 
    1788                self._nested_transaction 
    1789                and not self._nested_transaction.is_active 
    1790            ) 
    1791        ): 
    1792            self._invalid_transaction() 
    1793 
    1794        elif self._trans_context_manager: 
    1795            TransactionalContext._trans_ctx_check(self) 
    1796 
    1797        if self._transaction is None: 
    1798            self._autobegin() 
    1799 
    1800        context.pre_exec() 
    1801 
    1802        if context.execute_style is ExecuteStyle.INSERTMANYVALUES: 
    1803            return self._exec_insertmany_context(dialect, context) 
    1804        else: 
    1805            return self._exec_single_context( 
    1806                dialect, context, statement, parameters 
    1807            ) 
    1808 
    1809    def _exec_single_context( 
    1810        self, 
    1811        dialect: Dialect, 
    1812        context: ExecutionContext, 
    1813        statement: Union[str, Compiled], 
    1814        parameters: Optional[_AnyMultiExecuteParams], 
    1815    ) -> CursorResult[Unpack[TupleAny]]: 
    1816        """continue the _execute_context() method for a single DBAPI 
    1817        cursor.execute() or cursor.executemany() call. 
    1818 
    1819        """ 
    1820        if dialect.bind_typing is BindTyping.SETINPUTSIZES: 
    1821            generic_setinputsizes = context._prepare_set_input_sizes() 
    1822 
    1823            if generic_setinputsizes: 
    1824                try: 
    1825                    dialect.do_set_input_sizes( 
    1826                        context.cursor, generic_setinputsizes, context 
    1827                    ) 
    1828                except BaseException as e: 
    1829                    self._handle_dbapi_exception( 
    1830                        e, str(statement), parameters, None, context 
    1831                    ) 
    1832 
    1833        cursor, str_statement, parameters = ( 
    1834            context.cursor, 
    1835            context.statement, 
    1836            context.parameters, 
    1837        ) 
    1838 
    1839        effective_parameters: Optional[_AnyExecuteParams] 
    1840 
    1841        if not context.executemany: 
    1842            effective_parameters = parameters[0] 
    1843        else: 
    1844            effective_parameters = parameters 
    1845 
    1846        if self._has_events or self.engine._has_events: 
    1847            for fn in self.dispatch.before_cursor_execute: 
    1848                str_statement, effective_parameters = fn( 
    1849                    self, 
    1850                    cursor, 
    1851                    str_statement, 
    1852                    effective_parameters, 
    1853                    context, 
    1854                    context.executemany, 
    1855                ) 
    1856 
    1857        if self._echo: 
    1858            self._log_info(str_statement) 
    1859 
    1860            stats = context._get_cache_stats() 
    1861 
    1862            if not self.engine.hide_parameters: 
    1863                self._log_info( 
    1864                    "[%s] %r", 
    1865                    stats, 
    1866                    sql_util._repr_params( 
    1867                        effective_parameters, 
    1868                        batches=10, 
    1869                        ismulti=context.executemany, 
    1870                    ), 
    1871                ) 
    1872            else: 
    1873                self._log_info( 
    1874                    "[%s] [SQL parameters hidden due to hide_parameters=True]", 
    1875                    stats, 
    1876                ) 
    1877 
    1878        evt_handled: bool = False 
    1879        try: 
    1880            if context.execute_style is ExecuteStyle.EXECUTEMANY: 
    1881                effective_parameters = cast( 
    1882                    "_CoreMultiExecuteParams", effective_parameters 
    1883                ) 
    1884                if self.dialect._has_events: 
    1885                    for fn in self.dialect.dispatch.do_executemany: 
    1886                        if fn( 
    1887                            cursor, 
    1888                            str_statement, 
    1889                            effective_parameters, 
    1890                            context, 
    1891                        ): 
    1892                            evt_handled = True 
    1893                            break 
    1894                if not evt_handled: 
    1895                    self.dialect.do_executemany( 
    1896                        cursor, 
    1897                        str_statement, 
    1898                        effective_parameters, 
    1899                        context, 
    1900                    ) 
    1901            elif not effective_parameters and context.no_parameters: 
    1902                if self.dialect._has_events: 
    1903                    for fn in self.dialect.dispatch.do_execute_no_params: 
    1904                        if fn(cursor, str_statement, context): 
    1905                            evt_handled = True 
    1906                            break 
    1907                if not evt_handled: 
    1908                    self.dialect.do_execute_no_params( 
    1909                        cursor, str_statement, context 
    1910                    ) 
    1911            else: 
    1912                effective_parameters = cast( 
    1913                    "_CoreSingleExecuteParams", effective_parameters 
    1914                ) 
    1915                if self.dialect._has_events: 
    1916                    for fn in self.dialect.dispatch.do_execute: 
    1917                        if fn( 
    1918                            cursor, 
    1919                            str_statement, 
    1920                            effective_parameters, 
    1921                            context, 
    1922                        ): 
    1923                            evt_handled = True 
    1924                            break 
    1925                if not evt_handled: 
    1926                    self.dialect.do_execute( 
    1927                        cursor, str_statement, effective_parameters, context 
    1928                    ) 
    1929 
    1930            if self._has_events or self.engine._has_events: 
    1931                self.dispatch.after_cursor_execute( 
    1932                    self, 
    1933                    cursor, 
    1934                    str_statement, 
    1935                    effective_parameters, 
    1936                    context, 
    1937                    context.executemany, 
    1938                ) 
    1939 
    1940            context.post_exec() 
    1941 
    1942            result = context._setup_result_proxy() 
    1943 
    1944        except BaseException as e: 
    1945            self._handle_dbapi_exception( 
    1946                e, str_statement, effective_parameters, cursor, context 
    1947            ) 
    1948 
    1949        return result 
    1950 
    1951    def _exec_insertmany_context( 
    1952        self, 
    1953        dialect: Dialect, 
    1954        context: ExecutionContext, 
    1955    ) -> CursorResult[Unpack[TupleAny]]: 
    1956        """continue the _execute_context() method for an "insertmanyvalues" 
    1957        operation, which will invoke DBAPI 
    1958        cursor.execute() one or more times with individual log and 
    1959        event hook calls. 
    1960 
    1961        """ 
    1962 
    1963        if dialect.bind_typing is BindTyping.SETINPUTSIZES: 
    1964            generic_setinputsizes = context._prepare_set_input_sizes() 
    1965        else: 
    1966            generic_setinputsizes = None 
    1967 
    1968        cursor, str_statement, parameters = ( 
    1969            context.cursor, 
    1970            context.statement, 
    1971            context.parameters, 
    1972        ) 
    1973 
    1974        effective_parameters = parameters 
    1975 
    1976        engine_events = self._has_events or self.engine._has_events 
    1977        if self.dialect._has_events: 
    1978            do_execute_dispatch: Iterable[Any] = ( 
    1979                self.dialect.dispatch.do_execute 
    1980            ) 
    1981        else: 
    1982            do_execute_dispatch = () 
    1983 
    1984        if self._echo: 
    1985            stats = context._get_cache_stats() + " (insertmanyvalues)" 
    1986 
    1987        preserve_rowcount = context.execution_options.get( 
    1988            "preserve_rowcount", False 
    1989        ) 
    1990        rowcount = 0 
    1991 
    1992        for imv_batch in dialect._deliver_insertmanyvalues_batches( 
    1993            self, 
    1994            cursor, 
    1995            str_statement, 
    1996            effective_parameters, 
    1997            generic_setinputsizes, 
    1998            context, 
    1999        ): 
    2000            if imv_batch.processed_setinputsizes: 
    2001                try: 
    2002                    dialect.do_set_input_sizes( 
    2003                        context.cursor, 
    2004                        imv_batch.processed_setinputsizes, 
    2005                        context, 
    2006                    ) 
    2007                except BaseException as e: 
    2008                    self._handle_dbapi_exception( 
    2009                        e, 
    2010                        sql_util._long_statement(imv_batch.replaced_statement), 
    2011                        imv_batch.replaced_parameters, 
    2012                        None, 
    2013                        context, 
    2014                        is_sub_exec=True, 
    2015                    ) 
    2016 
    2017            sub_stmt = imv_batch.replaced_statement 
    2018            sub_params = imv_batch.replaced_parameters 
    2019 
    2020            if engine_events: 
    2021                for fn in self.dispatch.before_cursor_execute: 
    2022                    sub_stmt, sub_params = fn( 
    2023                        self, 
    2024                        cursor, 
    2025                        sub_stmt, 
    2026                        sub_params, 
    2027                        context, 
    2028                        True, 
    2029                    ) 
    2030 
    2031            if self._echo: 
    2032                self._log_info(sql_util._long_statement(sub_stmt)) 
    2033 
    2034                imv_stats = f""" {imv_batch.batchnum}/{ 
    2035                            imv_batch.total_batches 
    2036                } ({ 
    2037                    'ordered' 
    2038                    if imv_batch.rows_sorted else 'unordered' 
    2039                }{ 
    2040                    '; batch not supported' 
    2041                    if imv_batch.is_downgraded 
    2042                    else '' 
    2043                })""" 
    2044 
    2045                if imv_batch.batchnum == 1: 
    2046                    stats += imv_stats 
    2047                else: 
    2048                    stats = f"insertmanyvalues{imv_stats}" 
    2049 
    2050                if not self.engine.hide_parameters: 
    2051                    self._log_info( 
    2052                        "[%s] %r", 
    2053                        stats, 
    2054                        sql_util._repr_params( 
    2055                            sub_params, 
    2056                            batches=10, 
    2057                            ismulti=False, 
    2058                        ), 
    2059                    ) 
    2060                else: 
    2061                    self._log_info( 
    2062                        "[%s] [SQL parameters hidden due to " 
    2063                        "hide_parameters=True]", 
    2064                        stats, 
    2065                    ) 
    2066 
    2067            try: 
    2068                for fn in do_execute_dispatch: 
    2069                    if fn( 
    2070                        cursor, 
    2071                        sub_stmt, 
    2072                        sub_params, 
    2073                        context, 
    2074                    ): 
    2075                        break 
    2076                else: 
    2077                    dialect.do_execute( 
    2078                        cursor, 
    2079                        sub_stmt, 
    2080                        sub_params, 
    2081                        context, 
    2082                    ) 
    2083 
    2084            except BaseException as e: 
    2085                self._handle_dbapi_exception( 
    2086                    e, 
    2087                    sql_util._long_statement(sub_stmt), 
    2088                    sub_params, 
    2089                    cursor, 
    2090                    context, 
    2091                    is_sub_exec=True, 
    2092                ) 
    2093 
    2094            if engine_events: 
    2095                self.dispatch.after_cursor_execute( 
    2096                    self, 
    2097                    cursor, 
    2098                    str_statement, 
    2099                    effective_parameters, 
    2100                    context, 
    2101                    context.executemany, 
    2102                ) 
    2103 
    2104            if preserve_rowcount: 
    2105                rowcount += imv_batch.current_batch_size 
    2106 
    2107        try: 
    2108            context.post_exec() 
    2109 
    2110            if preserve_rowcount: 
    2111                context._rowcount = rowcount  # type: ignore[attr-defined] 
    2112 
    2113            result = context._setup_result_proxy() 
    2114 
    2115        except BaseException as e: 
    2116            self._handle_dbapi_exception( 
    2117                e, str_statement, effective_parameters, cursor, context 
    2118            ) 
    2119 
    2120        return result 
    2121 
    2122    def _cursor_execute( 
    2123        self, 
    2124        cursor: DBAPICursor, 
    2125        statement: str, 
    2126        parameters: _DBAPISingleExecuteParams, 
    2127        context: Optional[ExecutionContext] = None, 
    2128    ) -> None: 
    2129        """Execute a statement + params on the given cursor. 
    2130 
    2131        Adds appropriate logging and exception handling. 
    2132 
    2133        This method is used by DefaultDialect for special-case 
    2134        executions, such as for sequences and column defaults. 
    2135        The path of statement execution in the majority of cases 
    2136        terminates at _execute_context(). 
    2137 
    2138        """ 
    2139        if self._has_events or self.engine._has_events: 
    2140            for fn in self.dispatch.before_cursor_execute: 
    2141                statement, parameters = fn( 
    2142                    self, cursor, statement, parameters, context, False 
    2143                ) 
    2144 
    2145        if self._echo: 
    2146            self._log_info(statement) 
    2147            self._log_info("[raw sql] %r", parameters) 
    2148        try: 
    2149            for fn in ( 
    2150                () 
    2151                if not self.dialect._has_events 
    2152                else self.dialect.dispatch.do_execute 
    2153            ): 
    2154                if fn(cursor, statement, parameters, context): 
    2155                    break 
    2156            else: 
    2157                self.dialect.do_execute(cursor, statement, parameters, context) 
    2158        except BaseException as e: 
    2159            self._handle_dbapi_exception( 
    2160                e, statement, parameters, cursor, context 
    2161            ) 
    2162 
    2163        if self._has_events or self.engine._has_events: 
    2164            self.dispatch.after_cursor_execute( 
    2165                self, cursor, statement, parameters, context, False 
    2166            ) 
    2167 
    2168    def _safe_close_cursor(self, cursor: DBAPICursor) -> None: 
    2169        """Close the given cursor, catching exceptions 
    2170        and turning into log warnings. 
    2171 
    2172        """ 
    2173        try: 
    2174            cursor.close() 
    2175        except Exception: 
    2176            # log the error through the connection pool's logger. 
    2177            self.engine.pool.logger.error( 
    2178                "Error closing cursor", exc_info=True 
    2179            ) 
    2180 
    2181    _reentrant_error = False 
    2182    _is_disconnect = False 
    2183 
    2184    def _handle_dbapi_exception( 
    2185        self, 
    2186        e: BaseException, 
    2187        statement: Optional[str], 
    2188        parameters: Optional[_AnyExecuteParams], 
    2189        cursor: Optional[DBAPICursor], 
    2190        context: Optional[ExecutionContext], 
    2191        is_sub_exec: bool = False, 
    2192    ) -> NoReturn: 
    2193        exc_info = sys.exc_info() 
    2194 
    2195        is_exit_exception = util.is_exit_exception(e) 
    2196 
    2197        if not self._is_disconnect: 
    2198            self._is_disconnect = ( 
    2199                isinstance(e, self.dialect.loaded_dbapi.Error) 
    2200                and not self.closed 
    2201                and self.dialect.is_disconnect( 
    2202                    e, 
    2203                    self._dbapi_connection if not self.invalidated else None, 
    2204                    cursor, 
    2205                ) 
    2206            ) or (is_exit_exception and not self.closed) 
    2207 
    2208        invalidate_pool_on_disconnect = not is_exit_exception 
    2209 
    2210        ismulti: bool = ( 
    2211            not is_sub_exec and context.executemany 
    2212            if context is not None 
    2213            else False 
    2214        ) 
    2215        if self._reentrant_error: 
    2216            raise exc.DBAPIError.instance( 
    2217                statement, 
    2218                parameters, 
    2219                e, 
    2220                self.dialect.loaded_dbapi.Error, 
    2221                hide_parameters=self.engine.hide_parameters, 
    2222                dialect=self.dialect, 
    2223                ismulti=ismulti, 
    2224            ).with_traceback(exc_info[2]) from e 
    2225        self._reentrant_error = True 
    2226        try: 
    2227            # non-DBAPI error - if we already got a context, 
    2228            # or there's no string statement, don't wrap it 
    2229            should_wrap = isinstance(e, self.dialect.loaded_dbapi.Error) or ( 
    2230                statement is not None 
    2231                and context is None 
    2232                and not is_exit_exception 
    2233            ) 
    2234 
    2235            if should_wrap: 
    2236                sqlalchemy_exception = exc.DBAPIError.instance( 
    2237                    statement, 
    2238                    parameters, 
    2239                    cast(Exception, e), 
    2240                    self.dialect.loaded_dbapi.Error, 
    2241                    hide_parameters=self.engine.hide_parameters, 
    2242                    connection_invalidated=self._is_disconnect, 
    2243                    dialect=self.dialect, 
    2244                    ismulti=ismulti, 
    2245                ) 
    2246            else: 
    2247                sqlalchemy_exception = None 
    2248 
    2249            newraise = None 
    2250 
    2251            if (self.dialect._has_events) and not self._execution_options.get( 
    2252                "skip_user_error_events", False 
    2253            ): 
    2254                ctx = ExceptionContextImpl( 
    2255                    e, 
    2256                    sqlalchemy_exception, 
    2257                    self.engine, 
    2258                    self.dialect, 
    2259                    self, 
    2260                    cursor, 
    2261                    statement, 
    2262                    parameters, 
    2263                    context, 
    2264                    self._is_disconnect, 
    2265                    invalidate_pool_on_disconnect, 
    2266                    False, 
    2267                ) 
    2268 
    2269                for fn in self.dialect.dispatch.handle_error: 
    2270                    try: 
    2271                        # handler returns an exception; 
    2272                        # call next handler in a chain 
    2273                        per_fn = fn(ctx) 
    2274                        if per_fn is not None: 
    2275                            ctx.chained_exception = newraise = per_fn 
    2276                    except Exception as _raised: 
    2277                        # handler raises an exception - stop processing 
    2278                        newraise = _raised 
    2279                        break 
    2280 
    2281                if self._is_disconnect != ctx.is_disconnect: 
    2282                    self._is_disconnect = ctx.is_disconnect 
    2283                    if sqlalchemy_exception: 
    2284                        sqlalchemy_exception.connection_invalidated = ( 
    2285                            ctx.is_disconnect 
    2286                        ) 
    2287 
    2288                # set up potentially user-defined value for 
    2289                # invalidate pool. 
    2290                invalidate_pool_on_disconnect = ( 
    2291                    ctx.invalidate_pool_on_disconnect 
    2292                ) 
    2293 
    2294            if should_wrap and context: 
    2295                context.handle_dbapi_exception(e) 
    2296 
    2297            if not self._is_disconnect: 
    2298                if cursor: 
    2299                    self._safe_close_cursor(cursor) 
    2300                # "autorollback" was mostly relevant in 1.x series. 
    2301                # It's very unlikely to reach here, as the connection 
    2302                # does autobegin so when we are here, we are usually 
    2303                # in an explicit / semi-explicit transaction. 
    2304                # however we have a test which manufactures this 
    2305                # scenario in any case using an event handler. 
    2306                # test/engine/test_execute.py-> test_actual_autorollback 
    2307                if not self.in_transaction(): 
    2308                    self._rollback_impl() 
    2309 
    2310            if newraise: 
    2311                raise newraise.with_traceback(exc_info[2]) from e 
    2312            elif should_wrap: 
    2313                assert sqlalchemy_exception is not None 
    2314                raise sqlalchemy_exception.with_traceback(exc_info[2]) from e 
    2315            else: 
    2316                assert exc_info[1] is not None 
    2317                raise exc_info[1].with_traceback(exc_info[2]) 
    2318        finally: 
    2319            del self._reentrant_error 
    2320            if self._is_disconnect: 
    2321                del self._is_disconnect 
    2322                if not self.invalidated: 
    2323                    dbapi_conn_wrapper = self._dbapi_connection 
    2324                    assert dbapi_conn_wrapper is not None 
    2325                    if invalidate_pool_on_disconnect: 
    2326                        self.engine.pool._invalidate(dbapi_conn_wrapper, e) 
    2327                    self.invalidate(e) 
    2328 
    2329    @classmethod 
    2330    def _handle_dbapi_exception_noconnection( 
    2331        cls, 
    2332        e: BaseException, 
    2333        dialect: Dialect, 
    2334        engine: Optional[Engine] = None, 
    2335        is_disconnect: Optional[bool] = None, 
    2336        invalidate_pool_on_disconnect: bool = True, 
    2337        is_pre_ping: bool = False, 
    2338    ) -> NoReturn: 
    2339        exc_info = sys.exc_info() 
    2340 
    2341        if is_disconnect is None: 
    2342            is_disconnect = isinstance( 
    2343                e, dialect.loaded_dbapi.Error 
    2344            ) and dialect.is_disconnect(e, None, None) 
    2345 
    2346        should_wrap = isinstance(e, dialect.loaded_dbapi.Error) 
    2347 
    2348        if should_wrap: 
    2349            sqlalchemy_exception = exc.DBAPIError.instance( 
    2350                None, 
    2351                None, 
    2352                cast(Exception, e), 
    2353                dialect.loaded_dbapi.Error, 
    2354                hide_parameters=( 
    2355                    engine.hide_parameters if engine is not None else False 
    2356                ), 
    2357                connection_invalidated=is_disconnect, 
    2358                dialect=dialect, 
    2359            ) 
    2360        else: 
    2361            sqlalchemy_exception = None 
    2362 
    2363        newraise = None 
    2364 
    2365        if dialect._has_events: 
    2366            ctx = ExceptionContextImpl( 
    2367                e, 
    2368                sqlalchemy_exception, 
    2369                engine, 
    2370                dialect, 
    2371                None, 
    2372                None, 
    2373                None, 
    2374                None, 
    2375                None, 
    2376                is_disconnect, 
    2377                invalidate_pool_on_disconnect, 
    2378                is_pre_ping, 
    2379            ) 
    2380            for fn in dialect.dispatch.handle_error: 
    2381                try: 
    2382                    # handler returns an exception; 
    2383                    # call next handler in a chain 
    2384                    per_fn = fn(ctx) 
    2385                    if per_fn is not None: 
    2386                        ctx.chained_exception = newraise = per_fn 
    2387                except Exception as _raised: 
    2388                    # handler raises an exception - stop processing 
    2389                    newraise = _raised 
    2390                    break 
    2391 
    2392            if sqlalchemy_exception and is_disconnect != ctx.is_disconnect: 
    2393                sqlalchemy_exception.connection_invalidated = ctx.is_disconnect 
    2394 
    2395        if newraise: 
    2396            raise newraise.with_traceback(exc_info[2]) from e 
    2397        elif should_wrap: 
    2398            assert sqlalchemy_exception is not None 
    2399            raise sqlalchemy_exception.with_traceback(exc_info[2]) from e 
    2400        else: 
    2401            assert exc_info[1] is not None 
    2402            raise exc_info[1].with_traceback(exc_info[2]) 
    2403 
    2404    def _run_ddl_visitor( 
    2405        self, 
    2406        visitorcallable: Type[InvokeDDLBase], 
    2407        element: SchemaVisitable, 
    2408        **kwargs: Any, 
    2409    ) -> None: 
    2410        """run a DDL visitor. 
    2411 
    2412        This method is only here so that the MockConnection can change the 
    2413        options given to the visitor so that "checkfirst" is skipped. 
    2414 
    2415        """ 
    2416        visitorcallable( 
    2417            dialect=self.dialect, connection=self, **kwargs 
    2418        ).traverse_single(element) 
    2419 
    2420 
    2421class ExceptionContextImpl(ExceptionContext): 
    2422    """Implement the :class:`.ExceptionContext` interface.""" 
    2423 
    2424    __slots__ = ( 
    2425        "connection", 
    2426        "engine", 
    2427        "dialect", 
    2428        "cursor", 
    2429        "statement", 
    2430        "parameters", 
    2431        "original_exception", 
    2432        "sqlalchemy_exception", 
    2433        "chained_exception", 
    2434        "execution_context", 
    2435        "is_disconnect", 
    2436        "invalidate_pool_on_disconnect", 
    2437        "is_pre_ping", 
    2438    ) 
    2439 
    2440    def __init__( 
    2441        self, 
    2442        exception: BaseException, 
    2443        sqlalchemy_exception: Optional[exc.StatementError], 
    2444        engine: Optional[Engine], 
    2445        dialect: Dialect, 
    2446        connection: Optional[Connection], 
    2447        cursor: Optional[DBAPICursor], 
    2448        statement: Optional[str], 
    2449        parameters: Optional[_DBAPIAnyExecuteParams], 
    2450        context: Optional[ExecutionContext], 
    2451        is_disconnect: bool, 
    2452        invalidate_pool_on_disconnect: bool, 
    2453        is_pre_ping: bool, 
    2454    ): 
    2455        self.engine = engine 
    2456        self.dialect = dialect 
    2457        self.connection = connection 
    2458        self.sqlalchemy_exception = sqlalchemy_exception 
    2459        self.original_exception = exception 
    2460        self.execution_context = context 
    2461        self.statement = statement 
    2462        self.parameters = parameters 
    2463        self.is_disconnect = is_disconnect 
    2464        self.invalidate_pool_on_disconnect = invalidate_pool_on_disconnect 
    2465        self.is_pre_ping = is_pre_ping 
    2466 
    2467 
    2468class Transaction(TransactionalContext): 
    2469    """Represent a database transaction in progress. 
    2470 
    2471    The :class:`.Transaction` object is procured by 
    2472    calling the :meth:`_engine.Connection.begin` method of 
    2473    :class:`_engine.Connection`:: 
    2474 
    2475        from sqlalchemy import create_engine 
    2476 
    2477        engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test") 
    2478        connection = engine.connect() 
    2479        trans = connection.begin() 
    2480        connection.execute(text("insert into x (a, b) values (1, 2)")) 
    2481        trans.commit() 
    2482 
    2483    The object provides :meth:`.rollback` and :meth:`.commit` 
    2484    methods in order to control transaction boundaries.  It 
    2485    also implements a context manager interface so that 
    2486    the Python ``with`` statement can be used with the 
    2487    :meth:`_engine.Connection.begin` method:: 
    2488 
    2489        with connection.begin(): 
    2490            connection.execute(text("insert into x (a, b) values (1, 2)")) 
    2491 
    2492    The Transaction object is **not** threadsafe. 
    2493 
    2494    .. seealso:: 
    2495 
    2496        :meth:`_engine.Connection.begin` 
    2497 
    2498        :meth:`_engine.Connection.begin_twophase` 
    2499 
    2500        :meth:`_engine.Connection.begin_nested` 
    2501 
    2502    .. index:: 
    2503      single: thread safety; Transaction 
    2504    """  # noqa 
    2505 
    2506    __slots__ = () 
    2507 
    2508    _is_root: bool = False 
    2509    is_active: bool 
    2510    connection: Connection 
    2511 
    2512    def __init__(self, connection: Connection): 
    2513        raise NotImplementedError() 
    2514 
    2515    @property 
    2516    def _deactivated_from_connection(self) -> bool: 
    2517        """True if this transaction is totally deactivated from the connection 
    2518        and therefore can no longer affect its state. 
    2519 
    2520        """ 
    2521        raise NotImplementedError() 
    2522 
    2523    def _do_close(self) -> None: 
    2524        raise NotImplementedError() 
    2525 
    2526    def _do_rollback(self) -> None: 
    2527        raise NotImplementedError() 
    2528 
    2529    def _do_commit(self) -> None: 
    2530        raise NotImplementedError() 
    2531 
    2532    @property 
    2533    def is_valid(self) -> bool: 
    2534        return self.is_active and not self.connection.invalidated 
    2535 
    2536    def close(self) -> None: 
    2537        """Close this :class:`.Transaction`. 
    2538 
    2539        If this transaction is the base transaction in a begin/commit 
    2540        nesting, the transaction will rollback().  Otherwise, the 
    2541        method returns. 
    2542 
    2543        This is used to cancel a Transaction without affecting the scope of 
    2544        an enclosing transaction. 
    2545 
    2546        """ 
    2547        try: 
    2548            self._do_close() 
    2549        finally: 
    2550            assert not self.is_active 
    2551 
    2552    def rollback(self) -> None: 
    2553        """Roll back this :class:`.Transaction`. 
    2554 
    2555        The implementation of this may vary based on the type of transaction in 
    2556        use: 
    2557 
    2558        * For a simple database transaction (e.g. :class:`.RootTransaction`), 
    2559          it corresponds to a ROLLBACK. 
    2560 
    2561        * For a :class:`.NestedTransaction`, it corresponds to a 
    2562          "ROLLBACK TO SAVEPOINT" operation. 
    2563 
    2564        * For a :class:`.TwoPhaseTransaction`, DBAPI-specific methods for two 
    2565          phase transactions may be used. 
    2566 
    2567 
    2568        """ 
    2569        try: 
    2570            self._do_rollback() 
    2571        finally: 
    2572            assert not self.is_active 
    2573 
    2574    def commit(self) -> None: 
    2575        """Commit this :class:`.Transaction`. 
    2576 
    2577        The implementation of this may vary based on the type of transaction in 
    2578        use: 
    2579 
    2580        * For a simple database transaction (e.g. :class:`.RootTransaction`), 
    2581          it corresponds to a COMMIT. 
    2582 
    2583        * For a :class:`.NestedTransaction`, it corresponds to a 
    2584          "RELEASE SAVEPOINT" operation. 
    2585 
    2586        * For a :class:`.TwoPhaseTransaction`, DBAPI-specific methods for two 
    2587          phase transactions may be used. 
    2588 
    2589        """ 
    2590        try: 
    2591            self._do_commit() 
    2592        finally: 
    2593            assert not self.is_active 
    2594 
    2595    def _get_subject(self) -> Connection: 
    2596        return self.connection 
    2597 
    2598    def _transaction_is_active(self) -> bool: 
    2599        return self.is_active 
    2600 
    2601    def _transaction_is_closed(self) -> bool: 
    2602        return not self._deactivated_from_connection 
    2603 
    2604    def _rollback_can_be_called(self) -> bool: 
    2605        # for RootTransaction / NestedTransaction, it's safe to call 
    2606        # rollback() even if the transaction is deactive and no warnings 
    2607        # will be emitted.  tested in 
    2608        # test_transaction.py -> test_no_rollback_in_deactive(?:_savepoint)? 
    2609        return True 
    2610 
    2611 
    2612class RootTransaction(Transaction): 
    2613    """Represent the "root" transaction on a :class:`_engine.Connection`. 
    2614 
    2615    This corresponds to the current "BEGIN/COMMIT/ROLLBACK" that's occurring 
    2616    for the :class:`_engine.Connection`. The :class:`_engine.RootTransaction` 
    2617    is created by calling upon the :meth:`_engine.Connection.begin` method, and 
    2618    remains associated with the :class:`_engine.Connection` throughout its 
    2619    active span. The current :class:`_engine.RootTransaction` in use is 
    2620    accessible via the :attr:`_engine.Connection.get_transaction` method of 
    2621    :class:`_engine.Connection`. 
    2622 
    2623    In :term:`2.0 style` use, the :class:`_engine.Connection` also employs 
    2624    "autobegin" behavior that will create a new 
    2625    :class:`_engine.RootTransaction` whenever a connection in a 
    2626    non-transactional state is used to emit commands on the DBAPI connection. 
    2627    The scope of the :class:`_engine.RootTransaction` in 2.0 style 
    2628    use can be controlled using the :meth:`_engine.Connection.commit` and 
    2629    :meth:`_engine.Connection.rollback` methods. 
    2630 
    2631 
    2632    """ 
    2633 
    2634    _is_root = True 
    2635 
    2636    __slots__ = ("connection", "is_active") 
    2637 
    2638    def __init__(self, connection: Connection): 
    2639        assert connection._transaction is None 
    2640        if connection._trans_context_manager: 
    2641            TransactionalContext._trans_ctx_check(connection) 
    2642        self.connection = connection 
    2643        self._connection_begin_impl() 
    2644        connection._transaction = self 
    2645 
    2646        self.is_active = True 
    2647 
    2648    def _deactivate_from_connection(self) -> None: 
    2649        if self.is_active: 
    2650            assert self.connection._transaction is self 
    2651            self.is_active = False 
    2652 
    2653        elif self.connection._transaction is not self: 
    2654            util.warn("transaction already deassociated from connection") 
    2655 
    2656    @property 
    2657    def _deactivated_from_connection(self) -> bool: 
    2658        return self.connection._transaction is not self 
    2659 
    2660    def _connection_begin_impl(self) -> None: 
    2661        self.connection._begin_impl(self) 
    2662 
    2663    def _connection_rollback_impl(self) -> None: 
    2664        self.connection._rollback_impl() 
    2665 
    2666    def _connection_commit_impl(self) -> None: 
    2667        self.connection._commit_impl() 
    2668 
    2669    def _close_impl(self, try_deactivate: bool = False) -> None: 
    2670        try: 
    2671            if self.is_active: 
    2672                self._connection_rollback_impl() 
    2673 
    2674            if self.connection._nested_transaction: 
    2675                self.connection._nested_transaction._cancel() 
    2676        finally: 
    2677            if self.is_active or try_deactivate: 
    2678                self._deactivate_from_connection() 
    2679            if self.connection._transaction is self: 
    2680                self.connection._transaction = None 
    2681 
    2682        assert not self.is_active 
    2683        assert self.connection._transaction is not self 
    2684 
    2685    def _do_close(self) -> None: 
    2686        self._close_impl() 
    2687 
    2688    def _do_rollback(self) -> None: 
    2689        self._close_impl(try_deactivate=True) 
    2690 
    2691    def _do_commit(self) -> None: 
    2692        if self.is_active: 
    2693            assert self.connection._transaction is self 
    2694 
    2695            try: 
    2696                self._connection_commit_impl() 
    2697            finally: 
    2698                # whether or not commit succeeds, cancel any 
    2699                # nested transactions, make this transaction "inactive" 
    2700                # and remove it as a reset agent 
    2701                if self.connection._nested_transaction: 
    2702                    self.connection._nested_transaction._cancel() 
    2703 
    2704                self._deactivate_from_connection() 
    2705 
    2706            # ...however only remove as the connection's current transaction 
    2707            # if commit succeeded.  otherwise it stays on so that a rollback 
    2708            # needs to occur. 
    2709            self.connection._transaction = None 
    2710        else: 
    2711            if self.connection._transaction is self: 
    2712                self.connection._invalid_transaction() 
    2713            else: 
    2714                raise exc.InvalidRequestError("This transaction is inactive") 
    2715 
    2716        assert not self.is_active 
    2717        assert self.connection._transaction is not self 
    2718 
    2719 
    2720class NestedTransaction(Transaction): 
    2721    """Represent a 'nested', or SAVEPOINT transaction. 
    2722 
    2723    The :class:`.NestedTransaction` object is created by calling the 
    2724    :meth:`_engine.Connection.begin_nested` method of 
    2725    :class:`_engine.Connection`. 
    2726 
    2727    When using :class:`.NestedTransaction`, the semantics of "begin" / 
    2728    "commit" / "rollback" are as follows: 
    2729 
    2730    * the "begin" operation corresponds to the "BEGIN SAVEPOINT" command, where 
    2731      the savepoint is given an explicit name that is part of the state 
    2732      of this object. 
    2733 
    2734    * The :meth:`.NestedTransaction.commit` method corresponds to a 
    2735      "RELEASE SAVEPOINT" operation, using the savepoint identifier associated 
    2736      with this :class:`.NestedTransaction`. 
    2737 
    2738    * The :meth:`.NestedTransaction.rollback` method corresponds to a 
    2739      "ROLLBACK TO SAVEPOINT" operation, using the savepoint identifier 
    2740      associated with this :class:`.NestedTransaction`. 
    2741 
    2742    The rationale for mimicking the semantics of an outer transaction in 
    2743    terms of savepoints so that code may deal with a "savepoint" transaction 
    2744    and an "outer" transaction in an agnostic way. 
    2745 
    2746    .. seealso:: 
    2747 
    2748        :ref:`session_begin_nested` - ORM version of the SAVEPOINT API. 
    2749 
    2750    """ 
    2751 
    2752    __slots__ = ("connection", "is_active", "_savepoint", "_previous_nested") 
    2753 
    2754    _savepoint: str 
    2755 
    2756    def __init__(self, connection: Connection): 
    2757        assert connection._transaction is not None 
    2758        if connection._trans_context_manager: 
    2759            TransactionalContext._trans_ctx_check(connection) 
    2760        self.connection = connection 
    2761        self._savepoint = self.connection._savepoint_impl() 
    2762        self.is_active = True 
    2763        self._previous_nested = connection._nested_transaction 
    2764        connection._nested_transaction = self 
    2765 
    2766    def _deactivate_from_connection(self, warn: bool = True) -> None: 
    2767        if self.connection._nested_transaction is self: 
    2768            self.connection._nested_transaction = self._previous_nested 
    2769        elif warn: 
    2770            util.warn( 
    2771                "nested transaction already deassociated from connection" 
    2772            ) 
    2773 
    2774    @property 
    2775    def _deactivated_from_connection(self) -> bool: 
    2776        return self.connection._nested_transaction is not self 
    2777 
    2778    def _cancel(self) -> None: 
    2779        # called by RootTransaction when the outer transaction is 
    2780        # committed, rolled back, or closed to cancel all savepoints 
    2781        # without any action being taken 
    2782        self.is_active = False 
    2783        self._deactivate_from_connection() 
    2784        if self._previous_nested: 
    2785            self._previous_nested._cancel() 
    2786 
    2787    def _close_impl( 
    2788        self, deactivate_from_connection: bool, warn_already_deactive: bool 
    2789    ) -> None: 
    2790        try: 
    2791            if ( 
    2792                self.is_active 
    2793                and self.connection._transaction 
    2794                and self.connection._transaction.is_active 
    2795            ): 
    2796                self.connection._rollback_to_savepoint_impl(self._savepoint) 
    2797        finally: 
    2798            self.is_active = False 
    2799 
    2800            if deactivate_from_connection: 
    2801                self._deactivate_from_connection(warn=warn_already_deactive) 
    2802 
    2803        assert not self.is_active 
    2804        if deactivate_from_connection: 
    2805            assert self.connection._nested_transaction is not self 
    2806 
    2807    def _do_close(self) -> None: 
    2808        self._close_impl(True, False) 
    2809 
    2810    def _do_rollback(self) -> None: 
    2811        self._close_impl(True, True) 
    2812 
    2813    def _do_commit(self) -> None: 
    2814        if self.is_active: 
    2815            try: 
    2816                self.connection._release_savepoint_impl(self._savepoint) 
    2817            finally: 
    2818                # nested trans becomes inactive on failed release 
    2819                # unconditionally.  this prevents it from trying to 
    2820                # emit SQL when it rolls back. 
    2821                self.is_active = False 
    2822 
    2823            # but only de-associate from connection if it succeeded 
    2824            self._deactivate_from_connection() 
    2825        else: 
    2826            if self.connection._nested_transaction is self: 
    2827                self.connection._invalid_transaction() 
    2828            else: 
    2829                raise exc.InvalidRequestError( 
    2830                    "This nested transaction is inactive" 
    2831                ) 
    2832 
    2833 
    2834class TwoPhaseTransaction(RootTransaction): 
    2835    """Represent a two-phase transaction. 
    2836 
    2837    A new :class:`.TwoPhaseTransaction` object may be procured 
    2838    using the :meth:`_engine.Connection.begin_twophase` method. 
    2839 
    2840    The interface is the same as that of :class:`.Transaction` 
    2841    with the addition of the :meth:`prepare` method. 
    2842 
    2843    """ 
    2844 
    2845    __slots__ = ("xid", "_is_prepared") 
    2846 
    2847    xid: Any 
    2848 
    2849    def __init__(self, connection: Connection, xid: Any): 
    2850        self._is_prepared = False 
    2851        self.xid = xid 
    2852        super().__init__(connection) 
    2853 
    2854    def prepare(self) -> None: 
    2855        """Prepare this :class:`.TwoPhaseTransaction`. 
    2856 
    2857        After a PREPARE, the transaction can be committed. 
    2858 
    2859        """ 
    2860        if not self.is_active: 
    2861            raise exc.InvalidRequestError("This transaction is inactive") 
    2862        self.connection._prepare_twophase_impl(self.xid) 
    2863        self._is_prepared = True 
    2864 
    2865    def _connection_begin_impl(self) -> None: 
    2866        self.connection._begin_twophase_impl(self) 
    2867 
    2868    def _connection_rollback_impl(self) -> None: 
    2869        self.connection._rollback_twophase_impl(self.xid, self._is_prepared) 
    2870 
    2871    def _connection_commit_impl(self) -> None: 
    2872        self.connection._commit_twophase_impl(self.xid, self._is_prepared) 
    2873 
    2874 
    2875class Engine( 
    2876    ConnectionEventsTarget, log.Identified, inspection.Inspectable["Inspector"] 
    2877): 
    2878    """ 
    2879    Connects a :class:`~sqlalchemy.pool.Pool` and 
    2880    :class:`~sqlalchemy.engine.interfaces.Dialect` together to provide a 
    2881    source of database connectivity and behavior. 
    2882 
    2883    An :class:`_engine.Engine` object is instantiated publicly using the 
    2884    :func:`~sqlalchemy.create_engine` function. 
    2885 
    2886    .. seealso:: 
    2887 
    2888        :doc:`/core/engines` 
    2889 
    2890        :ref:`connections_toplevel` 
    2891 
    2892    """ 
    2893 
    2894    dispatch: dispatcher[ConnectionEventsTarget] 
    2895 
    2896    _compiled_cache: Optional[CompiledCacheType] 
    2897 
    2898    _execution_options: _ExecuteOptions = _EMPTY_EXECUTION_OPTS 
    2899    _has_events: bool = False 
    2900    _connection_cls: Type[Connection] = Connection 
    2901    _sqla_logger_namespace: str = "sqlalchemy.engine.Engine" 
    2902    _is_future: bool = False 
    2903 
    2904    _schema_translate_map: Optional[SchemaTranslateMapType] = None 
    2905    _option_cls: Type[OptionEngine] 
    2906 
    2907    dialect: Dialect 
    2908    pool: Pool 
    2909    url: URL 
    2910    hide_parameters: bool 
    2911 
    2912    def __init__( 
    2913        self, 
    2914        pool: Pool, 
    2915        dialect: Dialect, 
    2916        url: URL, 
    2917        logging_name: Optional[str] = None, 
    2918        echo: Optional[_EchoFlagType] = None, 
    2919        query_cache_size: int = 500, 
    2920        execution_options: Optional[Mapping[str, Any]] = None, 
    2921        hide_parameters: bool = False, 
    2922    ): 
    2923        self.pool = pool 
    2924        self.url = url 
    2925        self.dialect = dialect 
    2926        if logging_name: 
    2927            self.logging_name = logging_name 
    2928        self.echo = echo 
    2929        self.hide_parameters = hide_parameters 
    2930        if query_cache_size != 0: 
    2931            self._compiled_cache = util.LRUCache( 
    2932                query_cache_size, size_alert=self._lru_size_alert 
    2933            ) 
    2934        else: 
    2935            self._compiled_cache = None 
    2936        log.instance_logger(self, echoflag=echo) 
    2937        if execution_options: 
    2938            self.update_execution_options(**execution_options) 
    2939 
    2940    def _lru_size_alert(self, cache: util.LRUCache[Any, Any]) -> None: 
    2941        if self._should_log_info(): 
    2942            self.logger.info( 
    2943                "Compiled cache size pruning from %d items to %d.  " 
    2944                "Increase cache size to reduce the frequency of pruning.", 
    2945                len(cache), 
    2946                cache.capacity, 
    2947            ) 
    2948 
    2949    @property 
    2950    def engine(self) -> Engine: 
    2951        """Returns this :class:`.Engine`. 
    2952 
    2953        Used for legacy schemes that accept :class:`.Connection` / 
    2954        :class:`.Engine` objects within the same variable. 
    2955 
    2956        """ 
    2957        return self 
    2958 
    2959    def clear_compiled_cache(self) -> None: 
    2960        """Clear the compiled cache associated with the dialect. 
    2961 
    2962        This applies **only** to the built-in cache that is established 
    2963        via the :paramref:`_engine.create_engine.query_cache_size` parameter. 
    2964        It will not impact any dictionary caches that were passed via the 
    2965        :paramref:`.Connection.execution_options.compiled_cache` parameter. 
    2966 
    2967        .. versionadded:: 1.4 
    2968 
    2969        """ 
    2970        if self._compiled_cache: 
    2971            self._compiled_cache.clear() 
    2972 
    2973    def update_execution_options(self, **opt: Any) -> None: 
    2974        r"""Update the default execution_options dictionary 
    2975        of this :class:`_engine.Engine`. 
    2976 
    2977        The given keys/values in \**opt are added to the 
    2978        default execution options that will be used for 
    2979        all connections.  The initial contents of this dictionary 
    2980        can be sent via the ``execution_options`` parameter 
    2981        to :func:`_sa.create_engine`. 
    2982 
    2983        .. seealso:: 
    2984 
    2985            :meth:`_engine.Connection.execution_options` 
    2986 
    2987            :meth:`_engine.Engine.execution_options` 
    2988 
    2989        """ 
    2990        self.dispatch.set_engine_execution_options(self, opt) 
    2991        self._execution_options = self._execution_options.union(opt) 
    2992        self.dialect.set_engine_execution_options(self, opt) 
    2993 
    2994    @overload 
    2995    def execution_options( 
    2996        self, 
    2997        *, 
    2998        compiled_cache: Optional[CompiledCacheType] = ..., 
    2999        logging_token: str = ..., 
    3000        isolation_level: IsolationLevel = ..., 
    3001        insertmanyvalues_page_size: int = ..., 
    3002        schema_translate_map: Optional[SchemaTranslateMapType] = ..., 
    3003        **opt: Any, 
    3004    ) -> OptionEngine: ... 
    3005 
    3006    @overload 
    3007    def execution_options(self, **opt: Any) -> OptionEngine: ... 
    3008 
    3009    def execution_options(self, **opt: Any) -> OptionEngine: 
    3010        """Return a new :class:`_engine.Engine` that will provide 
    3011        :class:`_engine.Connection` objects with the given execution options. 
    3012 
    3013        The returned :class:`_engine.Engine` remains related to the original 
    3014        :class:`_engine.Engine` in that it shares the same connection pool and 
    3015        other state: 
    3016 
    3017        * The :class:`_pool.Pool` used by the new :class:`_engine.Engine` 
    3018          is the 
    3019          same instance.  The :meth:`_engine.Engine.dispose` 
    3020          method will replace 
    3021          the connection pool instance for the parent engine as well 
    3022          as this one. 
    3023        * Event listeners are "cascaded" - meaning, the new 
    3024          :class:`_engine.Engine` 
    3025          inherits the events of the parent, and new events can be associated 
    3026          with the new :class:`_engine.Engine` individually. 
    3027        * The logging configuration and logging_name is copied from the parent 
    3028          :class:`_engine.Engine`. 
    3029 
    3030        The intent of the :meth:`_engine.Engine.execution_options` method is 
    3031        to implement schemes where multiple :class:`_engine.Engine` 
    3032        objects refer to the same connection pool, but are differentiated 
    3033        by options that affect some execution-level behavior for each 
    3034        engine.    One such example is breaking into separate "reader" and 
    3035        "writer" :class:`_engine.Engine` instances, where one 
    3036        :class:`_engine.Engine` 
    3037        has a lower :term:`isolation level` setting configured or is even 
    3038        transaction-disabled using "autocommit".  An example of this 
    3039        configuration is at :ref:`dbapi_autocommit_multiple`. 
    3040 
    3041        Another example is one that 
    3042        uses a custom option ``shard_id`` which is consumed by an event 
    3043        to change the current schema on a database connection:: 
    3044 
    3045            from sqlalchemy import event 
    3046            from sqlalchemy.engine import Engine 
    3047 
    3048            primary_engine = create_engine("mysql+mysqldb://") 
    3049            shard1 = primary_engine.execution_options(shard_id="shard1") 
    3050            shard2 = primary_engine.execution_options(shard_id="shard2") 
    3051 
    3052            shards = {"default": "base", "shard_1": "db1", "shard_2": "db2"} 
    3053 
    3054 
    3055            @event.listens_for(Engine, "before_cursor_execute") 
    3056            def _switch_shard(conn, cursor, stmt, params, context, executemany): 
    3057                shard_id = conn.get_execution_options().get("shard_id", "default") 
    3058                current_shard = conn.info.get("current_shard", None) 
    3059 
    3060                if current_shard != shard_id: 
    3061                    cursor.execute("use %s" % shards[shard_id]) 
    3062                    conn.info["current_shard"] = shard_id 
    3063 
    3064        The above recipe illustrates two :class:`_engine.Engine` objects that 
    3065        will each serve as factories for :class:`_engine.Connection` objects 
    3066        that have pre-established "shard_id" execution options present. A 
    3067        :meth:`_events.ConnectionEvents.before_cursor_execute` event handler 
    3068        then interprets this execution option to emit a MySQL ``use`` statement 
    3069        to switch databases before a statement execution, while at the same 
    3070        time keeping track of which database we've established using the 
    3071        :attr:`_engine.Connection.info` dictionary. 
    3072 
    3073        .. seealso:: 
    3074 
    3075            :meth:`_engine.Connection.execution_options` 
    3076            - update execution options 
    3077            on a :class:`_engine.Connection` object. 
    3078 
    3079            :meth:`_engine.Engine.update_execution_options` 
    3080            - update the execution 
    3081            options for a given :class:`_engine.Engine` in place. 
    3082 
    3083            :meth:`_engine.Engine.get_execution_options` 
    3084 
    3085 
    3086        """  # noqa: E501 
    3087        return self._option_cls(self, opt) 
    3088 
    3089    def get_execution_options(self) -> _ExecuteOptions: 
    3090        """Get the non-SQL options which will take effect during execution. 
    3091 
    3092        .. seealso:: 
    3093 
    3094            :meth:`_engine.Engine.execution_options` 
    3095        """ 
    3096        return self._execution_options 
    3097 
    3098    @property 
    3099    def name(self) -> str: 
    3100        """String name of the :class:`~sqlalchemy.engine.interfaces.Dialect` 
    3101        in use by this :class:`Engine`. 
    3102 
    3103        """ 
    3104 
    3105        return self.dialect.name 
    3106 
    3107    @property 
    3108    def driver(self) -> str: 
    3109        """Driver name of the :class:`~sqlalchemy.engine.interfaces.Dialect` 
    3110        in use by this :class:`Engine`. 
    3111 
    3112        """ 
    3113 
    3114        return self.dialect.driver 
    3115 
    3116    echo = log.echo_property() 
    3117 
    3118    def __repr__(self) -> str: 
    3119        return "Engine(%r)" % (self.url,) 
    3120 
    3121    def dispose(self, close: bool = True) -> None: 
    3122        """Dispose of the connection pool used by this 
    3123        :class:`_engine.Engine`. 
    3124 
    3125        A new connection pool is created immediately after the old one has been 
    3126        disposed. The previous connection pool is disposed either actively, by 
    3127        closing out all currently checked-in connections in that pool, or 
    3128        passively, by losing references to it but otherwise not closing any 
    3129        connections. The latter strategy is more appropriate for an initializer 
    3130        in a forked Python process. 
    3131 
    3132        :param close: if left at its default of ``True``, has the 
    3133         effect of fully closing all **currently checked in** 
    3134         database connections.  Connections that are still checked out 
    3135         will **not** be closed, however they will no longer be associated 
    3136         with this :class:`_engine.Engine`, 
    3137         so when they are closed individually, eventually the 
    3138         :class:`_pool.Pool` which they are associated with will 
    3139         be garbage collected and they will be closed out fully, if 
    3140         not already closed on checkin. 
    3141 
    3142         If set to ``False``, the previous connection pool is de-referenced, 
    3143         and otherwise not touched in any way. 
    3144 
    3145        .. versionadded:: 1.4.33  Added the :paramref:`.Engine.dispose.close` 
    3146            parameter to allow the replacement of a connection pool in a child 
    3147            process without interfering with the connections used by the parent 
    3148            process. 
    3149 
    3150 
    3151        .. seealso:: 
    3152 
    3153            :ref:`engine_disposal` 
    3154 
    3155            :ref:`pooling_multiprocessing` 
    3156 
    3157        """ 
    3158        if close: 
    3159            self.pool.dispose() 
    3160        self.pool = self.pool.recreate() 
    3161        self.dispatch.engine_disposed(self) 
    3162 
    3163    @contextlib.contextmanager 
    3164    def _optional_conn_ctx_manager( 
    3165        self, connection: Optional[Connection] = None 
    3166    ) -> Iterator[Connection]: 
    3167        if connection is None: 
    3168            with self.connect() as conn: 
    3169                yield conn 
    3170        else: 
    3171            yield connection 
    3172 
    3173    @contextlib.contextmanager 
    3174    def begin(self) -> Iterator[Connection]: 
    3175        """Return a context manager delivering a :class:`_engine.Connection` 
    3176        with a :class:`.Transaction` established. 
    3177 
    3178        E.g.:: 
    3179 
    3180            with engine.begin() as conn: 
    3181                conn.execute(text("insert into table (x, y, z) values (1, 2, 3)")) 
    3182                conn.execute(text("my_special_procedure(5)")) 
    3183 
    3184        Upon successful operation, the :class:`.Transaction` 
    3185        is committed.  If an error is raised, the :class:`.Transaction` 
    3186        is rolled back. 
    3187 
    3188        .. seealso:: 
    3189 
    3190            :meth:`_engine.Engine.connect` - procure a 
    3191            :class:`_engine.Connection` from 
    3192            an :class:`_engine.Engine`. 
    3193 
    3194            :meth:`_engine.Connection.begin` - start a :class:`.Transaction` 
    3195            for a particular :class:`_engine.Connection`. 
    3196 
    3197        """  # noqa: E501 
    3198        with self.connect() as conn: 
    3199            with conn.begin(): 
    3200                yield conn 
    3201 
    3202    def _run_ddl_visitor( 
    3203        self, 
    3204        visitorcallable: Type[InvokeDDLBase], 
    3205        element: SchemaVisitable, 
    3206        **kwargs: Any, 
    3207    ) -> None: 
    3208        with self.begin() as conn: 
    3209            conn._run_ddl_visitor(visitorcallable, element, **kwargs) 
    3210 
    3211    def connect(self) -> Connection: 
    3212        """Return a new :class:`_engine.Connection` object. 
    3213 
    3214        The :class:`_engine.Connection` acts as a Python context manager, so 
    3215        the typical use of this method looks like:: 
    3216 
    3217            with engine.connect() as connection: 
    3218                connection.execute(text("insert into table values ('foo')")) 
    3219                connection.commit() 
    3220 
    3221        Where above, after the block is completed, the connection is "closed" 
    3222        and its underlying DBAPI resources are returned to the connection pool. 
    3223        This also has the effect of rolling back any transaction that 
    3224        was explicitly begun or was begun via autobegin, and will 
    3225        emit the :meth:`_events.ConnectionEvents.rollback` event if one was 
    3226        started and is still in progress. 
    3227 
    3228        .. seealso:: 
    3229 
    3230            :meth:`_engine.Engine.begin` 
    3231 
    3232        """ 
    3233 
    3234        return self._connection_cls(self) 
    3235 
    3236    def raw_connection(self) -> PoolProxiedConnection: 
    3237        """Return a "raw" DBAPI connection from the connection pool. 
    3238 
    3239        The returned object is a proxied version of the DBAPI 
    3240        connection object used by the underlying driver in use. 
    3241        The object will have all the same behavior as the real DBAPI 
    3242        connection, except that its ``close()`` method will result in the 
    3243        connection being returned to the pool, rather than being closed 
    3244        for real. 
    3245 
    3246        This method provides direct DBAPI connection access for 
    3247        special situations when the API provided by 
    3248        :class:`_engine.Connection` 
    3249        is not needed.   When a :class:`_engine.Connection` object is already 
    3250        present, the DBAPI connection is available using 
    3251        the :attr:`_engine.Connection.connection` accessor. 
    3252 
    3253        .. seealso:: 
    3254 
    3255            :ref:`dbapi_connections` 
    3256 
    3257        """ 
    3258        return self.pool.connect() 
    3259 
    3260 
    3261class OptionEngineMixin(log.Identified): 
    3262    _sa_propagate_class_events = False 
    3263 
    3264    dispatch: dispatcher[ConnectionEventsTarget] 
    3265    _compiled_cache: Optional[CompiledCacheType] 
    3266    dialect: Dialect 
    3267    pool: Pool 
    3268    url: URL 
    3269    hide_parameters: bool 
    3270    echo: log.echo_property 
    3271 
    3272    def __init__( 
    3273        self, proxied: Engine, execution_options: CoreExecuteOptionsParameter 
    3274    ): 
    3275        self._proxied = proxied 
    3276        self.url = proxied.url 
    3277        self.dialect = proxied.dialect 
    3278        self.logging_name = proxied.logging_name 
    3279        self.echo = proxied.echo 
    3280        self._compiled_cache = proxied._compiled_cache 
    3281        self.hide_parameters = proxied.hide_parameters 
    3282        log.instance_logger(self, echoflag=self.echo) 
    3283 
    3284        # note: this will propagate events that are assigned to the parent 
    3285        # engine after this OptionEngine is created.   Since we share 
    3286        # the events of the parent we also disallow class-level events 
    3287        # to apply to the OptionEngine class directly. 
    3288        # 
    3289        # the other way this can work would be to transfer existing 
    3290        # events only, using: 
    3291        # self.dispatch._update(proxied.dispatch) 
    3292        # 
    3293        # that might be more appropriate however it would be a behavioral 
    3294        # change for logic that assigns events to the parent engine and 
    3295        # would like it to take effect for the already-created sub-engine. 
    3296        self.dispatch = self.dispatch._join(proxied.dispatch) 
    3297 
    3298        self._execution_options = proxied._execution_options 
    3299        self.update_execution_options(**execution_options) 
    3300 
    3301    def update_execution_options(self, **opt: Any) -> None: 
    3302        raise NotImplementedError() 
    3303 
    3304    if not typing.TYPE_CHECKING: 
    3305        # https://github.com/python/typing/discussions/1095 
    3306 
    3307        @property 
    3308        def pool(self) -> Pool: 
    3309            return self._proxied.pool 
    3310 
    3311        @pool.setter 
    3312        def pool(self, pool: Pool) -> None: 
    3313            self._proxied.pool = pool 
    3314 
    3315        @property 
    3316        def _has_events(self) -> bool: 
    3317            return self._proxied._has_events or self.__dict__.get( 
    3318                "_has_events", False 
    3319            ) 
    3320 
    3321        @_has_events.setter 
    3322        def _has_events(self, value: bool) -> None: 
    3323            self.__dict__["_has_events"] = value 
    3324 
    3325 
    3326class OptionEngine(OptionEngineMixin, Engine): 
    3327    def update_execution_options(self, **opt: Any) -> None: 
    3328        Engine.update_execution_options(self, **opt) 
    3329 
    3330 
    3331Engine._option_cls = OptionEngine