1# engine/default.py 
    2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 
    3# <see AUTHORS file> 
    4# 
    5# This module is part of SQLAlchemy and is released under 
    6# the MIT License: https://www.opensource.org/licenses/mit-license.php 
    7# mypy: allow-untyped-defs, allow-untyped-calls 
    8 
    9"""Default implementations of per-dialect sqlalchemy.engine classes. 
    10 
    11These are semi-private implementation classes which are only of importance 
    12to database dialect authors; dialects will usually use the classes here 
    13as the base class for their own corresponding classes. 
    14 
    15""" 
    16 
    17from __future__ import annotations 
    18 
    19import functools 
    20import operator 
    21import random 
    22import re 
    23from time import perf_counter 
    24import typing 
    25from typing import Any 
    26from typing import Callable 
    27from typing import cast 
    28from typing import Dict 
    29from typing import Final 
    30from typing import List 
    31from typing import Literal 
    32from typing import Mapping 
    33from typing import MutableMapping 
    34from typing import MutableSequence 
    35from typing import Optional 
    36from typing import Sequence 
    37from typing import Set 
    38from typing import Tuple 
    39from typing import Type 
    40from typing import TYPE_CHECKING 
    41from typing import Union 
    42import weakref 
    43 
    44from . import characteristics 
    45from . import cursor as _cursor 
    46from . import interfaces 
    47from .base import Connection 
    48from .interfaces import CacheStats 
    49from .interfaces import DBAPICursor 
    50from .interfaces import Dialect 
    51from .interfaces import ExecuteStyle 
    52from .interfaces import ExecutionContext 
    53from .reflection import ObjectKind 
    54from .reflection import ObjectScope 
    55from .. import event 
    56from .. import exc 
    57from .. import pool 
    58from .. import util 
    59from ..sql import compiler 
    60from ..sql import dml 
    61from ..sql import expression 
    62from ..sql import type_api 
    63from ..sql import util as sql_util 
    64from ..sql._typing import is_tuple_type 
    65from ..sql.base import _NoArg 
    66from ..sql.compiler import AggregateOrderByStyle 
    67from ..sql.compiler import DDLCompiler 
    68from ..sql.compiler import InsertmanyvaluesSentinelOpts 
    69from ..sql.compiler import SQLCompiler 
    70from ..sql.elements import quoted_name 
    71from ..util.typing import TupleAny 
    72from ..util.typing import Unpack 
    73 
    74if typing.TYPE_CHECKING: 
    75    from types import ModuleType 
    76 
    77    from .base import Engine 
    78    from .cursor import ResultFetchStrategy 
    79    from .interfaces import _CoreMultiExecuteParams 
    80    from .interfaces import _CoreSingleExecuteParams 
    81    from .interfaces import _DBAPICursorDescription 
    82    from .interfaces import _DBAPIMultiExecuteParams 
    83    from .interfaces import _DBAPISingleExecuteParams 
    84    from .interfaces import _ExecuteOptions 
    85    from .interfaces import _MutableCoreSingleExecuteParams 
    86    from .interfaces import _ParamStyle 
    87    from .interfaces import ConnectArgsType 
    88    from .interfaces import DBAPIConnection 
    89    from .interfaces import DBAPIModule 
    90    from .interfaces import DBAPIType 
    91    from .interfaces import IsolationLevel 
    92    from .row import Row 
    93    from .url import URL 
    94    from ..event import _ListenerFnType 
    95    from ..pool import Pool 
    96    from ..pool import PoolProxiedConnection 
    97    from ..sql import Executable 
    98    from ..sql.compiler import Compiled 
    99    from ..sql.compiler import Linting 
    100    from ..sql.compiler import ResultColumnsEntry 
    101    from ..sql.dml import DMLState 
    102    from ..sql.dml import UpdateBase 
    103    from ..sql.elements import BindParameter 
    104    from ..sql.schema import Column 
    105    from ..sql.type_api import _BindProcessorType 
    106    from ..sql.type_api import _ResultProcessorType 
    107    from ..sql.type_api import TypeEngine 
    108 
    109 
    110# When we're handed literal SQL, ensure it's a SELECT query 
    111SERVER_SIDE_CURSOR_RE = re.compile(r"\s*SELECT", re.I | re.UNICODE) 
    112 
    113 
    114( 
    115    CACHE_HIT, 
    116    CACHE_MISS, 
    117    CACHING_DISABLED, 
    118    NO_CACHE_KEY, 
    119    NO_DIALECT_SUPPORT, 
    120) = list(CacheStats) 
    121 
    122 
    123class DefaultDialect(Dialect): 
    124    """Default implementation of Dialect""" 
    125 
    126    statement_compiler = compiler.SQLCompiler 
    127    ddl_compiler = compiler.DDLCompiler 
    128    type_compiler_cls = compiler.GenericTypeCompiler 
    129 
    130    preparer = compiler.IdentifierPreparer 
    131    supports_alter = True 
    132    supports_comments = False 
    133    supports_constraint_comments = False 
    134    inline_comments = False 
    135    supports_statement_cache = True 
    136 
    137    div_is_floordiv = True 
    138 
    139    bind_typing = interfaces.BindTyping.NONE 
    140 
    141    include_set_input_sizes: Optional[Set[Any]] = None 
    142    exclude_set_input_sizes: Optional[Set[Any]] = None 
    143 
    144    # the first value we'd get for an autoincrement column. 
    145    default_sequence_base = 1 
    146 
    147    # most DBAPIs happy with this for execute(). 
    148    # not cx_oracle. 
    149    execute_sequence_format = tuple 
    150 
    151    supports_schemas = True 
    152    supports_views = True 
    153    supports_sequences = False 
    154    sequences_optional = False 
    155    preexecute_autoincrement_sequences = False 
    156    supports_identity_columns = False 
    157    postfetch_lastrowid = True 
    158    favor_returning_over_lastrowid = False 
    159    insert_null_pk_still_autoincrements = False 
    160    update_returning = False 
    161    delete_returning = False 
    162    update_returning_multifrom = False 
    163    delete_returning_multifrom = False 
    164    insert_returning = False 
    165 
    166    aggregate_order_by_style = AggregateOrderByStyle.INLINE 
    167 
    168    cte_follows_insert = False 
    169 
    170    supports_native_enum = False 
    171    supports_native_boolean = False 
    172    supports_native_uuid = False 
    173    returns_native_bytes = False 
    174 
    175    non_native_boolean_check_constraint = True 
    176 
    177    supports_simple_order_by_label = True 
    178 
    179    tuple_in_values = False 
    180 
    181    connection_characteristics = util.immutabledict( 
    182        { 
    183            "isolation_level": characteristics.IsolationLevelCharacteristic(), 
    184            "logging_token": characteristics.LoggingTokenCharacteristic(), 
    185        } 
    186    ) 
    187 
    188    engine_config_types: Mapping[str, Any] = util.immutabledict( 
    189        { 
    190            "pool_timeout": util.asint, 
    191            "echo": util.bool_or_str("debug"), 
    192            "echo_pool": util.bool_or_str("debug"), 
    193            "pool_recycle": util.asint, 
    194            "pool_size": util.asint, 
    195            "max_overflow": util.asint, 
    196            "future": util.asbool, 
    197        } 
    198    ) 
    199 
    200    # if the NUMERIC type 
    201    # returns decimal.Decimal. 
    202    # *not* the FLOAT type however. 
    203    supports_native_decimal = False 
    204 
    205    name = "default" 
    206 
    207    # length at which to truncate 
    208    # any identifier. 
    209    max_identifier_length = 9999 
    210    _user_defined_max_identifier_length: Optional[int] = None 
    211 
    212    isolation_level: Optional[str] = None 
    213 
    214    # sub-categories of max_identifier_length. 
    215    # currently these accommodate for MySQL which allows alias names 
    216    # of 255 but DDL names only of 64. 
    217    max_index_name_length: Optional[int] = None 
    218    max_constraint_name_length: Optional[int] = None 
    219 
    220    supports_sane_rowcount = True 
    221    supports_sane_multi_rowcount = True 
    222    colspecs: MutableMapping[Type[TypeEngine[Any]], Type[TypeEngine[Any]]] = {} 
    223    default_paramstyle = "named" 
    224 
    225    supports_default_values = False 
    226    """dialect supports INSERT... DEFAULT VALUES syntax""" 
    227 
    228    supports_default_metavalue = False 
    229    """dialect supports INSERT... VALUES (DEFAULT) syntax""" 
    230 
    231    default_metavalue_token = "DEFAULT" 
    232    """for INSERT... VALUES (DEFAULT) syntax, the token to put in the 
    233    parenthesis.""" 
    234 
    235    # not sure if this is a real thing but the compiler will deliver it 
    236    # if this is the only flag enabled. 
    237    supports_empty_insert = True 
    238    """dialect supports INSERT () VALUES ()""" 
    239 
    240    supports_multivalues_insert = False 
    241 
    242    use_insertmanyvalues: bool = False 
    243 
    244    use_insertmanyvalues_wo_returning: bool = False 
    245 
    246    insertmanyvalues_implicit_sentinel: InsertmanyvaluesSentinelOpts = ( 
    247        InsertmanyvaluesSentinelOpts.NOT_SUPPORTED 
    248    ) 
    249 
    250    insertmanyvalues_page_size: int = 1000 
    251    insertmanyvalues_max_parameters = 32700 
    252 
    253    supports_is_distinct_from = True 
    254 
    255    supports_server_side_cursors = False 
    256 
    257    server_side_cursors = False 
    258 
    259    # extra record-level locking features (#4860) 
    260    supports_for_update_of = False 
    261 
    262    server_version_info = None 
    263 
    264    default_schema_name: Optional[str] = None 
    265 
    266    # indicates symbol names are 
    267    # UPPERCASED if they are case insensitive 
    268    # within the database. 
    269    # if this is True, the methods normalize_name() 
    270    # and denormalize_name() must be provided. 
    271    requires_name_normalize = False 
    272 
    273    is_async = False 
    274 
    275    has_terminate = False 
    276 
    277    # TODO: this is not to be part of 2.0.  implement rudimentary binary 
    278    # literals for SQLite, PostgreSQL, MySQL only within 
    279    # _Binary.literal_processor 
    280    _legacy_binary_type_literal_encoding = "utf-8" 
    281 
    282    @util.deprecated_params( 
    283        empty_in_strategy=( 
    284            "1.4", 
    285            "The :paramref:`_sa.create_engine.empty_in_strategy` keyword is " 
    286            "deprecated, and no longer has any effect.  All IN expressions " 
    287            "are now rendered using " 
    288            'the "expanding parameter" strategy which renders a set of bound' 
    289            'expressions, or an "empty set" SELECT, at statement execution' 
    290            "time.", 
    291        ), 
    292        server_side_cursors=( 
    293            "1.4", 
    294            "The :paramref:`_sa.create_engine.server_side_cursors` parameter " 
    295            "is deprecated and will be removed in a future release.  Please " 
    296            "use the " 
    297            ":paramref:`_engine.Connection.execution_options.stream_results` " 
    298            "parameter.", 
    299        ), 
    300    ) 
    301    def __init__( 
    302        self, 
    303        paramstyle: Optional[_ParamStyle] = None, 
    304        isolation_level: Optional[IsolationLevel] = None, 
    305        dbapi: Optional[ModuleType] = None, 
    306        implicit_returning: Literal[True] = True, 
    307        supports_native_boolean: Optional[bool] = None, 
    308        max_identifier_length: Optional[int] = None, 
    309        label_length: Optional[int] = None, 
    310        insertmanyvalues_page_size: Union[_NoArg, int] = _NoArg.NO_ARG, 
    311        use_insertmanyvalues: Optional[bool] = None, 
    312        # util.deprecated_params decorator cannot render the 
    313        # Linting.NO_LINTING constant 
    314        compiler_linting: Linting = int(compiler.NO_LINTING),  # type: ignore 
    315        server_side_cursors: bool = False, 
    316        skip_autocommit_rollback: bool = False, 
    317        **kwargs: Any, 
    318    ): 
    319        if server_side_cursors: 
    320            if not self.supports_server_side_cursors: 
    321                raise exc.ArgumentError( 
    322                    "Dialect %s does not support server side cursors" % self 
    323                ) 
    324            else: 
    325                self.server_side_cursors = True 
    326 
    327        if getattr(self, "use_setinputsizes", False): 
    328            util.warn_deprecated( 
    329                "The dialect-level use_setinputsizes attribute is " 
    330                "deprecated.  Please use " 
    331                "bind_typing = BindTyping.SETINPUTSIZES", 
    332                "2.0", 
    333            ) 
    334            self.bind_typing = interfaces.BindTyping.SETINPUTSIZES 
    335 
    336        self.positional = False 
    337        self._ischema = None 
    338 
    339        self.dbapi = dbapi 
    340 
    341        self.skip_autocommit_rollback = skip_autocommit_rollback 
    342 
    343        if paramstyle is not None: 
    344            self.paramstyle = paramstyle 
    345        elif self.dbapi is not None: 
    346            self.paramstyle = self.dbapi.paramstyle 
    347        else: 
    348            self.paramstyle = self.default_paramstyle 
    349        self.positional = self.paramstyle in ( 
    350            "qmark", 
    351            "format", 
    352            "numeric", 
    353            "numeric_dollar", 
    354        ) 
    355        self.identifier_preparer = self.preparer(self) 
    356        self._on_connect_isolation_level = isolation_level 
    357 
    358        legacy_tt_callable = getattr(self, "type_compiler", None) 
    359        if legacy_tt_callable is not None: 
    360            tt_callable = cast( 
    361                Type[compiler.GenericTypeCompiler], 
    362                self.type_compiler, 
    363            ) 
    364        else: 
    365            tt_callable = self.type_compiler_cls 
    366 
    367        self.type_compiler_instance = self.type_compiler = tt_callable(self) 
    368 
    369        if supports_native_boolean is not None: 
    370            self.supports_native_boolean = supports_native_boolean 
    371 
    372        self._user_defined_max_identifier_length = max_identifier_length 
    373        if self._user_defined_max_identifier_length: 
    374            self.max_identifier_length = ( 
    375                self._user_defined_max_identifier_length 
    376            ) 
    377        self.label_length = label_length 
    378        self.compiler_linting = compiler_linting 
    379 
    380        if use_insertmanyvalues is not None: 
    381            self.use_insertmanyvalues = use_insertmanyvalues 
    382 
    383        if insertmanyvalues_page_size is not _NoArg.NO_ARG: 
    384            self.insertmanyvalues_page_size = insertmanyvalues_page_size 
    385 
    386    @property 
    387    @util.deprecated( 
    388        "2.0", 
    389        "full_returning is deprecated, please use insert_returning, " 
    390        "update_returning, delete_returning", 
    391    ) 
    392    def full_returning(self): 
    393        return ( 
    394            self.insert_returning 
    395            and self.update_returning 
    396            and self.delete_returning 
    397        ) 
    398 
    399    @util.memoized_property 
    400    def insert_executemany_returning(self): 
    401        """Default implementation for insert_executemany_returning, if not 
    402        otherwise overridden by the specific dialect. 
    403 
    404        The default dialect determines "insert_executemany_returning" is 
    405        available if the dialect in use has opted into using the 
    406        "use_insertmanyvalues" feature. If they haven't opted into that, then 
    407        this attribute is False, unless the dialect in question overrides this 
    408        and provides some other implementation (such as the Oracle Database 
    409        dialects). 
    410 
    411        """ 
    412        return self.insert_returning and self.use_insertmanyvalues 
    413 
    414    @util.memoized_property 
    415    def insert_executemany_returning_sort_by_parameter_order(self): 
    416        """Default implementation for 
    417        insert_executemany_returning_deterministic_order, if not otherwise 
    418        overridden by the specific dialect. 
    419 
    420        The default dialect determines "insert_executemany_returning" can have 
    421        deterministic order only if the dialect in use has opted into using the 
    422        "use_insertmanyvalues" feature, which implements deterministic ordering 
    423        using client side sentinel columns only by default.  The 
    424        "insertmanyvalues" feature also features alternate forms that can 
    425        use server-generated PK values as "sentinels", but those are only 
    426        used if the :attr:`.Dialect.insertmanyvalues_implicit_sentinel` 
    427        bitflag enables those alternate SQL forms, which are disabled 
    428        by default. 
    429 
    430        If the dialect in use hasn't opted into that, then this attribute is 
    431        False, unless the dialect in question overrides this and provides some 
    432        other implementation (such as the Oracle Database dialects). 
    433 
    434        """ 
    435        return self.insert_returning and self.use_insertmanyvalues 
    436 
    437    update_executemany_returning = False 
    438    delete_executemany_returning = False 
    439 
    440    @util.memoized_property 
    441    def loaded_dbapi(self) -> DBAPIModule: 
    442        if self.dbapi is None: 
    443            raise exc.InvalidRequestError( 
    444                f"Dialect {self} does not have a Python DBAPI established " 
    445                "and cannot be used for actual database interaction" 
    446            ) 
    447        return self.dbapi 
    448 
    449    @util.memoized_property 
    450    def _bind_typing_render_casts(self): 
    451        return self.bind_typing is interfaces.BindTyping.RENDER_CASTS 
    452 
    453    def _ensure_has_table_connection(self, arg: Connection) -> None: 
    454        if not isinstance(arg, Connection): 
    455            raise exc.ArgumentError( 
    456                "The argument passed to Dialect.has_table() should be a " 
    457                "%s, got %s. " 
    458                "Additionally, the Dialect.has_table() method is for " 
    459                "internal dialect " 
    460                "use only; please use " 
    461                "``inspect(some_engine).has_table(<tablename>>)`` " 
    462                "for public API use." % (Connection, type(arg)) 
    463            ) 
    464 
    465    @util.memoized_property 
    466    def _supports_statement_cache(self): 
    467        ssc = self.__class__.__dict__.get("supports_statement_cache", None) 
    468        if ssc is None: 
    469            util.warn( 
    470                "Dialect %s:%s will not make use of SQL compilation caching " 
    471                "as it does not set the 'supports_statement_cache' attribute " 
    472                "to ``True``.  This can have " 
    473                "significant performance implications including some " 
    474                "performance degradations in comparison to prior SQLAlchemy " 
    475                "versions.  Dialect maintainers should seek to set this " 
    476                "attribute to True after appropriate development and testing " 
    477                "for SQLAlchemy 1.4 caching support.   Alternatively, this " 
    478                "attribute may be set to False which will disable this " 
    479                "warning." % (self.name, self.driver), 
    480                code="cprf", 
    481            ) 
    482 
    483        return bool(ssc) 
    484 
    485    @util.memoized_property 
    486    def _type_memos(self): 
    487        return weakref.WeakKeyDictionary() 
    488 
    489    @property 
    490    def dialect_description(self):  # type: ignore[override] 
    491        return self.name + "+" + self.driver 
    492 
    493    @property 
    494    def supports_sane_rowcount_returning(self): 
    495        """True if this dialect supports sane rowcount even if RETURNING is 
    496        in use. 
    497 
    498        For dialects that don't support RETURNING, this is synonymous with 
    499        ``supports_sane_rowcount``. 
    500 
    501        """ 
    502        return self.supports_sane_rowcount 
    503 
    504    @classmethod 
    505    def get_pool_class(cls, url: URL) -> Type[Pool]: 
    506        default: Type[pool.Pool] 
    507        if cls.is_async: 
    508            default = pool.AsyncAdaptedQueuePool 
    509        else: 
    510            default = pool.QueuePool 
    511 
    512        return getattr(cls, "poolclass", default) 
    513 
    514    def get_dialect_pool_class(self, url: URL) -> Type[Pool]: 
    515        return self.get_pool_class(url) 
    516 
    517    @classmethod 
    518    def load_provisioning(cls): 
    519        package = ".".join(cls.__module__.split(".")[0:-1]) 
    520        try: 
    521            __import__(package + ".provision") 
    522        except ImportError: 
    523            pass 
    524 
    525    def _builtin_onconnect(self) -> Optional[_ListenerFnType]: 
    526        if self._on_connect_isolation_level is not None: 
    527 
    528            def builtin_connect(dbapi_conn, conn_rec): 
    529                self._assert_and_set_isolation_level( 
    530                    dbapi_conn, self._on_connect_isolation_level 
    531                ) 
    532 
    533            return builtin_connect 
    534        else: 
    535            return None 
    536 
    537    def initialize(self, connection: Connection) -> None: 
    538        try: 
    539            self.server_version_info = self._get_server_version_info( 
    540                connection 
    541            ) 
    542        except NotImplementedError: 
    543            self.server_version_info = None 
    544        try: 
    545            self.default_schema_name = self._get_default_schema_name( 
    546                connection 
    547            ) 
    548        except NotImplementedError: 
    549            self.default_schema_name = None 
    550 
    551        try: 
    552            self.default_isolation_level = self.get_default_isolation_level( 
    553                connection.connection.dbapi_connection 
    554            ) 
    555        except NotImplementedError: 
    556            self.default_isolation_level = None 
    557 
    558        if not self._user_defined_max_identifier_length: 
    559            max_ident_length = self._check_max_identifier_length(connection) 
    560            if max_ident_length: 
    561                self.max_identifier_length = max_ident_length 
    562 
    563        if ( 
    564            self.label_length 
    565            and self.label_length > self.max_identifier_length 
    566        ): 
    567            raise exc.ArgumentError( 
    568                "Label length of %d is greater than this dialect's" 
    569                " maximum identifier length of %d" 
    570                % (self.label_length, self.max_identifier_length) 
    571            ) 
    572 
    573    def on_connect(self) -> Optional[Callable[[Any], None]]: 
    574        # inherits the docstring from interfaces.Dialect.on_connect 
    575        return None 
    576 
    577    def _check_max_identifier_length(self, connection): 
    578        """Perform a connection / server version specific check to determine 
    579        the max_identifier_length. 
    580 
    581        If the dialect's class level max_identifier_length should be used, 
    582        can return None. 
    583 
    584        """ 
    585        return None 
    586 
    587    def get_default_isolation_level(self, dbapi_conn): 
    588        """Given a DBAPI connection, return its isolation level, or 
    589        a default isolation level if one cannot be retrieved. 
    590 
    591        May be overridden by subclasses in order to provide a 
    592        "fallback" isolation level for databases that cannot reliably 
    593        retrieve the actual isolation level. 
    594 
    595        By default, calls the :meth:`_engine.Interfaces.get_isolation_level` 
    596        method, propagating any exceptions raised. 
    597 
    598        """ 
    599        return self.get_isolation_level(dbapi_conn) 
    600 
    601    def type_descriptor(self, typeobj): 
    602        """Provide a database-specific :class:`.TypeEngine` object, given 
    603        the generic object which comes from the types module. 
    604 
    605        This method looks for a dictionary called 
    606        ``colspecs`` as a class or instance-level variable, 
    607        and passes on to :func:`_types.adapt_type`. 
    608 
    609        """ 
    610        return type_api.adapt_type(typeobj, self.colspecs) 
    611 
    612    def has_index(self, connection, table_name, index_name, schema=None, **kw): 
    613        if not self.has_table(connection, table_name, schema=schema, **kw): 
    614            return False 
    615        for idx in self.get_indexes( 
    616            connection, table_name, schema=schema, **kw 
    617        ): 
    618            if idx["name"] == index_name: 
    619                return True 
    620        else: 
    621            return False 
    622 
    623    def has_schema( 
    624        self, connection: Connection, schema_name: str, **kw: Any 
    625    ) -> bool: 
    626        return schema_name in self.get_schema_names(connection, **kw) 
    627 
    628    def validate_identifier(self, ident: str) -> None: 
    629        if len(ident) > self.max_identifier_length: 
    630            raise exc.IdentifierError( 
    631                "Identifier '%s' exceeds maximum length of %d characters" 
    632                % (ident, self.max_identifier_length) 
    633            ) 
    634 
    635    def connect(self, *cargs: Any, **cparams: Any) -> DBAPIConnection: 
    636        # inherits the docstring from interfaces.Dialect.connect 
    637        return self.loaded_dbapi.connect(*cargs, **cparams)  # type: ignore[no-any-return]  # NOQA: E501 
    638 
    639    def create_connect_args(self, url: URL) -> ConnectArgsType: 
    640        # inherits the docstring from interfaces.Dialect.create_connect_args 
    641        opts = url.translate_connect_args() 
    642        opts.update(url.query) 
    643        return ([], opts) 
    644 
    645    def set_engine_execution_options( 
    646        self, engine: Engine, opts: Mapping[str, Any] 
    647    ) -> None: 
    648        supported_names = set(self.connection_characteristics).intersection( 
    649            opts 
    650        ) 
    651        if supported_names: 
    652            characteristics: Mapping[str, Any] = util.immutabledict( 
    653                (name, opts[name]) for name in supported_names 
    654            ) 
    655 
    656            @event.listens_for(engine, "engine_connect") 
    657            def set_connection_characteristics(connection): 
    658                self._set_connection_characteristics( 
    659                    connection, characteristics 
    660                ) 
    661 
    662    def set_connection_execution_options( 
    663        self, connection: Connection, opts: Mapping[str, Any] 
    664    ) -> None: 
    665        supported_names = set(self.connection_characteristics).intersection( 
    666            opts 
    667        ) 
    668        if supported_names: 
    669            characteristics: Mapping[str, Any] = util.immutabledict( 
    670                (name, opts[name]) for name in supported_names 
    671            ) 
    672            self._set_connection_characteristics(connection, characteristics) 
    673 
    674    def _set_connection_characteristics(self, connection, characteristics): 
    675        characteristic_values = [ 
    676            (name, self.connection_characteristics[name], value) 
    677            for name, value in characteristics.items() 
    678        ] 
    679 
    680        if connection.in_transaction(): 
    681            trans_objs = [ 
    682                (name, obj) 
    683                for name, obj, _ in characteristic_values 
    684                if obj.transactional 
    685            ] 
    686            if trans_objs: 
    687                raise exc.InvalidRequestError( 
    688                    "This connection has already initialized a SQLAlchemy " 
    689                    "Transaction() object via begin() or autobegin; " 
    690                    "%s may not be altered unless rollback() or commit() " 
    691                    "is called first." 
    692                    % (", ".join(name for name, obj in trans_objs)) 
    693                ) 
    694 
    695        dbapi_connection = connection.connection.dbapi_connection 
    696        for _, characteristic, value in characteristic_values: 
    697            characteristic.set_connection_characteristic( 
    698                self, connection, dbapi_connection, value 
    699            ) 
    700        connection.connection._connection_record.finalize_callback.append( 
    701            functools.partial(self._reset_characteristics, characteristics) 
    702        ) 
    703 
    704    def _reset_characteristics(self, characteristics, dbapi_connection): 
    705        for characteristic_name in characteristics: 
    706            characteristic = self.connection_characteristics[ 
    707                characteristic_name 
    708            ] 
    709            characteristic.reset_characteristic(self, dbapi_connection) 
    710 
    711    def do_begin(self, dbapi_connection): 
    712        pass 
    713 
    714    def do_rollback(self, dbapi_connection): 
    715        if self.skip_autocommit_rollback and self.detect_autocommit_setting( 
    716            dbapi_connection 
    717        ): 
    718            return 
    719        dbapi_connection.rollback() 
    720 
    721    def do_commit(self, dbapi_connection): 
    722        dbapi_connection.commit() 
    723 
    724    def do_terminate(self, dbapi_connection): 
    725        self.do_close(dbapi_connection) 
    726 
    727    def do_close(self, dbapi_connection): 
    728        dbapi_connection.close() 
    729 
    730    @util.memoized_property 
    731    def _dialect_specific_select_one(self): 
    732        return str(expression.select(1).compile(dialect=self)) 
    733 
    734    def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool: 
    735        try: 
    736            return self.do_ping(dbapi_connection) 
    737        except self.loaded_dbapi.Error as err: 
    738            is_disconnect = self.is_disconnect(err, dbapi_connection, None) 
    739 
    740            if self._has_events: 
    741                try: 
    742                    Connection._handle_dbapi_exception_noconnection( 
    743                        err, 
    744                        self, 
    745                        is_disconnect=is_disconnect, 
    746                        invalidate_pool_on_disconnect=False, 
    747                        is_pre_ping=True, 
    748                    ) 
    749                except exc.StatementError as new_err: 
    750                    is_disconnect = new_err.connection_invalidated 
    751 
    752            if is_disconnect: 
    753                return False 
    754            else: 
    755                raise 
    756 
    757    def do_ping(self, dbapi_connection: DBAPIConnection) -> bool: 
    758        cursor = dbapi_connection.cursor() 
    759        try: 
    760            cursor.execute(self._dialect_specific_select_one) 
    761        finally: 
    762            cursor.close() 
    763        return True 
    764 
    765    def create_xid(self): 
    766        """Create a random two-phase transaction ID. 
    767 
    768        This id will be passed to do_begin_twophase(), do_rollback_twophase(), 
    769        do_commit_twophase().  Its format is unspecified. 
    770        """ 
    771 
    772        return "_sa_%032x" % random.randint(0, 2**128) 
    773 
    774    def do_savepoint(self, connection, name): 
    775        connection.execute(expression.SavepointClause(name)) 
    776 
    777    def do_rollback_to_savepoint(self, connection, name): 
    778        connection.execute(expression.RollbackToSavepointClause(name)) 
    779 
    780    def do_release_savepoint(self, connection, name): 
    781        connection.execute(expression.ReleaseSavepointClause(name)) 
    782 
    783    def _deliver_insertmanyvalues_batches( 
    784        self, 
    785        connection, 
    786        cursor, 
    787        statement, 
    788        parameters, 
    789        generic_setinputsizes, 
    790        context, 
    791    ): 
    792        context = cast(DefaultExecutionContext, context) 
    793        compiled = cast(SQLCompiler, context.compiled) 
    794 
    795        _composite_sentinel_proc: Sequence[ 
    796            Optional[_ResultProcessorType[Any]] 
    797        ] = () 
    798        _scalar_sentinel_proc: Optional[_ResultProcessorType[Any]] = None 
    799        _sentinel_proc_initialized: bool = False 
    800 
    801        compiled_parameters = context.compiled_parameters 
    802 
    803        imv = compiled._insertmanyvalues 
    804        assert imv is not None 
    805 
    806        is_returning: Final[bool] = bool(compiled.effective_returning) 
    807        batch_size = context.execution_options.get( 
    808            "insertmanyvalues_page_size", self.insertmanyvalues_page_size 
    809        ) 
    810 
    811        if compiled.schema_translate_map: 
    812            schema_translate_map = context.execution_options.get( 
    813                "schema_translate_map", {} 
    814            ) 
    815        else: 
    816            schema_translate_map = None 
    817 
    818        if is_returning: 
    819            result: Optional[List[Any]] = [] 
    820            context._insertmanyvalues_rows = result 
    821 
    822            sort_by_parameter_order = imv.sort_by_parameter_order 
    823 
    824        else: 
    825            sort_by_parameter_order = False 
    826            result = None 
    827 
    828        for imv_batch in compiled._deliver_insertmanyvalues_batches( 
    829            statement, 
    830            parameters, 
    831            compiled_parameters, 
    832            generic_setinputsizes, 
    833            batch_size, 
    834            sort_by_parameter_order, 
    835            schema_translate_map, 
    836        ): 
    837            yield imv_batch 
    838 
    839            if is_returning: 
    840 
    841                try: 
    842                    rows = context.fetchall_for_returning(cursor) 
    843                except BaseException as be: 
    844                    connection._handle_dbapi_exception( 
    845                        be, 
    846                        sql_util._long_statement(imv_batch.replaced_statement), 
    847                        imv_batch.replaced_parameters, 
    848                        None, 
    849                        context, 
    850                        is_sub_exec=True, 
    851                    ) 
    852 
    853                # I would have thought "is_returning: Final[bool]" 
    854                # would have assured this but pylance thinks not 
    855                assert result is not None 
    856 
    857                if imv.num_sentinel_columns and not imv_batch.is_downgraded: 
    858                    composite_sentinel = imv.num_sentinel_columns > 1 
    859                    if imv.implicit_sentinel: 
    860                        # for implicit sentinel, which is currently single-col 
    861                        # integer autoincrement, do a simple sort. 
    862                        assert not composite_sentinel 
    863                        result.extend( 
    864                            sorted(rows, key=operator.itemgetter(-1)) 
    865                        ) 
    866                        continue 
    867 
    868                    # otherwise, create dictionaries to match up batches 
    869                    # with parameters 
    870                    assert imv.sentinel_param_keys 
    871                    assert imv.sentinel_columns 
    872 
    873                    _nsc = imv.num_sentinel_columns 
    874 
    875                    if not _sentinel_proc_initialized: 
    876                        if composite_sentinel: 
    877                            _composite_sentinel_proc = [ 
    878                                col.type._cached_result_processor( 
    879                                    self, cursor_desc[1] 
    880                                ) 
    881                                for col, cursor_desc in zip( 
    882                                    imv.sentinel_columns, 
    883                                    cursor.description[-_nsc:], 
    884                                ) 
    885                            ] 
    886                        else: 
    887                            _scalar_sentinel_proc = ( 
    888                                imv.sentinel_columns[0] 
    889                            ).type._cached_result_processor( 
    890                                self, cursor.description[-1][1] 
    891                            ) 
    892                        _sentinel_proc_initialized = True 
    893 
    894                    rows_by_sentinel: Union[ 
    895                        Dict[Tuple[Any, ...], Any], 
    896                        Dict[Any, Any], 
    897                    ] 
    898                    if composite_sentinel: 
    899                        rows_by_sentinel = { 
    900                            tuple( 
    901                                (proc(val) if proc else val) 
    902                                for val, proc in zip( 
    903                                    row[-_nsc:], _composite_sentinel_proc 
    904                                ) 
    905                            ): row 
    906                            for row in rows 
    907                        } 
    908                    elif _scalar_sentinel_proc: 
    909                        rows_by_sentinel = { 
    910                            _scalar_sentinel_proc(row[-1]): row for row in rows 
    911                        } 
    912                    else: 
    913                        rows_by_sentinel = {row[-1]: row for row in rows} 
    914 
    915                    if len(rows_by_sentinel) != len(imv_batch.batch): 
    916                        # see test_insert_exec.py:: 
    917                        # IMVSentinelTest::test_sentinel_incorrect_rowcount 
    918                        # for coverage / demonstration 
    919                        raise exc.InvalidRequestError( 
    920                            f"Sentinel-keyed result set did not produce " 
    921                            f"correct number of rows {len(imv_batch.batch)}; " 
    922                            "produced " 
    923                            f"{len(rows_by_sentinel)}.  Please ensure the " 
    924                            "sentinel column is fully unique and populated in " 
    925                            "all cases." 
    926                        ) 
    927 
    928                    try: 
    929                        ordered_rows = [ 
    930                            rows_by_sentinel[sentinel_keys] 
    931                            for sentinel_keys in imv_batch.sentinel_values 
    932                        ] 
    933                    except KeyError as ke: 
    934                        # see test_insert_exec.py:: 
    935                        # IMVSentinelTest::test_sentinel_cant_match_keys 
    936                        # for coverage / demonstration 
    937                        raise exc.InvalidRequestError( 
    938                            f"Can't match sentinel values in result set to " 
    939                            f"parameter sets; key {ke.args[0]!r} was not " 
    940                            "found. " 
    941                            "There may be a mismatch between the datatype " 
    942                            "passed to the DBAPI driver vs. that which it " 
    943                            "returns in a result row.  Ensure the given " 
    944                            "Python value matches the expected result type " 
    945                            "*exactly*, taking care to not rely upon implicit " 
    946                            "conversions which may occur such as when using " 
    947                            "strings in place of UUID or integer values, etc. " 
    948                        ) from ke 
    949 
    950                    result.extend(ordered_rows) 
    951 
    952                else: 
    953                    result.extend(rows) 
    954 
    955    def do_executemany(self, cursor, statement, parameters, context=None): 
    956        cursor.executemany(statement, parameters) 
    957 
    958    def do_execute(self, cursor, statement, parameters, context=None): 
    959        cursor.execute(statement, parameters) 
    960 
    961    def do_execute_no_params(self, cursor, statement, context=None): 
    962        cursor.execute(statement) 
    963 
    964    def is_disconnect( 
    965        self, 
    966        e: DBAPIModule.Error, 
    967        connection: Union[ 
    968            pool.PoolProxiedConnection, interfaces.DBAPIConnection, None 
    969        ], 
    970        cursor: Optional[interfaces.DBAPICursor], 
    971    ) -> bool: 
    972        return False 
    973 
    974    @util.memoized_instancemethod 
    975    def _gen_allowed_isolation_levels(self, dbapi_conn): 
    976        try: 
    977            raw_levels = list(self.get_isolation_level_values(dbapi_conn)) 
    978        except NotImplementedError: 
    979            return None 
    980        else: 
    981            normalized_levels = [ 
    982                level.replace("_", " ").upper() for level in raw_levels 
    983            ] 
    984            if raw_levels != normalized_levels: 
    985                raise ValueError( 
    986                    f"Dialect {self.name!r} get_isolation_level_values() " 
    987                    f"method should return names as UPPERCASE using spaces, " 
    988                    f"not underscores; got " 
    989                    f"{sorted(set(raw_levels).difference(normalized_levels))}" 
    990                ) 
    991            return tuple(normalized_levels) 
    992 
    993    def _assert_and_set_isolation_level(self, dbapi_conn, level): 
    994        level = level.replace("_", " ").upper() 
    995 
    996        _allowed_isolation_levels = self._gen_allowed_isolation_levels( 
    997            dbapi_conn 
    998        ) 
    999        if ( 
    1000            _allowed_isolation_levels 
    1001            and level not in _allowed_isolation_levels 
    1002        ): 
    1003            raise exc.ArgumentError( 
    1004                f"Invalid value {level!r} for isolation_level. " 
    1005                f"Valid isolation levels for {self.name!r} are " 
    1006                f"{', '.join(_allowed_isolation_levels)}" 
    1007            ) 
    1008 
    1009        self.set_isolation_level(dbapi_conn, level) 
    1010 
    1011    def reset_isolation_level(self, dbapi_conn): 
    1012        if self._on_connect_isolation_level is not None: 
    1013            assert ( 
    1014                self._on_connect_isolation_level == "AUTOCOMMIT" 
    1015                or self._on_connect_isolation_level 
    1016                == self.default_isolation_level 
    1017            ) 
    1018            self._assert_and_set_isolation_level( 
    1019                dbapi_conn, self._on_connect_isolation_level 
    1020            ) 
    1021        else: 
    1022            assert self.default_isolation_level is not None 
    1023            self._assert_and_set_isolation_level( 
    1024                dbapi_conn, 
    1025                self.default_isolation_level, 
    1026            ) 
    1027 
    1028    def normalize_name(self, name): 
    1029        if name is None: 
    1030            return None 
    1031 
    1032        name_lower = name.lower() 
    1033        name_upper = name.upper() 
    1034 
    1035        if name_upper == name_lower: 
    1036            # name has no upper/lower conversion, e.g. non-european characters. 
    1037            # return unchanged 
    1038            return name 
    1039        elif name_upper == name and not ( 
    1040            self.identifier_preparer._requires_quotes 
    1041        )(name_lower): 
    1042            # name is all uppercase and doesn't require quoting; normalize 
    1043            # to all lower case 
    1044            return name_lower 
    1045        elif name_lower == name: 
    1046            # name is all lower case, which if denormalized means we need to 
    1047            # force quoting on it 
    1048            return quoted_name(name, quote=True) 
    1049        else: 
    1050            # name is mixed case, means it will be quoted in SQL when used 
    1051            # later, no normalizes 
    1052            return name 
    1053 
    1054    def denormalize_name(self, name): 
    1055        if name is None: 
    1056            return None 
    1057 
    1058        name_lower = name.lower() 
    1059        name_upper = name.upper() 
    1060 
    1061        if name_upper == name_lower: 
    1062            # name has no upper/lower conversion, e.g. non-european characters. 
    1063            # return unchanged 
    1064            return name 
    1065        elif name_lower == name and not ( 
    1066            self.identifier_preparer._requires_quotes 
    1067        )(name_lower): 
    1068            name = name_upper 
    1069        return name 
    1070 
    1071    def get_driver_connection(self, connection: DBAPIConnection) -> Any: 
    1072        return connection 
    1073 
    1074    def _overrides_default(self, method): 
    1075        return ( 
    1076            getattr(type(self), method).__code__ 
    1077            is not getattr(DefaultDialect, method).__code__ 
    1078        ) 
    1079 
    1080    def _default_multi_reflect( 
    1081        self, 
    1082        single_tbl_method, 
    1083        connection, 
    1084        kind, 
    1085        schema, 
    1086        filter_names, 
    1087        scope, 
    1088        **kw, 
    1089    ): 
    1090        names_fns = [] 
    1091        temp_names_fns = [] 
    1092        if ObjectKind.TABLE in kind: 
    1093            names_fns.append(self.get_table_names) 
    1094            temp_names_fns.append(self.get_temp_table_names) 
    1095        if ObjectKind.VIEW in kind: 
    1096            names_fns.append(self.get_view_names) 
    1097            temp_names_fns.append(self.get_temp_view_names) 
    1098        if ObjectKind.MATERIALIZED_VIEW in kind: 
    1099            names_fns.append(self.get_materialized_view_names) 
    1100            # no temp materialized view at the moment 
    1101            # temp_names_fns.append(self.get_temp_materialized_view_names) 
    1102 
    1103        unreflectable = kw.pop("unreflectable", {}) 
    1104 
    1105        if ( 
    1106            filter_names 
    1107            and scope is ObjectScope.ANY 
    1108            and kind is ObjectKind.ANY 
    1109        ): 
    1110            # if names are given and no qualification on type of table 
    1111            # (i.e. the Table(..., autoload) case), take the names as given, 
    1112            # don't run names queries. If a table does not exit 
    1113            # NoSuchTableError is raised and it's skipped 
    1114 
    1115            # this also suits the case for mssql where we can reflect 
    1116            # individual temp tables but there's no temp_names_fn 
    1117            names = filter_names 
    1118        else: 
    1119            names = [] 
    1120            name_kw = {"schema": schema, **kw} 
    1121            fns = [] 
    1122            if ObjectScope.DEFAULT in scope: 
    1123                fns.extend(names_fns) 
    1124            if ObjectScope.TEMPORARY in scope: 
    1125                fns.extend(temp_names_fns) 
    1126 
    1127            for fn in fns: 
    1128                try: 
    1129                    names.extend(fn(connection, **name_kw)) 
    1130                except NotImplementedError: 
    1131                    pass 
    1132 
    1133        if filter_names: 
    1134            filter_names = set(filter_names) 
    1135 
    1136        # iterate over all the tables/views and call the single table method 
    1137        for table in names: 
    1138            if not filter_names or table in filter_names: 
    1139                key = (schema, table) 
    1140                try: 
    1141                    yield ( 
    1142                        key, 
    1143                        single_tbl_method( 
    1144                            connection, table, schema=schema, **kw 
    1145                        ), 
    1146                    ) 
    1147                except exc.UnreflectableTableError as err: 
    1148                    if key not in unreflectable: 
    1149                        unreflectable[key] = err 
    1150                except exc.NoSuchTableError: 
    1151                    pass 
    1152 
    1153    def get_multi_table_options(self, connection, **kw): 
    1154        return self._default_multi_reflect( 
    1155            self.get_table_options, connection, **kw 
    1156        ) 
    1157 
    1158    def get_multi_columns(self, connection, **kw): 
    1159        return self._default_multi_reflect(self.get_columns, connection, **kw) 
    1160 
    1161    def get_multi_pk_constraint(self, connection, **kw): 
    1162        return self._default_multi_reflect( 
    1163            self.get_pk_constraint, connection, **kw 
    1164        ) 
    1165 
    1166    def get_multi_foreign_keys(self, connection, **kw): 
    1167        return self._default_multi_reflect( 
    1168            self.get_foreign_keys, connection, **kw 
    1169        ) 
    1170 
    1171    def get_multi_indexes(self, connection, **kw): 
    1172        return self._default_multi_reflect(self.get_indexes, connection, **kw) 
    1173 
    1174    def get_multi_unique_constraints(self, connection, **kw): 
    1175        return self._default_multi_reflect( 
    1176            self.get_unique_constraints, connection, **kw 
    1177        ) 
    1178 
    1179    def get_multi_check_constraints(self, connection, **kw): 
    1180        return self._default_multi_reflect( 
    1181            self.get_check_constraints, connection, **kw 
    1182        ) 
    1183 
    1184    def get_multi_table_comment(self, connection, **kw): 
    1185        return self._default_multi_reflect( 
    1186            self.get_table_comment, connection, **kw 
    1187        ) 
    1188 
    1189 
    1190class StrCompileDialect(DefaultDialect): 
    1191    statement_compiler = compiler.StrSQLCompiler 
    1192    ddl_compiler = compiler.DDLCompiler 
    1193    type_compiler_cls = compiler.StrSQLTypeCompiler 
    1194    preparer = compiler.IdentifierPreparer 
    1195 
    1196    insert_returning = True 
    1197    update_returning = True 
    1198    delete_returning = True 
    1199 
    1200    supports_statement_cache = True 
    1201 
    1202    supports_identity_columns = True 
    1203 
    1204    supports_sequences = True 
    1205    sequences_optional = True 
    1206    preexecute_autoincrement_sequences = False 
    1207 
    1208    supports_native_boolean = True 
    1209 
    1210    supports_multivalues_insert = True 
    1211    supports_simple_order_by_label = True 
    1212 
    1213 
    1214class DefaultExecutionContext(ExecutionContext): 
    1215    isinsert = False 
    1216    isupdate = False 
    1217    isdelete = False 
    1218    is_crud = False 
    1219    is_text = False 
    1220    isddl = False 
    1221 
    1222    execute_style: ExecuteStyle = ExecuteStyle.EXECUTE 
    1223 
    1224    compiled: Optional[Compiled] = None 
    1225    result_column_struct: Optional[ 
    1226        Tuple[List[ResultColumnsEntry], bool, bool, bool, bool] 
    1227    ] = None 
    1228    returned_default_rows: Optional[Sequence[Row[Unpack[TupleAny]]]] = None 
    1229 
    1230    execution_options: _ExecuteOptions = util.EMPTY_DICT 
    1231 
    1232    cursor_fetch_strategy = _cursor._DEFAULT_FETCH 
    1233 
    1234    invoked_statement: Optional[Executable] = None 
    1235 
    1236    _is_implicit_returning = False 
    1237    _is_explicit_returning = False 
    1238    _is_supplemental_returning = False 
    1239    _is_server_side = False 
    1240 
    1241    _soft_closed = False 
    1242 
    1243    _rowcount: Optional[int] = None 
    1244 
    1245    # a hook for SQLite's translation of 
    1246    # result column names 
    1247    # NOTE: pyhive is using this hook, can't remove it :( 
    1248    _translate_colname: Optional[ 
    1249        Callable[[str], Tuple[str, Optional[str]]] 
    1250    ] = None 
    1251 
    1252    _expanded_parameters: Mapping[str, List[str]] = util.immutabledict() 
    1253    """used by set_input_sizes(). 
    1254 
    1255    This collection comes from ``ExpandedState.parameter_expansion``. 
    1256 
    1257    """ 
    1258 
    1259    cache_hit = NO_CACHE_KEY 
    1260 
    1261    root_connection: Connection 
    1262    _dbapi_connection: PoolProxiedConnection 
    1263    dialect: Dialect 
    1264    unicode_statement: str 
    1265    cursor: DBAPICursor 
    1266    compiled_parameters: List[_MutableCoreSingleExecuteParams] 
    1267    parameters: _DBAPIMultiExecuteParams 
    1268    extracted_parameters: Optional[Sequence[BindParameter[Any]]] 
    1269 
    1270    _empty_dict_params = cast("Mapping[str, Any]", util.EMPTY_DICT) 
    1271 
    1272    _insertmanyvalues_rows: Optional[List[Tuple[Any, ...]]] = None 
    1273    _num_sentinel_cols: int = 0 
    1274 
    1275    @classmethod 
    1276    def _init_ddl( 
    1277        cls, 
    1278        dialect: Dialect, 
    1279        connection: Connection, 
    1280        dbapi_connection: PoolProxiedConnection, 
    1281        execution_options: _ExecuteOptions, 
    1282        compiled_ddl: DDLCompiler, 
    1283    ) -> ExecutionContext: 
    1284        """Initialize execution context for an ExecutableDDLElement 
    1285        construct.""" 
    1286 
    1287        self = cls.__new__(cls) 
    1288        self.root_connection = connection 
    1289        self._dbapi_connection = dbapi_connection 
    1290        self.dialect = connection.dialect 
    1291 
    1292        self.compiled = compiled = compiled_ddl 
    1293        self.isddl = True 
    1294 
    1295        self.execution_options = execution_options 
    1296 
    1297        self.unicode_statement = str(compiled) 
    1298        if compiled.schema_translate_map: 
    1299            schema_translate_map = self.execution_options.get( 
    1300                "schema_translate_map", {} 
    1301            ) 
    1302 
    1303            rst = compiled.preparer._render_schema_translates 
    1304            self.unicode_statement = rst( 
    1305                self.unicode_statement, schema_translate_map 
    1306            ) 
    1307 
    1308        self.statement = self.unicode_statement 
    1309 
    1310        self.cursor = self.create_cursor() 
    1311        self.compiled_parameters = [] 
    1312 
    1313        if dialect.positional: 
    1314            self.parameters = [dialect.execute_sequence_format()] 
    1315        else: 
    1316            self.parameters = [self._empty_dict_params] 
    1317 
    1318        return self 
    1319 
    1320    @classmethod 
    1321    def _init_compiled( 
    1322        cls, 
    1323        dialect: Dialect, 
    1324        connection: Connection, 
    1325        dbapi_connection: PoolProxiedConnection, 
    1326        execution_options: _ExecuteOptions, 
    1327        compiled: SQLCompiler, 
    1328        parameters: _CoreMultiExecuteParams, 
    1329        invoked_statement: Executable, 
    1330        extracted_parameters: Optional[Sequence[BindParameter[Any]]], 
    1331        cache_hit: CacheStats = CacheStats.CACHING_DISABLED, 
    1332    ) -> ExecutionContext: 
    1333        """Initialize execution context for a Compiled construct.""" 
    1334 
    1335        self = cls.__new__(cls) 
    1336        self.root_connection = connection 
    1337        self._dbapi_connection = dbapi_connection 
    1338        self.dialect = connection.dialect 
    1339        self.extracted_parameters = extracted_parameters 
    1340        self.invoked_statement = invoked_statement 
    1341        self.compiled = compiled 
    1342        self.cache_hit = cache_hit 
    1343 
    1344        self.execution_options = execution_options 
    1345 
    1346        self.result_column_struct = ( 
    1347            compiled._result_columns, 
    1348            compiled._ordered_columns, 
    1349            compiled._textual_ordered_columns, 
    1350            compiled._ad_hoc_textual, 
    1351            compiled._loose_column_name_matching, 
    1352        ) 
    1353 
    1354        self.isinsert = ii = compiled.isinsert 
    1355        self.isupdate = iu = compiled.isupdate 
    1356        self.isdelete = id_ = compiled.isdelete 
    1357        self.is_text = compiled.isplaintext 
    1358 
    1359        if ii or iu or id_: 
    1360            dml_statement = compiled.compile_state.statement  # type: ignore 
    1361            if TYPE_CHECKING: 
    1362                assert isinstance(dml_statement, UpdateBase) 
    1363            self.is_crud = True 
    1364            self._is_explicit_returning = ier = bool(dml_statement._returning) 
    1365            self._is_implicit_returning = iir = bool( 
    1366                compiled.implicit_returning 
    1367            ) 
    1368            if iir and dml_statement._supplemental_returning: 
    1369                self._is_supplemental_returning = True 
    1370 
    1371            # dont mix implicit and explicit returning 
    1372            assert not (iir and ier) 
    1373 
    1374            if (ier or iir) and compiled.for_executemany: 
    1375                if ii and not self.dialect.insert_executemany_returning: 
    1376                    raise exc.InvalidRequestError( 
    1377                        f"Dialect {self.dialect.dialect_description} with " 
    1378                        f"current server capabilities does not support " 
    1379                        "INSERT..RETURNING when executemany is used" 
    1380                    ) 
    1381                elif ( 
    1382                    ii 
    1383                    and dml_statement._sort_by_parameter_order 
    1384                    and not self.dialect.insert_executemany_returning_sort_by_parameter_order  # noqa: E501 
    1385                ): 
    1386                    raise exc.InvalidRequestError( 
    1387                        f"Dialect {self.dialect.dialect_description} with " 
    1388                        f"current server capabilities does not support " 
    1389                        "INSERT..RETURNING with deterministic row ordering " 
    1390                        "when executemany is used" 
    1391                    ) 
    1392                elif ( 
    1393                    ii 
    1394                    and self.dialect.use_insertmanyvalues 
    1395                    and not compiled._insertmanyvalues 
    1396                ): 
    1397                    raise exc.InvalidRequestError( 
    1398                        'Statement does not have "insertmanyvalues" ' 
    1399                        "enabled, can't use INSERT..RETURNING with " 
    1400                        "executemany in this case." 
    1401                    ) 
    1402                elif iu and not self.dialect.update_executemany_returning: 
    1403                    raise exc.InvalidRequestError( 
    1404                        f"Dialect {self.dialect.dialect_description} with " 
    1405                        f"current server capabilities does not support " 
    1406                        "UPDATE..RETURNING when executemany is used" 
    1407                    ) 
    1408                elif id_ and not self.dialect.delete_executemany_returning: 
    1409                    raise exc.InvalidRequestError( 
    1410                        f"Dialect {self.dialect.dialect_description} with " 
    1411                        f"current server capabilities does not support " 
    1412                        "DELETE..RETURNING when executemany is used" 
    1413                    ) 
    1414 
    1415        if not parameters: 
    1416            self.compiled_parameters = [ 
    1417                compiled.construct_params( 
    1418                    extracted_parameters=extracted_parameters, 
    1419                    escape_names=False, 
    1420                ) 
    1421            ] 
    1422        else: 
    1423            self.compiled_parameters = [ 
    1424                compiled.construct_params( 
    1425                    m, 
    1426                    escape_names=False, 
    1427                    _group_number=grp, 
    1428                    extracted_parameters=extracted_parameters, 
    1429                ) 
    1430                for grp, m in enumerate(parameters) 
    1431            ] 
    1432 
    1433            if len(parameters) > 1: 
    1434                if self.isinsert and compiled._insertmanyvalues: 
    1435                    self.execute_style = ExecuteStyle.INSERTMANYVALUES 
    1436 
    1437                    imv = compiled._insertmanyvalues 
    1438                    if imv.sentinel_columns is not None: 
    1439                        self._num_sentinel_cols = imv.num_sentinel_columns 
    1440                else: 
    1441                    self.execute_style = ExecuteStyle.EXECUTEMANY 
    1442 
    1443        self.unicode_statement = compiled.string 
    1444 
    1445        self.cursor = self.create_cursor() 
    1446 
    1447        if self.compiled.insert_prefetch or self.compiled.update_prefetch: 
    1448            self._process_execute_defaults() 
    1449 
    1450        processors = compiled._bind_processors 
    1451 
    1452        flattened_processors: Mapping[ 
    1453            str, _BindProcessorType[Any] 
    1454        ] = processors  # type: ignore[assignment] 
    1455 
    1456        if compiled.literal_execute_params or compiled.post_compile_params: 
    1457            if self.executemany: 
    1458                raise exc.InvalidRequestError( 
    1459                    "'literal_execute' or 'expanding' parameters can't be " 
    1460                    "used with executemany()" 
    1461                ) 
    1462 
    1463            expanded_state = compiled._process_parameters_for_postcompile( 
    1464                self.compiled_parameters[0] 
    1465            ) 
    1466 
    1467            # re-assign self.unicode_statement 
    1468            self.unicode_statement = expanded_state.statement 
    1469 
    1470            self._expanded_parameters = expanded_state.parameter_expansion 
    1471 
    1472            flattened_processors = dict(processors)  # type: ignore 
    1473            flattened_processors.update(expanded_state.processors) 
    1474            positiontup = expanded_state.positiontup 
    1475        elif compiled.positional: 
    1476            positiontup = self.compiled.positiontup 
    1477        else: 
    1478            positiontup = None 
    1479 
    1480        if compiled.schema_translate_map: 
    1481            schema_translate_map = self.execution_options.get( 
    1482                "schema_translate_map", {} 
    1483            ) 
    1484            rst = compiled.preparer._render_schema_translates 
    1485            self.unicode_statement = rst( 
    1486                self.unicode_statement, schema_translate_map 
    1487            ) 
    1488 
    1489        # final self.unicode_statement is now assigned, encode if needed 
    1490        # by dialect 
    1491        self.statement = self.unicode_statement 
    1492 
    1493        # Convert the dictionary of bind parameter values 
    1494        # into a dict or list to be sent to the DBAPI's 
    1495        # execute() or executemany() method. 
    1496 
    1497        if compiled.positional: 
    1498            core_positional_parameters: MutableSequence[Sequence[Any]] = [] 
    1499            assert positiontup is not None 
    1500            for compiled_params in self.compiled_parameters: 
    1501                l_param: List[Any] = [ 
    1502                    ( 
    1503                        flattened_processors[key](compiled_params[key]) 
    1504                        if key in flattened_processors 
    1505                        else compiled_params[key] 
    1506                    ) 
    1507                    for key in positiontup 
    1508                ] 
    1509                core_positional_parameters.append( 
    1510                    dialect.execute_sequence_format(l_param) 
    1511                ) 
    1512 
    1513            self.parameters = core_positional_parameters 
    1514        else: 
    1515            core_dict_parameters: MutableSequence[Dict[str, Any]] = [] 
    1516            escaped_names = compiled.escaped_bind_names 
    1517 
    1518            # note that currently, "expanded" parameters will be present 
    1519            # in self.compiled_parameters in their quoted form.   This is 
    1520            # slightly inconsistent with the approach taken as of 
    1521            # #8056 where self.compiled_parameters is meant to contain unquoted 
    1522            # param names. 
    1523            d_param: Dict[str, Any] 
    1524            for compiled_params in self.compiled_parameters: 
    1525                if escaped_names: 
    1526                    d_param = { 
    1527                        escaped_names.get(key, key): ( 
    1528                            flattened_processors[key](compiled_params[key]) 
    1529                            if key in flattened_processors 
    1530                            else compiled_params[key] 
    1531                        ) 
    1532                        for key in compiled_params 
    1533                    } 
    1534                else: 
    1535                    d_param = { 
    1536                        key: ( 
    1537                            flattened_processors[key](compiled_params[key]) 
    1538                            if key in flattened_processors 
    1539                            else compiled_params[key] 
    1540                        ) 
    1541                        for key in compiled_params 
    1542                    } 
    1543 
    1544                core_dict_parameters.append(d_param) 
    1545 
    1546            self.parameters = core_dict_parameters 
    1547 
    1548        return self 
    1549 
    1550    @classmethod 
    1551    def _init_statement( 
    1552        cls, 
    1553        dialect: Dialect, 
    1554        connection: Connection, 
    1555        dbapi_connection: PoolProxiedConnection, 
    1556        execution_options: _ExecuteOptions, 
    1557        statement: str, 
    1558        parameters: _DBAPIMultiExecuteParams, 
    1559    ) -> ExecutionContext: 
    1560        """Initialize execution context for a string SQL statement.""" 
    1561 
    1562        self = cls.__new__(cls) 
    1563        self.root_connection = connection 
    1564        self._dbapi_connection = dbapi_connection 
    1565        self.dialect = connection.dialect 
    1566        self.is_text = True 
    1567 
    1568        self.execution_options = execution_options 
    1569 
    1570        if not parameters: 
    1571            if self.dialect.positional: 
    1572                self.parameters = [dialect.execute_sequence_format()] 
    1573            else: 
    1574                self.parameters = [self._empty_dict_params] 
    1575        elif isinstance(parameters[0], dialect.execute_sequence_format): 
    1576            self.parameters = parameters 
    1577        elif isinstance(parameters[0], dict): 
    1578            self.parameters = parameters 
    1579        else: 
    1580            self.parameters = [ 
    1581                dialect.execute_sequence_format(p) for p in parameters 
    1582            ] 
    1583 
    1584        if len(parameters) > 1: 
    1585            self.execute_style = ExecuteStyle.EXECUTEMANY 
    1586 
    1587        self.statement = self.unicode_statement = statement 
    1588 
    1589        self.cursor = self.create_cursor() 
    1590        return self 
    1591 
    1592    @classmethod 
    1593    def _init_default( 
    1594        cls, 
    1595        dialect: Dialect, 
    1596        connection: Connection, 
    1597        dbapi_connection: PoolProxiedConnection, 
    1598        execution_options: _ExecuteOptions, 
    1599    ) -> ExecutionContext: 
    1600        """Initialize execution context for a ColumnDefault construct.""" 
    1601 
    1602        self = cls.__new__(cls) 
    1603        self.root_connection = connection 
    1604        self._dbapi_connection = dbapi_connection 
    1605        self.dialect = connection.dialect 
    1606 
    1607        self.execution_options = execution_options 
    1608 
    1609        self.cursor = self.create_cursor() 
    1610        return self 
    1611 
    1612    def _get_cache_stats(self) -> str: 
    1613        if self.compiled is None: 
    1614            return "raw sql" 
    1615 
    1616        now = perf_counter() 
    1617 
    1618        ch = self.cache_hit 
    1619 
    1620        gen_time = self.compiled._gen_time 
    1621        assert gen_time is not None 
    1622 
    1623        if ch is NO_CACHE_KEY: 
    1624            return "no key %.5fs" % (now - gen_time,) 
    1625        elif ch is CACHE_HIT: 
    1626            return "cached since %.4gs ago" % (now - gen_time,) 
    1627        elif ch is CACHE_MISS: 
    1628            return "generated in %.5fs" % (now - gen_time,) 
    1629        elif ch is CACHING_DISABLED: 
    1630            if "_cache_disable_reason" in self.execution_options: 
    1631                return "caching disabled (%s) %.5fs " % ( 
    1632                    self.execution_options["_cache_disable_reason"], 
    1633                    now - gen_time, 
    1634                ) 
    1635            else: 
    1636                return "caching disabled %.5fs" % (now - gen_time,) 
    1637        elif ch is NO_DIALECT_SUPPORT: 
    1638            return "dialect %s+%s does not support caching %.5fs" % ( 
    1639                self.dialect.name, 
    1640                self.dialect.driver, 
    1641                now - gen_time, 
    1642            ) 
    1643        else: 
    1644            return "unknown" 
    1645 
    1646    @property 
    1647    def executemany(self):  # type: ignore[override] 
    1648        return self.execute_style in ( 
    1649            ExecuteStyle.EXECUTEMANY, 
    1650            ExecuteStyle.INSERTMANYVALUES, 
    1651        ) 
    1652 
    1653    @util.memoized_property 
    1654    def identifier_preparer(self): 
    1655        if self.compiled: 
    1656            return self.compiled.preparer 
    1657        elif "schema_translate_map" in self.execution_options: 
    1658            return self.dialect.identifier_preparer._with_schema_translate( 
    1659                self.execution_options["schema_translate_map"] 
    1660            ) 
    1661        else: 
    1662            return self.dialect.identifier_preparer 
    1663 
    1664    @util.memoized_property 
    1665    def engine(self): 
    1666        return self.root_connection.engine 
    1667 
    1668    @util.memoized_property 
    1669    def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]: 
    1670        if TYPE_CHECKING: 
    1671            assert isinstance(self.compiled, SQLCompiler) 
    1672        return self.compiled.postfetch 
    1673 
    1674    @util.memoized_property 
    1675    def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]: 
    1676        if TYPE_CHECKING: 
    1677            assert isinstance(self.compiled, SQLCompiler) 
    1678        if self.isinsert: 
    1679            return self.compiled.insert_prefetch 
    1680        elif self.isupdate: 
    1681            return self.compiled.update_prefetch 
    1682        else: 
    1683            return () 
    1684 
    1685    @util.memoized_property 
    1686    def no_parameters(self): 
    1687        return self.execution_options.get("no_parameters", False) 
    1688 
    1689    def _execute_scalar( 
    1690        self, 
    1691        stmt: str, 
    1692        type_: Optional[TypeEngine[Any]], 
    1693        parameters: Optional[_DBAPISingleExecuteParams] = None, 
    1694    ) -> Any: 
    1695        """Execute a string statement on the current cursor, returning a 
    1696        scalar result. 
    1697 
    1698        Used to fire off sequences, default phrases, and "select lastrowid" 
    1699        types of statements individually or in the context of a parent INSERT 
    1700        or UPDATE statement. 
    1701 
    1702        """ 
    1703 
    1704        conn = self.root_connection 
    1705 
    1706        if "schema_translate_map" in self.execution_options: 
    1707            schema_translate_map = self.execution_options.get( 
    1708                "schema_translate_map", {} 
    1709            ) 
    1710 
    1711            rst = self.identifier_preparer._render_schema_translates 
    1712            stmt = rst(stmt, schema_translate_map) 
    1713 
    1714        if not parameters: 
    1715            if self.dialect.positional: 
    1716                parameters = self.dialect.execute_sequence_format() 
    1717            else: 
    1718                parameters = {} 
    1719 
    1720        conn._cursor_execute(self.cursor, stmt, parameters, context=self) 
    1721        row = self.cursor.fetchone() 
    1722        if row is not None: 
    1723            r = row[0] 
    1724        else: 
    1725            r = None 
    1726        if type_ is not None: 
    1727            # apply type post processors to the result 
    1728            proc = type_._cached_result_processor( 
    1729                self.dialect, self.cursor.description[0][1] 
    1730            ) 
    1731            if proc: 
    1732                return proc(r) 
    1733        return r 
    1734 
    1735    @util.memoized_property 
    1736    def connection(self): 
    1737        return self.root_connection 
    1738 
    1739    def _use_server_side_cursor(self): 
    1740        if not self.dialect.supports_server_side_cursors: 
    1741            return False 
    1742 
    1743        if self.dialect.server_side_cursors: 
    1744            # this is deprecated 
    1745            use_server_side = self.execution_options.get( 
    1746                "stream_results", True 
    1747            ) and ( 
    1748                self.compiled 
    1749                and isinstance(self.compiled.statement, expression.Selectable) 
    1750                or ( 
    1751                    ( 
    1752                        not self.compiled 
    1753                        or isinstance( 
    1754                            self.compiled.statement, expression.TextClause 
    1755                        ) 
    1756                    ) 
    1757                    and self.unicode_statement 
    1758                    and SERVER_SIDE_CURSOR_RE.match(self.unicode_statement) 
    1759                ) 
    1760            ) 
    1761        else: 
    1762            use_server_side = self.execution_options.get( 
    1763                "stream_results", False 
    1764            ) 
    1765 
    1766        return use_server_side 
    1767 
    1768    def create_cursor(self) -> DBAPICursor: 
    1769        if ( 
    1770            # inlining initial preference checks for SS cursors 
    1771            self.dialect.supports_server_side_cursors 
    1772            and ( 
    1773                self.execution_options.get("stream_results", False) 
    1774                or ( 
    1775                    self.dialect.server_side_cursors 
    1776                    and self._use_server_side_cursor() 
    1777                ) 
    1778            ) 
    1779        ): 
    1780            self._is_server_side = True 
    1781            return self.create_server_side_cursor() 
    1782        else: 
    1783            self._is_server_side = False 
    1784            return self.create_default_cursor() 
    1785 
    1786    def fetchall_for_returning(self, cursor): 
    1787        return cursor.fetchall() 
    1788 
    1789    def create_default_cursor(self) -> DBAPICursor: 
    1790        return self._dbapi_connection.cursor() 
    1791 
    1792    def create_server_side_cursor(self) -> DBAPICursor: 
    1793        raise NotImplementedError() 
    1794 
    1795    def pre_exec(self): 
    1796        pass 
    1797 
    1798    def get_out_parameter_values(self, names): 
    1799        raise NotImplementedError( 
    1800            "This dialect does not support OUT parameters" 
    1801        ) 
    1802 
    1803    def post_exec(self): 
    1804        pass 
    1805 
    1806    def get_result_processor( 
    1807        self, type_: TypeEngine[Any], colname: str, coltype: DBAPIType 
    1808    ) -> Optional[_ResultProcessorType[Any]]: 
    1809        """Return a 'result processor' for a given type as present in 
    1810        cursor.description. 
    1811 
    1812        This has a default implementation that dialects can override 
    1813        for context-sensitive result type handling. 
    1814 
    1815        """ 
    1816        return type_._cached_result_processor(self.dialect, coltype) 
    1817 
    1818    def get_lastrowid(self) -> int: 
    1819        """return self.cursor.lastrowid, or equivalent, after an INSERT. 
    1820 
    1821        This may involve calling special cursor functions, issuing a new SELECT 
    1822        on the cursor (or a new one), or returning a stored value that was 
    1823        calculated within post_exec(). 
    1824 
    1825        This function will only be called for dialects which support "implicit" 
    1826        primary key generation, keep preexecute_autoincrement_sequences set to 
    1827        False, and when no explicit id value was bound to the statement. 
    1828 
    1829        The function is called once for an INSERT statement that would need to 
    1830        return the last inserted primary key for those dialects that make use 
    1831        of the lastrowid concept.  In these cases, it is called directly after 
    1832        :meth:`.ExecutionContext.post_exec`. 
    1833 
    1834        """ 
    1835        return self.cursor.lastrowid 
    1836 
    1837    def handle_dbapi_exception(self, e): 
    1838        pass 
    1839 
    1840    @util.non_memoized_property 
    1841    def rowcount(self) -> int: 
    1842        if self._rowcount is not None: 
    1843            return self._rowcount 
    1844        else: 
    1845            return self.cursor.rowcount 
    1846 
    1847    @property 
    1848    def _has_rowcount(self): 
    1849        return self._rowcount is not None 
    1850 
    1851    def supports_sane_rowcount(self): 
    1852        return self.dialect.supports_sane_rowcount 
    1853 
    1854    def supports_sane_multi_rowcount(self): 
    1855        return self.dialect.supports_sane_multi_rowcount 
    1856 
    1857    def _setup_result_proxy(self): 
    1858        exec_opt = self.execution_options 
    1859 
    1860        if self._rowcount is None and exec_opt.get("preserve_rowcount", False): 
    1861            self._rowcount = self.cursor.rowcount 
    1862 
    1863        yp: Optional[Union[int, bool]] 
    1864        if self.is_crud or self.is_text: 
    1865            result = self._setup_dml_or_text_result() 
    1866            yp = False 
    1867        else: 
    1868            yp = exec_opt.get("yield_per", None) 
    1869            sr = self._is_server_side or exec_opt.get("stream_results", False) 
    1870            strategy = self.cursor_fetch_strategy 
    1871            if sr and strategy is _cursor._DEFAULT_FETCH: 
    1872                strategy = _cursor.BufferedRowCursorFetchStrategy( 
    1873                    self.cursor, self.execution_options 
    1874                ) 
    1875            cursor_description: _DBAPICursorDescription = ( 
    1876                strategy.alternate_cursor_description 
    1877                or self.cursor.description 
    1878            ) 
    1879            if cursor_description is None: 
    1880                strategy = _cursor._NO_CURSOR_DQL 
    1881 
    1882            result = _cursor.CursorResult(self, strategy, cursor_description) 
    1883 
    1884        compiled = self.compiled 
    1885 
    1886        if ( 
    1887            compiled 
    1888            and not self.isddl 
    1889            and cast(SQLCompiler, compiled).has_out_parameters 
    1890        ): 
    1891            self._setup_out_parameters(result) 
    1892 
    1893        self._soft_closed = result._soft_closed 
    1894 
    1895        if yp: 
    1896            result = result.yield_per(yp) 
    1897 
    1898        return result 
    1899 
    1900    def _setup_out_parameters(self, result): 
    1901        compiled = cast(SQLCompiler, self.compiled) 
    1902 
    1903        out_bindparams = [ 
    1904            (param, name) 
    1905            for param, name in compiled.bind_names.items() 
    1906            if param.isoutparam 
    1907        ] 
    1908        out_parameters = {} 
    1909 
    1910        for bindparam, raw_value in zip( 
    1911            [param for param, name in out_bindparams], 
    1912            self.get_out_parameter_values( 
    1913                [name for param, name in out_bindparams] 
    1914            ), 
    1915        ): 
    1916            type_ = bindparam.type 
    1917            impl_type = type_.dialect_impl(self.dialect) 
    1918            dbapi_type = impl_type.get_dbapi_type(self.dialect.loaded_dbapi) 
    1919            result_processor = impl_type.result_processor( 
    1920                self.dialect, dbapi_type 
    1921            ) 
    1922            if result_processor is not None: 
    1923                raw_value = result_processor(raw_value) 
    1924            out_parameters[bindparam.key] = raw_value 
    1925 
    1926        result.out_parameters = out_parameters 
    1927 
    1928    def _setup_dml_or_text_result(self): 
    1929        compiled = cast(SQLCompiler, self.compiled) 
    1930 
    1931        strategy: ResultFetchStrategy = self.cursor_fetch_strategy 
    1932 
    1933        if self.isinsert: 
    1934            if ( 
    1935                self.execute_style is ExecuteStyle.INSERTMANYVALUES 
    1936                and compiled.effective_returning 
    1937            ): 
    1938                strategy = _cursor.FullyBufferedCursorFetchStrategy( 
    1939                    self.cursor, 
    1940                    initial_buffer=self._insertmanyvalues_rows, 
    1941                    # maintain alt cursor description if set by the 
    1942                    # dialect, e.g. mssql preserves it 
    1943                    alternate_description=( 
    1944                        strategy.alternate_cursor_description 
    1945                    ), 
    1946                ) 
    1947 
    1948            if compiled.postfetch_lastrowid: 
    1949                self.inserted_primary_key_rows = ( 
    1950                    self._setup_ins_pk_from_lastrowid() 
    1951                ) 
    1952            # else if not self._is_implicit_returning, 
    1953            # the default inserted_primary_key_rows accessor will 
    1954            # return an "empty" primary key collection when accessed. 
    1955 
    1956        if self._is_server_side and strategy is _cursor._DEFAULT_FETCH: 
    1957            strategy = _cursor.BufferedRowCursorFetchStrategy( 
    1958                self.cursor, self.execution_options 
    1959            ) 
    1960 
    1961        if strategy is _cursor._NO_CURSOR_DML: 
    1962            cursor_description = None 
    1963        else: 
    1964            cursor_description = ( 
    1965                strategy.alternate_cursor_description 
    1966                or self.cursor.description 
    1967            ) 
    1968 
    1969        if cursor_description is None: 
    1970            strategy = _cursor._NO_CURSOR_DML 
    1971        elif self._num_sentinel_cols: 
    1972            assert self.execute_style is ExecuteStyle.INSERTMANYVALUES 
    1973            # the sentinel columns are handled in CursorResult._init_metadata 
    1974            # using essentially _reduce 
    1975 
    1976        result: _cursor.CursorResult[Any] = _cursor.CursorResult( 
    1977            self, strategy, cursor_description 
    1978        ) 
    1979 
    1980        if self.isinsert: 
    1981            if self._is_implicit_returning: 
    1982                rows = result.all() 
    1983 
    1984                self.returned_default_rows = rows 
    1985 
    1986                self.inserted_primary_key_rows = ( 
    1987                    self._setup_ins_pk_from_implicit_returning(result, rows) 
    1988                ) 
    1989 
    1990                # test that it has a cursor metadata that is accurate. the 
    1991                # first row will have been fetched and current assumptions 
    1992                # are that the result has only one row, until executemany() 
    1993                # support is added here. 
    1994                assert result._metadata.returns_rows 
    1995 
    1996                # Insert statement has both return_defaults() and 
    1997                # returning().  rewind the result on the list of rows 
    1998                # we just used. 
    1999                if self._is_supplemental_returning: 
    2000                    result._rewind(rows) 
    2001                else: 
    2002                    result._soft_close() 
    2003            elif not self._is_explicit_returning: 
    2004                result._soft_close() 
    2005 
    2006                # we assume here the result does not return any rows. 
    2007                # *usually*, this will be true.  However, some dialects 
    2008                # such as that of MSSQL/pyodbc need to SELECT a post fetch 
    2009                # function so this is not necessarily true. 
    2010                # assert not result.returns_rows 
    2011 
    2012        elif self._is_implicit_returning: 
    2013            rows = result.all() 
    2014 
    2015            if rows: 
    2016                self.returned_default_rows = rows 
    2017            self._rowcount = len(rows) 
    2018 
    2019            if self._is_supplemental_returning: 
    2020                result._rewind(rows) 
    2021            else: 
    2022                result._soft_close() 
    2023 
    2024            # test that it has a cursor metadata that is accurate. 
    2025            # the rows have all been fetched however. 
    2026            assert result._metadata.returns_rows 
    2027 
    2028        elif not result._metadata.returns_rows: 
    2029            # no results, get rowcount 
    2030            # (which requires open cursor on some drivers) 
    2031            if self._rowcount is None: 
    2032                self._rowcount = self.cursor.rowcount 
    2033            result._soft_close() 
    2034        elif self.isupdate or self.isdelete: 
    2035            if self._rowcount is None: 
    2036                self._rowcount = self.cursor.rowcount 
    2037        return result 
    2038 
    2039    @util.memoized_property 
    2040    def inserted_primary_key_rows(self): 
    2041        # if no specific "get primary key" strategy was set up 
    2042        # during execution, return a "default" primary key based 
    2043        # on what's in the compiled_parameters and nothing else. 
    2044        return self._setup_ins_pk_from_empty() 
    2045 
    2046    def _setup_ins_pk_from_lastrowid(self): 
    2047        getter = cast( 
    2048            SQLCompiler, self.compiled 
    2049        )._inserted_primary_key_from_lastrowid_getter 
    2050        lastrowid = self.get_lastrowid() 
    2051        return [getter(lastrowid, self.compiled_parameters[0])] 
    2052 
    2053    def _setup_ins_pk_from_empty(self): 
    2054        getter = cast( 
    2055            SQLCompiler, self.compiled 
    2056        )._inserted_primary_key_from_lastrowid_getter 
    2057        return [getter(None, param) for param in self.compiled_parameters] 
    2058 
    2059    def _setup_ins_pk_from_implicit_returning(self, result, rows): 
    2060        if not rows: 
    2061            return [] 
    2062 
    2063        getter = cast( 
    2064            SQLCompiler, self.compiled 
    2065        )._inserted_primary_key_from_returning_getter 
    2066        compiled_params = self.compiled_parameters 
    2067 
    2068        return [ 
    2069            getter(row, param) for row, param in zip(rows, compiled_params) 
    2070        ] 
    2071 
    2072    def lastrow_has_defaults(self) -> bool: 
    2073        return (self.isinsert or self.isupdate) and bool( 
    2074            cast(SQLCompiler, self.compiled).postfetch 
    2075        ) 
    2076 
    2077    def _prepare_set_input_sizes( 
    2078        self, 
    2079    ) -> Optional[List[Tuple[str, Any, TypeEngine[Any]]]]: 
    2080        """Given a cursor and ClauseParameters, prepare arguments 
    2081        in order to call the appropriate 
    2082        style of ``setinputsizes()`` on the cursor, using DB-API types 
    2083        from the bind parameter's ``TypeEngine`` objects. 
    2084 
    2085        This method only called by those dialects which set the 
    2086        :attr:`.Dialect.bind_typing` attribute to 
    2087        :attr:`.BindTyping.SETINPUTSIZES`.  Python-oracledb and cx_Oracle are 
    2088        the only DBAPIs that requires setinputsizes(); pyodbc offers it as an 
    2089        option. 
    2090 
    2091        Prior to SQLAlchemy 2.0, the setinputsizes() approach was also used 
    2092        for pg8000 and asyncpg, which has been changed to inline rendering 
    2093        of casts. 
    2094 
    2095        """ 
    2096        if self.isddl or self.is_text: 
    2097            return None 
    2098 
    2099        compiled = cast(SQLCompiler, self.compiled) 
    2100 
    2101        inputsizes = compiled._get_set_input_sizes_lookup() 
    2102 
    2103        if inputsizes is None: 
    2104            return None 
    2105 
    2106        dialect = self.dialect 
    2107 
    2108        # all of the rest of this... cython? 
    2109 
    2110        if dialect._has_events: 
    2111            inputsizes = dict(inputsizes) 
    2112            dialect.dispatch.do_setinputsizes( 
    2113                inputsizes, self.cursor, self.statement, self.parameters, self 
    2114            ) 
    2115 
    2116        if compiled.escaped_bind_names: 
    2117            escaped_bind_names = compiled.escaped_bind_names 
    2118        else: 
    2119            escaped_bind_names = None 
    2120 
    2121        if dialect.positional: 
    2122            items = [ 
    2123                (key, compiled.binds[key]) 
    2124                for key in compiled.positiontup or () 
    2125            ] 
    2126        else: 
    2127            items = [ 
    2128                (key, bindparam) 
    2129                for bindparam, key in compiled.bind_names.items() 
    2130            ] 
    2131 
    2132        generic_inputsizes: List[Tuple[str, Any, TypeEngine[Any]]] = [] 
    2133        for key, bindparam in items: 
    2134            if bindparam in compiled.literal_execute_params: 
    2135                continue 
    2136 
    2137            if key in self._expanded_parameters: 
    2138                if is_tuple_type(bindparam.type): 
    2139                    num = len(bindparam.type.types) 
    2140                    dbtypes = inputsizes[bindparam] 
    2141                    generic_inputsizes.extend( 
    2142                        ( 
    2143                            ( 
    2144                                escaped_bind_names.get(paramname, paramname) 
    2145                                if escaped_bind_names is not None 
    2146                                else paramname 
    2147                            ), 
    2148                            dbtypes[idx % num], 
    2149                            bindparam.type.types[idx % num], 
    2150                        ) 
    2151                        for idx, paramname in enumerate( 
    2152                            self._expanded_parameters[key] 
    2153                        ) 
    2154                    ) 
    2155                else: 
    2156                    dbtype = inputsizes.get(bindparam, None) 
    2157                    generic_inputsizes.extend( 
    2158                        ( 
    2159                            ( 
    2160                                escaped_bind_names.get(paramname, paramname) 
    2161                                if escaped_bind_names is not None 
    2162                                else paramname 
    2163                            ), 
    2164                            dbtype, 
    2165                            bindparam.type, 
    2166                        ) 
    2167                        for paramname in self._expanded_parameters[key] 
    2168                    ) 
    2169            else: 
    2170                dbtype = inputsizes.get(bindparam, None) 
    2171 
    2172                escaped_name = ( 
    2173                    escaped_bind_names.get(key, key) 
    2174                    if escaped_bind_names is not None 
    2175                    else key 
    2176                ) 
    2177 
    2178                generic_inputsizes.append( 
    2179                    (escaped_name, dbtype, bindparam.type) 
    2180                ) 
    2181 
    2182        return generic_inputsizes 
    2183 
    2184    def _exec_default(self, column, default, type_): 
    2185        if default.is_sequence: 
    2186            return self.fire_sequence(default, type_) 
    2187        elif default.is_callable: 
    2188            # this codepath is not normally used as it's inlined 
    2189            # into _process_execute_defaults 
    2190            self.current_column = column 
    2191            return default.arg(self) 
    2192        elif default.is_clause_element: 
    2193            return self._exec_default_clause_element(column, default, type_) 
    2194        else: 
    2195            # this codepath is not normally used as it's inlined 
    2196            # into _process_execute_defaults 
    2197            return default.arg 
    2198 
    2199    def _exec_default_clause_element(self, column, default, type_): 
    2200        # execute a default that's a complete clause element.  Here, we have 
    2201        # to re-implement a miniature version of the compile->parameters-> 
    2202        # cursor.execute() sequence, since we don't want to modify the state 
    2203        # of the connection  / result in progress or create new connection/ 
    2204        # result objects etc. 
    2205        # .. versionchanged:: 1.4 
    2206 
    2207        if not default._arg_is_typed: 
    2208            default_arg = expression.type_coerce(default.arg, type_) 
    2209        else: 
    2210            default_arg = default.arg 
    2211        compiled = expression.select(default_arg).compile(dialect=self.dialect) 
    2212        compiled_params = compiled.construct_params() 
    2213        processors = compiled._bind_processors 
    2214        if compiled.positional: 
    2215            parameters = self.dialect.execute_sequence_format( 
    2216                [ 
    2217                    ( 
    2218                        processors[key](compiled_params[key])  # type: ignore 
    2219                        if key in processors 
    2220                        else compiled_params[key] 
    2221                    ) 
    2222                    for key in compiled.positiontup or () 
    2223                ] 
    2224            ) 
    2225        else: 
    2226            parameters = { 
    2227                key: ( 
    2228                    processors[key](compiled_params[key])  # type: ignore 
    2229                    if key in processors 
    2230                    else compiled_params[key] 
    2231                ) 
    2232                for key in compiled_params 
    2233            } 
    2234        return self._execute_scalar( 
    2235            str(compiled), type_, parameters=parameters 
    2236        ) 
    2237 
    2238    current_parameters: Optional[_CoreSingleExecuteParams] = None 
    2239    """A dictionary of parameters applied to the current row. 
    2240 
    2241    This attribute is only available in the context of a user-defined default 
    2242    generation function, e.g. as described at :ref:`context_default_functions`. 
    2243    It consists of a dictionary which includes entries for each column/value 
    2244    pair that is to be part of the INSERT or UPDATE statement. The keys of the 
    2245    dictionary will be the key value of each :class:`_schema.Column`, 
    2246    which is usually 
    2247    synonymous with the name. 
    2248 
    2249    Note that the :attr:`.DefaultExecutionContext.current_parameters` attribute 
    2250    does not accommodate for the "multi-values" feature of the 
    2251    :meth:`_expression.Insert.values` method.  The 
    2252    :meth:`.DefaultExecutionContext.get_current_parameters` method should be 
    2253    preferred. 
    2254 
    2255    .. seealso:: 
    2256 
    2257        :meth:`.DefaultExecutionContext.get_current_parameters` 
    2258 
    2259        :ref:`context_default_functions` 
    2260 
    2261    """ 
    2262 
    2263    def get_current_parameters(self, isolate_multiinsert_groups=True): 
    2264        """Return a dictionary of parameters applied to the current row. 
    2265 
    2266        This method can only be used in the context of a user-defined default 
    2267        generation function, e.g. as described at 
    2268        :ref:`context_default_functions`. When invoked, a dictionary is 
    2269        returned which includes entries for each column/value pair that is part 
    2270        of the INSERT or UPDATE statement. The keys of the dictionary will be 
    2271        the key value of each :class:`_schema.Column`, 
    2272        which is usually synonymous 
    2273        with the name. 
    2274 
    2275        :param isolate_multiinsert_groups=True: indicates that multi-valued 
    2276         INSERT constructs created using :meth:`_expression.Insert.values` 
    2277         should be 
    2278         handled by returning only the subset of parameters that are local 
    2279         to the current column default invocation.   When ``False``, the 
    2280         raw parameters of the statement are returned including the 
    2281         naming convention used in the case of multi-valued INSERT. 
    2282 
    2283        .. seealso:: 
    2284 
    2285            :attr:`.DefaultExecutionContext.current_parameters` 
    2286 
    2287            :ref:`context_default_functions` 
    2288 
    2289        """ 
    2290        try: 
    2291            parameters = self.current_parameters 
    2292            column = self.current_column 
    2293        except AttributeError: 
    2294            raise exc.InvalidRequestError( 
    2295                "get_current_parameters() can only be invoked in the " 
    2296                "context of a Python side column default function" 
    2297            ) 
    2298        else: 
    2299            assert column is not None 
    2300            assert parameters is not None 
    2301        compile_state = cast( 
    2302            "DMLState", cast(SQLCompiler, self.compiled).compile_state 
    2303        ) 
    2304        assert compile_state is not None 
    2305        if ( 
    2306            isolate_multiinsert_groups 
    2307            and dml.isinsert(compile_state) 
    2308            and compile_state._has_multi_parameters 
    2309        ): 
    2310            if column._is_multiparam_column: 
    2311                index = column.index + 1 
    2312                d = {column.original.key: parameters[column.key]} 
    2313            else: 
    2314                d = {column.key: parameters[column.key]} 
    2315                index = 0 
    2316            assert compile_state._dict_parameters is not None 
    2317            keys = compile_state._dict_parameters.keys() 
    2318            d.update( 
    2319                (key, parameters["%s_m%d" % (key, index)]) for key in keys 
    2320            ) 
    2321            return d 
    2322        else: 
    2323            return parameters 
    2324 
    2325    def get_insert_default(self, column): 
    2326        if column.default is None: 
    2327            return None 
    2328        else: 
    2329            return self._exec_default(column, column.default, column.type) 
    2330 
    2331    def get_update_default(self, column): 
    2332        if column.onupdate is None: 
    2333            return None 
    2334        else: 
    2335            return self._exec_default(column, column.onupdate, column.type) 
    2336 
    2337    def _process_execute_defaults(self): 
    2338        compiled = cast(SQLCompiler, self.compiled) 
    2339 
    2340        key_getter = compiled._within_exec_param_key_getter 
    2341 
    2342        sentinel_counter = 0 
    2343 
    2344        if compiled.insert_prefetch: 
    2345            prefetch_recs = [ 
    2346                ( 
    2347                    c, 
    2348                    key_getter(c), 
    2349                    c._default_description_tuple, 
    2350                    self.get_insert_default, 
    2351                ) 
    2352                for c in compiled.insert_prefetch 
    2353            ] 
    2354        elif compiled.update_prefetch: 
    2355            prefetch_recs = [ 
    2356                ( 
    2357                    c, 
    2358                    key_getter(c), 
    2359                    c._onupdate_description_tuple, 
    2360                    self.get_update_default, 
    2361                ) 
    2362                for c in compiled.update_prefetch 
    2363            ] 
    2364        else: 
    2365            prefetch_recs = [] 
    2366 
    2367        for param in self.compiled_parameters: 
    2368            self.current_parameters = param 
    2369 
    2370            for ( 
    2371                c, 
    2372                param_key, 
    2373                (arg, is_scalar, is_callable, is_sentinel), 
    2374                fallback, 
    2375            ) in prefetch_recs: 
    2376                if is_sentinel: 
    2377                    param[param_key] = sentinel_counter 
    2378                    sentinel_counter += 1 
    2379                elif is_scalar: 
    2380                    param[param_key] = arg 
    2381                elif is_callable: 
    2382                    self.current_column = c 
    2383                    param[param_key] = arg(self) 
    2384                else: 
    2385                    val = fallback(c) 
    2386                    if val is not None: 
    2387                        param[param_key] = val 
    2388 
    2389        del self.current_parameters 
    2390 
    2391 
    2392DefaultDialect.execution_ctx_cls = DefaultExecutionContext