# orm/bulk_persistence.py
# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
# mypy: ignore-errors


"""additional ORM persistence classes related to "bulk" operations,
specifically outside of the flush() process.

"""

from __future__ import annotations

from typing import Any
from typing import cast
from typing import Dict
from typing import Iterable
from typing import Literal
from typing import Optional
from typing import overload
from typing import TYPE_CHECKING
from typing import TypeVar
from typing import Union

from . import attributes
from . import context
from . import evaluator
from . import exc as orm_exc
from . import loading
from . import persistence
from .base import NO_VALUE
from .context import _AbstractORMCompileState
from .context import _ORMFromStatementCompileState
from .context import FromStatement
from .context import QueryContext
from .interfaces import PropComparator
from .. import exc as sa_exc
from .. import util
from ..engine import Dialect
from ..engine import result as _result
from ..sql import coercions
from ..sql import dml
from ..sql import expression
from ..sql import roles
from ..sql import select
from ..sql import sqltypes
from ..sql.base import _entity_namespace_key
from ..sql.base import CompileState
from ..sql.base import Options
from ..sql.dml import DeleteDMLState
from ..sql.dml import InsertDMLState
from ..sql.dml import UpdateDMLState
from ..util import EMPTY_DICT
from ..util.typing import TupleAny
from ..util.typing import Unpack

if TYPE_CHECKING:
    from ._typing import DMLStrategyArgument
    from ._typing import OrmExecuteOptionsParameter
    from ._typing import SynchronizeSessionArgument
    from .mapper import Mapper
    from .session import _BindArguments
    from .session import ORMExecuteState
    from .session import Session
    from .session import SessionTransaction
    from .state import InstanceState
    from ..engine import Connection
    from ..engine import cursor
    from ..engine.interfaces import _CoreAnyExecuteParams

_O = TypeVar("_O", bound=object)


@overload
def _bulk_insert(
    mapper: Mapper[_O],
    mappings: Union[Iterable[InstanceState[_O]], Iterable[Dict[str, Any]]],
    session_transaction: SessionTransaction,
    *,
    isstates: bool,
    return_defaults: bool,
    render_nulls: bool,
    use_orm_insert_stmt: Literal[None] = ...,
    execution_options: Optional[OrmExecuteOptionsParameter] = ...,
) -> None: ...


@overload
def _bulk_insert(
    mapper: Mapper[_O],
    mappings: Union[Iterable[InstanceState[_O]], Iterable[Dict[str, Any]]],
    session_transaction: SessionTransaction,
    *,
    isstates: bool,
    return_defaults: bool,
    render_nulls: bool,
    use_orm_insert_stmt: Optional[dml.Insert] = ...,
    execution_options: Optional[OrmExecuteOptionsParameter] = ...,
) -> cursor.CursorResult[Any]: ...


def _bulk_insert(
    mapper: Mapper[_O],
    mappings: Union[Iterable[InstanceState[_O]], Iterable[Dict[str, Any]]],
    session_transaction: SessionTransaction,
    *,
    isstates: bool,
    return_defaults: bool,
    render_nulls: bool,
    use_orm_insert_stmt: Optional[dml.Insert] = None,
    execution_options: Optional[OrmExecuteOptionsParameter] = None,
) -> Optional[cursor.CursorResult[Any]]:
    base_mapper = mapper.base_mapper

    if session_transaction.session.connection_callable:
        raise NotImplementedError(
            "connection_callable / per-instance sharding "
            "not supported in bulk_insert()"
        )

    if isstates:
        if TYPE_CHECKING:
            mappings = cast(Iterable[InstanceState[_O]], mappings)

        if return_defaults:
            # list of states allows us to attach .key for return_defaults case
            states = [(state, state.dict) for state in mappings]
            mappings = [dict_ for (state, dict_) in states]
        else:
            mappings = [state.dict for state in mappings]
    else:
        if TYPE_CHECKING:
            mappings = cast(Iterable[Dict[str, Any]], mappings)

        if return_defaults:
            # use dictionaries given, so that newly populated defaults
            # can be delivered back to the caller (see #11661). This is **not**
            # compatible with other use cases such as a session-executed
            # insert() construct, as this will confuse the case of
            # insert-per-subclass for joined inheritance cases (see
            # test_bulk_statements.py::BulkDMLReturningJoinedInhTest).
            #
            # So in this conditional, we have **only** called
            # session.bulk_insert_mappings() which does not have this
            # requirement
            mappings = list(mappings)
        else:
            # for all other cases we need to establish a local dictionary
            # so that the incoming dictionaries aren't mutated
            mappings = [dict(m) for m in mappings]
        _expand_other_attrs(mapper, mappings)

    connection = session_transaction.connection(base_mapper)

    return_result: Optional[cursor.CursorResult[Any]] = None

    mappers_to_run = [
        (table, mp)
        for table, mp in base_mapper._sorted_tables.items()
        if table in mapper._pks_by_table
    ]

    if return_defaults:
        # not used by new-style bulk inserts, only used for legacy
        bookkeeping = True
    elif len(mappers_to_run) > 1:
        # if we have more than one (table, mapper) pair to run, where we
        # will either be horizontally splicing or copying values between
        # tables, we need the "bookkeeping" / deterministic returning order
        bookkeeping = True
    else:
        bookkeeping = False

    for table, super_mapper in mappers_to_run:
        # find bindparams in the statement. For bulk, we don't really know
        # if a key in the params applies to a different table since we are
        # potentially inserting for multiple tables here; looking at the
        # bindparam() is a lot more direct.  In most cases this will
        # use _generate_cache_key() which is memoized, although in practice
        # the ultimate statement that's executed is probably not the same
        # object, so that memoization might not matter much.
        extra_bp_names = (
            [
                b.key
                for b in use_orm_insert_stmt._get_embedded_bindparams()
                if b.key in mappings[0]
            ]
            if use_orm_insert_stmt is not None
            else ()
        )

        records = (
            (
                None,
                state_dict,
                params,
                mapper,
                connection,
                value_params,
                has_all_pks,
                has_all_defaults,
            )
            for (
                state,
                state_dict,
                params,
                mp,
                conn,
                value_params,
                has_all_pks,
                has_all_defaults,
            ) in persistence._collect_insert_commands(
                table,
                ((None, mapping, mapper, connection) for mapping in mappings),
                bulk=True,
                return_defaults=bookkeeping,
                render_nulls=render_nulls,
                include_bulk_keys=extra_bp_names,
            )
        )

        result = persistence._emit_insert_statements(
            base_mapper,
            None,
            super_mapper,
            table,
            records,
            bookkeeping=bookkeeping,
            use_orm_insert_stmt=use_orm_insert_stmt,
            execution_options=execution_options,
        )
        if use_orm_insert_stmt is not None:
            if not use_orm_insert_stmt._returning or return_result is None:
                return_result = result
            elif result.returns_rows:
                assert bookkeeping
                return_result = return_result.splice_horizontally(result)

    if return_defaults and isstates:
        identity_cls = mapper._identity_class
        identity_props = [p.key for p in mapper._identity_key_props]
        for state, dict_ in states:
            state.key = (
                identity_cls,
                tuple([dict_[key] for key in identity_props]),
                None,
            )

    if use_orm_insert_stmt is not None:
        assert return_result is not None
        return return_result
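
# A hedged sketch of the two call paths into _bulk_insert(), assuming a
# mapped ``User`` class (illustrative only)::
#
#     # legacy path: isstates=False, use_orm_insert_stmt=None
#     session.bulk_insert_mappings(User, [{"name": "u1"}, {"name": "u2"}])
#
#     # new-style path: use_orm_insert_stmt is the insert() construct
#     from sqlalchemy import insert
#     session.execute(insert(User), [{"name": "u1"}, {"name": "u2"}])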


@overload
def _bulk_update(
    mapper: Mapper[Any],
    mappings: Union[Iterable[InstanceState[_O]], Iterable[Dict[str, Any]]],
    session_transaction: SessionTransaction,
    *,
    isstates: bool,
    update_changed_only: bool,
    use_orm_update_stmt: Literal[None] = ...,
    enable_check_rowcount: bool = True,
) -> None: ...


@overload
def _bulk_update(
    mapper: Mapper[Any],
    mappings: Union[Iterable[InstanceState[_O]], Iterable[Dict[str, Any]]],
    session_transaction: SessionTransaction,
    *,
    isstates: bool,
    update_changed_only: bool,
    use_orm_update_stmt: Optional[dml.Update] = ...,
    enable_check_rowcount: bool = True,
) -> _result.Result[Unpack[TupleAny]]: ...


def _bulk_update(
    mapper: Mapper[Any],
    mappings: Union[Iterable[InstanceState[_O]], Iterable[Dict[str, Any]]],
    session_transaction: SessionTransaction,
    *,
    isstates: bool,
    update_changed_only: bool,
    use_orm_update_stmt: Optional[dml.Update] = None,
    enable_check_rowcount: bool = True,
) -> Optional[_result.Result[Unpack[TupleAny]]]:
    base_mapper = mapper.base_mapper

    search_keys = mapper._primary_key_propkeys
    if mapper._version_id_prop:
        search_keys = {mapper._version_id_prop.key}.union(search_keys)

    def _changed_dict(mapper, state):
        return {
            k: v
            for k, v in state.dict.items()
            if k in state.committed_state or k in search_keys
        }

    if isstates:
        if update_changed_only:
            mappings = [_changed_dict(mapper, state) for state in mappings]
        else:
            mappings = [state.dict for state in mappings]
    else:
        mappings = [dict(m) for m in mappings]
        _expand_other_attrs(mapper, mappings)

    if session_transaction.session.connection_callable:
        raise NotImplementedError(
            "connection_callable / per-instance sharding "
            "not supported in bulk_update()"
        )

    connection = session_transaction.connection(base_mapper)

    # find bindparams in the statement. see _bulk_insert for similar
    # notes for the insert case
    extra_bp_names = (
        [
            b.key
            for b in use_orm_update_stmt._get_embedded_bindparams()
            if b.key in mappings[0]
        ]
        if use_orm_update_stmt is not None
        else ()
    )

    for table, super_mapper in base_mapper._sorted_tables.items():
        if not mapper.isa(super_mapper) or table not in mapper._pks_by_table:
            continue

        records = persistence._collect_update_commands(
            None,
            table,
            (
                (
                    None,
                    mapping,
                    mapper,
                    connection,
                    (
                        mapping[mapper._version_id_prop.key]
                        if mapper._version_id_prop
                        else None
                    ),
                )
                for mapping in mappings
            ),
            bulk=True,
            use_orm_update_stmt=use_orm_update_stmt,
            include_bulk_keys=extra_bp_names,
        )
        persistence._emit_update_statements(
            base_mapper,
            None,
            super_mapper,
            table,
            records,
            bookkeeping=False,
            use_orm_update_stmt=use_orm_update_stmt,
            enable_check_rowcount=enable_check_rowcount,
        )

    if use_orm_update_stmt is not None:
        return _result.null_result()
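
# Hedged sketch of the corresponding update paths; the parameter
# dictionaries must include primary key values (plus the version id column
# when versioning is in use) so that _collect_update_commands() can form
# the WHERE criteria (illustrative)::
#
#     session.bulk_update_mappings(User, [{"id": 1, "name": "u1new"}])
#
#     from sqlalchemy import update
#     session.execute(update(User), [{"id": 1, "name": "u1new"}])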


def _expand_other_attrs(
    mapper: Mapper[Any], mappings: Iterable[Dict[str, Any]]
) -> None:
    all_attrs = mapper.all_orm_descriptors

    attr_keys = set(all_attrs.keys())

    bulk_dml_setters = {
        key: setter
        for key, setter in (
            (key, attr._bulk_dml_setter(key))
            for key, attr in (
                (key, _entity_namespace_key(mapper, key, default=NO_VALUE))
                for key in attr_keys
            )
            if attr is not NO_VALUE and isinstance(attr, PropComparator)
        )
        if setter is not None
    }
    setters_todo = set(bulk_dml_setters)
    if not setters_todo:
        return

    for mapping in mappings:
        for key in setters_todo.intersection(mapping):
            bulk_dml_setters[key](mapping)
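
# Hedged example of what _expand_other_attrs() accomplishes: attribute
# types that provide a _bulk_dml_setter(), such as composite attributes,
# get to rewrite each incoming dictionary in place before the INSERT /
# UPDATE collection step; roughly (names illustrative)::
#
#     {"start": Point(3, 4)}  -->  {"x1": 3, "y1": 4}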


class _ORMDMLState(_AbstractORMCompileState):
    is_dml_returning = True
    from_statement_ctx: Optional[_ORMFromStatementCompileState] = None

    @classmethod
    def _get_orm_crud_kv_pairs(
        cls, mapper, statement, kv_iterator, needs_to_be_cacheable
    ):
        core_get_crud_kv_pairs = UpdateDMLState._get_crud_kv_pairs

        for k, v in kv_iterator:
            k = coercions.expect(roles.DMLColumnRole, k)

            if isinstance(k, str):
                desc = _entity_namespace_key(mapper, k, default=NO_VALUE)
                if not isinstance(desc, PropComparator):
                    yield (
                        coercions.expect(roles.DMLColumnRole, k),
                        (
                            coercions.expect(
                                roles.ExpressionElementRole,
                                v,
                                type_=sqltypes.NullType(),
                                is_crud=True,
                            )
                            if needs_to_be_cacheable
                            else v
                        ),
                    )
                else:
                    yield from core_get_crud_kv_pairs(
                        statement,
                        desc._bulk_update_tuples(v),
                        needs_to_be_cacheable,
                    )
            elif "entity_namespace" in k._annotations:
                k_anno = k._annotations
                attr = _entity_namespace_key(
                    k_anno["entity_namespace"], k_anno["proxy_key"]
                )
                assert isinstance(attr, PropComparator)
                yield from core_get_crud_kv_pairs(
                    statement,
                    attr._bulk_update_tuples(v),
                    needs_to_be_cacheable,
                )
            else:
                yield (
                    k,
                    (
                        v
                        if not needs_to_be_cacheable
                        else coercions.expect(
                            roles.ExpressionElementRole,
                            v,
                            type_=sqltypes.NullType(),
                            is_crud=True,
                        )
                    ),
                )
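
    # Hedged illustration: the hook above is what lets ORM-level keys in
    # update().values() resolve against the mapped class, where a
    # PropComparator may expand one key into multiple column/value pairs
    # via _bulk_update_tuples(), e.g. (assuming a mapped User class)::
    #
    #     update(User).values({User.fullname: "x"})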

    @classmethod
    def _get_dml_plugin_subject(cls, statement):
        plugin_subject = statement.table._propagate_attrs.get("plugin_subject")

        if (
            not plugin_subject
            or not plugin_subject.mapper
            or plugin_subject
            is not statement._propagate_attrs["plugin_subject"]
        ):
            return None
        return plugin_subject

    @classmethod
    def _get_multi_crud_kv_pairs(cls, statement, kv_iterator):
        plugin_subject = cls._get_dml_plugin_subject(statement)

        if not plugin_subject:
            return UpdateDMLState._get_multi_crud_kv_pairs(
                statement, kv_iterator
            )

        return [
            dict(
                cls._get_orm_crud_kv_pairs(
                    plugin_subject.mapper, statement, value_dict.items(), False
                )
            )
            for value_dict in kv_iterator
        ]

    @classmethod
    def _get_crud_kv_pairs(cls, statement, kv_iterator, needs_to_be_cacheable):
        assert (
            needs_to_be_cacheable
        ), "no test coverage for needs_to_be_cacheable=False"

        plugin_subject = cls._get_dml_plugin_subject(statement)

        if not plugin_subject:
            return UpdateDMLState._get_crud_kv_pairs(
                statement, kv_iterator, needs_to_be_cacheable
            )
        return list(
            cls._get_orm_crud_kv_pairs(
                plugin_subject.mapper,
                statement,
                kv_iterator,
                needs_to_be_cacheable,
            )
        )

    @classmethod
    def get_entity_description(cls, statement):
        ext_info = statement.table._annotations["parententity"]
        mapper = ext_info.mapper
        if ext_info.is_aliased_class:
            _label_name = ext_info.name
        else:
            _label_name = mapper.class_.__name__

        return {
            "name": _label_name,
            "type": mapper.class_,
            "expr": ext_info.entity,
            "entity": ext_info.entity,
            "table": mapper.local_table,
        }

    @classmethod
    def get_returning_column_descriptions(cls, statement):
        def _ent_for_col(c):
            return c._annotations.get("parententity", None)

        def _attr_for_col(c, ent):
            if ent is None:
                return c
            proxy_key = c._annotations.get("proxy_key", None)
            if not proxy_key:
                return c
            else:
                return getattr(ent.entity, proxy_key, c)

        return [
            {
                "name": c.key,
                "type": c.type,
                "expr": _attr_for_col(c, ent),
                "aliased": ent.is_aliased_class,
                "entity": ent.entity,
            }
            for c, ent in [
                (c, _ent_for_col(c)) for c in statement._all_selected_columns
            ]
        ]
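
    # The two accessors above back the public inspection attributes of
    # ORM-enabled DML statements (illustrative, assuming a mapped User)::
    #
    #     stmt = update(User).values(name="x").returning(User.id)
    #     stmt.entity_description["entity"]    # the User entity
    #     stmt.returning_column_descriptions   # one entry for User.id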

    def _setup_orm_returning(
        self,
        compiler,
        orm_level_statement,
        dml_level_statement,
        dml_mapper,
        *,
        use_supplemental_cols=True,
    ):
        """establish ORM column handlers for an INSERT, UPDATE, or DELETE
        which uses explicit returning().

        Called within the compile-level create_for_statement().

        The _return_orm_returning() method then receives the Result
        after the statement was executed, and applies ORM loading to the
        state that we first established here.

        """

        if orm_level_statement._returning:
            fs = FromStatement(
                orm_level_statement._returning,
                dml_level_statement,
                _adapt_on_names=False,
            )
            fs = fs.execution_options(**orm_level_statement._execution_options)
            fs = fs.options(*orm_level_statement._with_options)
            self.select_statement = fs
            self.from_statement_ctx = fsc = (
                _ORMFromStatementCompileState.create_for_statement(
                    fs, compiler
                )
            )
            fsc.setup_dml_returning_compile_state(dml_mapper)

            dml_level_statement = dml_level_statement._generate()
            dml_level_statement._returning = ()

            cols_to_return = [c for c in fsc.primary_columns if c is not None]

            # since we are splicing result sets together, make sure there
            # are columns of some kind returned in each result set
            if not cols_to_return:
                cols_to_return.extend(dml_mapper.primary_key)

            if use_supplemental_cols:
                dml_level_statement = dml_level_statement.return_defaults(
                    # this is a little weird looking, but by passing
                    # primary key as the main list of cols, this tells
                    # return_defaults to omit server-default cols (and
                    # actually all cols, due to some weird thing we should
                    # clean up in crud.py).
                    # Since we have cols_to_return, just return what we asked
                    # for (plus primary key, which ORM persistence needs since
                    # we likely set bookkeeping=True here, which is another
                    # whole thing...).  We don't want to clutter the
                    # statement up with lots of other cols the user didn't
                    # ask for.  see #9685
                    *dml_mapper.primary_key,
                    supplemental_cols=cols_to_return,
                )
            else:
                dml_level_statement = dml_level_statement.returning(
                    *cols_to_return
                )

        return dml_level_statement
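
    # Hedged illustration of the behavior this enables: entities named in
    # returning() on an ORM-enabled DML statement come back as ORM objects
    # on backends that support RETURNING::
    #
    #     result = session.execute(
    #         insert(User).returning(User), [{"name": "u1"}]
    #     )
    #     users = result.scalars().all()  # User instances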

    @classmethod
    def _return_orm_returning(
        cls,
        session,
        statement,
        params,
        execution_options,
        bind_arguments,
        result,
    ):
        execution_context = result.context
        compile_state = execution_context.compiled.compile_state

        if (
            compile_state.from_statement_ctx
            and not compile_state.from_statement_ctx.compile_options._is_star
        ):
            load_options = execution_options.get(
                "_sa_orm_load_options", QueryContext.default_load_options
            )

            querycontext = QueryContext(
                compile_state.from_statement_ctx,
                compile_state.select_statement,
                statement,
                params,
                session,
                load_options,
                execution_options,
                bind_arguments,
            )
            return loading.instances(result, querycontext)
        else:
            return result


class _BulkUDCompileState(_ORMDMLState):
    class default_update_options(Options):
        _dml_strategy: DMLStrategyArgument = "auto"
        _synchronize_session: SynchronizeSessionArgument = "auto"
        _can_use_returning: bool = False
        _is_delete_using: bool = False
        _is_update_from: bool = False
        _autoflush: bool = True
        _subject_mapper: Optional[Mapper[Any]] = None
        _resolved_values = EMPTY_DICT
        _eval_condition = None
        _matched_rows = None
        _identity_token = None
        _populate_existing: bool = False
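
    # The underscored fields above are populated from user-facing execution
    # options via from_execution_options(); e.g. (illustrative)::
    #
    #     session.execute(
    #         update(User).where(User.name == "x").values(name="y"),
    #         execution_options={"synchronize_session": "fetch"},
    #     )
    #
    # arrives in orm_pre_session_exec() as _synchronize_session="fetch".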

    @classmethod
    def can_use_returning(
        cls,
        dialect: Dialect,
        mapper: Mapper[Any],
        *,
        is_multitable: bool = False,
        is_update_from: bool = False,
        is_delete_using: bool = False,
        is_executemany: bool = False,
    ) -> bool:
        raise NotImplementedError()

    @classmethod
    def orm_pre_session_exec(
        cls,
        session,
        statement,
        params,
        execution_options,
        bind_arguments,
        is_pre_event,
    ):
        (
            update_options,
            execution_options,
        ) = _BulkUDCompileState.default_update_options.from_execution_options(
            "_sa_orm_update_options",
            {
                "synchronize_session",
                "autoflush",
                "populate_existing",
                "identity_token",
                "is_delete_using",
                "is_update_from",
                "dml_strategy",
            },
            execution_options,
            statement._execution_options,
        )
        bind_arguments["clause"] = statement
        try:
            plugin_subject = statement._propagate_attrs["plugin_subject"]
        except KeyError:
            assert False, "statement had 'orm' plugin but no plugin_subject"
        else:
            if plugin_subject:
                bind_arguments["mapper"] = plugin_subject.mapper
                update_options += {"_subject_mapper": plugin_subject.mapper}

        if "parententity" not in statement.table._annotations:
            update_options += {"_dml_strategy": "core_only"}
        elif not isinstance(params, list):
            if update_options._dml_strategy == "auto":
                update_options += {"_dml_strategy": "orm"}
            elif update_options._dml_strategy == "bulk":
                raise sa_exc.InvalidRequestError(
                    'Can\'t use "bulk" ORM update strategy without '
                    "passing separate parameters"
                )
        else:
            if update_options._dml_strategy == "auto":
                update_options += {"_dml_strategy": "bulk"}

        sync = update_options._synchronize_session
        if sync is not None:
            if sync not in ("auto", "evaluate", "fetch", False):
                raise sa_exc.ArgumentError(
                    "Valid strategies for session synchronization "
                    "are 'auto', 'evaluate', 'fetch', False"
                )
            if update_options._dml_strategy == "bulk" and sync == "fetch":
                raise sa_exc.InvalidRequestError(
                    "The 'fetch' synchronization strategy is not available "
                    "for 'bulk' ORM updates (i.e. multiple parameter sets)"
                )

        if not is_pre_event:
            if update_options._autoflush:
                session._autoflush()

            if update_options._dml_strategy == "orm":
                if update_options._synchronize_session == "auto":
                    update_options = cls._do_pre_synchronize_auto(
                        session,
                        statement,
                        params,
                        execution_options,
                        bind_arguments,
                        update_options,
                    )
                elif update_options._synchronize_session == "evaluate":
                    update_options = cls._do_pre_synchronize_evaluate(
                        session,
                        statement,
                        params,
                        execution_options,
                        bind_arguments,
                        update_options,
                    )
                elif update_options._synchronize_session == "fetch":
                    update_options = cls._do_pre_synchronize_fetch(
                        session,
                        statement,
                        params,
                        execution_options,
                        bind_arguments,
                        update_options,
                    )
            elif update_options._dml_strategy == "bulk":
                if update_options._synchronize_session == "auto":
                    update_options += {"_synchronize_session": "evaluate"}

            # indicators from the "pre exec" step that are then
            # added to the DML statement, which will also be part of the cache
            # key.  The compile level create_for_statement() method will then
            # consume these at compiler time.
            statement = statement._annotate(
                {
                    "synchronize_session": update_options._synchronize_session,
                    "is_delete_using": update_options._is_delete_using,
                    "is_update_from": update_options._is_update_from,
                    "dml_strategy": update_options._dml_strategy,
                    "can_use_returning": update_options._can_use_returning,
                }
            )

        return (
            statement,
            util.immutabledict(execution_options).union(
                {"_sa_orm_update_options": update_options}
            ),
        )

    @classmethod
    def orm_setup_cursor_result(
        cls,
        session,
        statement,
        params,
        execution_options,
        bind_arguments,
        result,
    ):
        # this stage of the execution is called after the
        # do_orm_execute event hook.  meaning for an extension like
        # horizontal sharding, this step happens *within* the horizontal
        # sharding event handler which calls session.execute() re-entrantly
        # and will occur for each backend individually.
        # the sharding extension then returns its own merged result from the
        # individual ones we return here.

        update_options = execution_options["_sa_orm_update_options"]
        if update_options._dml_strategy == "orm":
            if update_options._synchronize_session == "evaluate":
                cls._do_post_synchronize_evaluate(
                    session, statement, result, update_options
                )
            elif update_options._synchronize_session == "fetch":
                cls._do_post_synchronize_fetch(
                    session, statement, result, update_options
                )
        elif update_options._dml_strategy == "bulk":
            if update_options._synchronize_session == "evaluate":
                cls._do_post_synchronize_bulk_evaluate(
                    session, params, result, update_options
                )
            return result

        return cls._return_orm_returning(
            session,
            statement,
            params,
            execution_options,
            bind_arguments,
            result,
        )

    @classmethod
    def _adjust_for_extra_criteria(cls, global_attributes, ext_info):
        """Apply extra criteria filtering.

        For all distinct single-table-inheritance mappers represented in the
        table being updated or deleted, produce additional WHERE criteria such
        that only the appropriate subtypes are selected from the total results.

        Additionally, add WHERE criteria originating from LoaderCriteriaOptions
        collected from the statement.

        """

        return_crit = ()

        adapter = ext_info._adapter if ext_info.is_aliased_class else None

        if (
            "additional_entity_criteria",
            ext_info.mapper,
        ) in global_attributes:
            return_crit += tuple(
                ae._resolve_where_criteria(ext_info)
                for ae in global_attributes[
                    ("additional_entity_criteria", ext_info.mapper)
                ]
                if ae.include_aliases or ae.entity is ext_info
            )

        if ext_info.mapper._single_table_criterion is not None:
            return_crit += (ext_info.mapper._single_table_criterion,)

        if adapter:
            return_crit = tuple(adapter.traverse(crit) for crit in return_crit)

        return return_crit
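
    # Hedged illustration: with_loader_criteria() options attached to an
    # ORM UPDATE/DELETE are among the "additional_entity_criteria" folded
    # in above::
    #
    #     from sqlalchemy.orm import with_loader_criteria
    #
    #     session.execute(
    #         delete(User).options(
    #             with_loader_criteria(User, User.archived == False)
    #         )
    #     )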

    @classmethod
    def _interpret_returning_rows(cls, result, mapper, rows):
        """return rows that indicate PK cols in mapper.primary_key position
        for RETURNING rows.

        Prior to 2.0.36, this method seemed to be written for some kind of
        inheritance scenario but the scenario was unused for actual joined
        inheritance, and the function instead seemed to perform some kind of
        partial translation that would remove non-PK cols if the PK cols
        happened to be first in the row, but not otherwise.  The joined
        inheritance walk feature here seems to have never been used as it was
        always skipped by the "local_table" check.

        As of 2.0.36 the function strips away non-PK cols and provides the
        PK cols for the table in mapper PK order.

        """

        try:
            if mapper.local_table is not mapper.base_mapper.local_table:
                # TODO: dive more into how a local table PK is used for fetch
                # sync, not clear if this is correct as it depends on the
                # downstream routine to fetch rows using
                # local_table.primary_key order
                pk_keys = result._tuple_getter(mapper.local_table.primary_key)
            else:
                pk_keys = result._tuple_getter(mapper.primary_key)
        except KeyError:
            # can't use these rows, they don't have PK cols in them
            # this is an unusual case where the user would have used
            # .return_defaults()
            return []

        return [pk_keys(row) for row in rows]

    @classmethod
    def _get_matched_objects_on_criteria(cls, update_options, states):
        mapper = update_options._subject_mapper
        eval_condition = update_options._eval_condition

        raw_data = [
            (state.obj(), state, state.dict)
            for state in states
            if state.mapper.isa(mapper) and not state.expired
        ]

        identity_token = update_options._identity_token
        if identity_token is not None:
            raw_data = [
                (obj, state, dict_)
                for obj, state, dict_ in raw_data
                if state.identity_token == identity_token
            ]

        result = []
        for obj, state, dict_ in raw_data:
            evaled_condition = eval_condition(obj)

            # caution: don't use "in ()" or == here, _EXPIRED_OBJECT
            # evaluates as True for all comparisons
            if (
                evaled_condition is True
                or evaled_condition is evaluator._EXPIRED_OBJECT
            ):
                result.append(
                    (
                        obj,
                        state,
                        dict_,
                        evaled_condition is evaluator._EXPIRED_OBJECT,
                    )
                )
        return result

    @classmethod
    def _eval_condition_from_statement(cls, update_options, statement):
        mapper = update_options._subject_mapper
        target_cls = mapper.class_

        evaluator_compiler = evaluator._EvaluatorCompiler(target_cls)
        crit = ()
        if statement._where_criteria:
            crit += statement._where_criteria

        global_attributes = {}
        for opt in statement._with_options:
            if opt._is_criteria_option:
                opt.get_global_criteria(global_attributes)

        if global_attributes:
            crit += cls._adjust_for_extra_criteria(global_attributes, mapper)

        if crit:
            eval_condition = evaluator_compiler.process(*crit)
        else:
            # workaround for mypy https://github.com/python/mypy/issues/14027
            def _eval_condition(obj):
                return True

            eval_condition = _eval_condition

        return eval_condition
    @classmethod
    def _do_pre_synchronize_auto(
        cls,
        session,
        statement,
        params,
        execution_options,
        bind_arguments,
        update_options,
    ):
        """set up the "auto" sync strategy.

        "auto" checks if we can use "evaluate" first, then falls back
        to "fetch".

        "evaluate" is vastly more efficient for the common case where the
        session is empty or only has a few objects, while the UPDATE
        statement can potentially match thousands/millions of rows.

        OTOH, more complex criteria that fail to work with "evaluate" will
        hopefully correlate with fewer net rows.

        """

        try:
            eval_condition = cls._eval_condition_from_statement(
                update_options, statement
            )

        except evaluator.UnevaluatableError:
            pass
        else:
            return update_options + {
                "_eval_condition": eval_condition,
                "_synchronize_session": "evaluate",
            }

        update_options += {"_synchronize_session": "fetch"}
        return cls._do_pre_synchronize_fetch(
            session,
            statement,
            params,
            execution_options,
            bind_arguments,
            update_options,
        )
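
    # Hedged example of the fallback above: WHERE criteria the Python
    # evaluator cannot process, e.g. one involving a SQL function such as
    # func.lower(User.name) == "u1", would raise UnevaluatableError and
    # route to the "fetch" strategy; a simple comparison like
    # User.name == "u1" stays on "evaluate".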

    @classmethod
    def _do_pre_synchronize_evaluate(
        cls,
        session,
        statement,
        params,
        execution_options,
        bind_arguments,
        update_options,
    ):
        try:
            eval_condition = cls._eval_condition_from_statement(
                update_options, statement
            )

        except evaluator.UnevaluatableError as err:
            raise sa_exc.InvalidRequestError(
                'Could not evaluate current criteria in Python: "%s". '
                "Specify 'fetch' or False for the "
                "synchronize_session execution option." % err
            ) from err

        return update_options + {
            "_eval_condition": eval_condition,
        }

    @classmethod
    def _get_resolved_values(cls, mapper, statement):
        if statement._multi_values:
            return []
        elif statement._values:
            return list(statement._values.items())
        else:
            return []

    @classmethod
    def _resolved_keys_as_propnames(cls, mapper, resolved_values):
        values = []
        for k, v in resolved_values:
            if mapper and isinstance(k, expression.ColumnElement):
                try:
                    attr = mapper._columntoproperty[k]
                except orm_exc.UnmappedColumnError:
                    pass
                else:
                    values.append((attr.key, v))
            else:
                raise sa_exc.InvalidRequestError(
                    "Attribute name not found, can't be "
                    "synchronized back to objects: %r" % k
                )
        return values

    @classmethod
    def _do_pre_synchronize_fetch(
        cls,
        session,
        statement,
        params,
        execution_options,
        bind_arguments,
        update_options,
    ):
        mapper = update_options._subject_mapper

        select_stmt = (
            select(*(mapper.primary_key + (mapper.select_identity_token,)))
            .select_from(mapper)
            .options(*statement._with_options)
        )
        select_stmt._where_criteria = statement._where_criteria

        # conditionally run the SELECT statement for pre-fetch, testing the
        # "bind" for whether we can use RETURNING or not, using the
        # do_orm_execute event.  If RETURNING is available, the
        # do_orm_execute event will cancel the SELECT from being actually
        # run.
        #
        # The way this is organized seems strange; why don't we just
        # call can_use_returning() before invoking the statement and get
        # the answer? why does this go through the whole execute phase
        # using an event?  Answer: because we are integrating with
        # extensions such as the horizontal sharding extension that
        # "multiplexes" an individual statement run through multiple
        # engines, and it uses do_orm_execute() to do that.

        can_use_returning = None

        def skip_for_returning(orm_context: ORMExecuteState) -> Any:
            bind = orm_context.session.get_bind(**orm_context.bind_arguments)
            nonlocal can_use_returning

            per_bind_result = cls.can_use_returning(
                bind.dialect,
                mapper,
                is_update_from=update_options._is_update_from,
                is_delete_using=update_options._is_delete_using,
                is_executemany=orm_context.is_executemany,
            )

            if can_use_returning is not None:
                if can_use_returning != per_bind_result:
                    raise sa_exc.InvalidRequestError(
                        "For synchronize_session='fetch', can't mix multiple "
                        "backends where some support RETURNING and others "
                        "don't"
                    )
            elif orm_context.is_executemany and not per_bind_result:
                raise sa_exc.InvalidRequestError(
                    "For synchronize_session='fetch', can't use multiple "
                    "parameter sets in ORM mode, which this backend does not "
                    "support with RETURNING"
                )
            else:
                can_use_returning = per_bind_result

            if per_bind_result:
                return _result.null_result()
            else:
                return None

        result = session.execute(
            select_stmt,
            params,
            execution_options=execution_options,
            bind_arguments=bind_arguments,
            _add_event=skip_for_returning,
        )
        matched_rows = result.fetchall()

        return update_options + {
            "_matched_rows": matched_rows,
            "_can_use_returning": can_use_returning,
        }
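
    # Net effect, shown as approximate SQL for synchronize_session="fetch"
    # (illustrative)::
    #
    #     -- RETURNING-capable backend: the SELECT is skipped and the DML
    #     -- itself reports the matched primary keys
    #     UPDATE users SET name=? WHERE ... RETURNING users.id
    #
    #     -- otherwise: pre-SELECT the matching pks, then run the DML
    #     SELECT users.id FROM users WHERE ...
    #     UPDATE users SET name=? WHERE ...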


@CompileState.plugin_for("orm", "insert")
class _BulkORMInsert(_ORMDMLState, InsertDMLState):
    class default_insert_options(Options):
        _dml_strategy: DMLStrategyArgument = "auto"
        _render_nulls: bool = False
        _return_defaults: bool = False
        _subject_mapper: Optional[Mapper[Any]] = None
        _autoflush: bool = True
        _populate_existing: bool = False

    select_statement: Optional[FromStatement] = None
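
    # As with the update options, these fields map from user-facing
    # execution options (illustrative)::
    #
    #     session.execute(
    #         insert(User).returning(User),
    #         [{"name": "u1"}],
    #         execution_options={"populate_existing": True},
    #     )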

    @classmethod
    def orm_pre_session_exec(
        cls,
        session,
        statement,
        params,
        execution_options,
        bind_arguments,
        is_pre_event,
    ):
        (
            insert_options,
            execution_options,
        ) = _BulkORMInsert.default_insert_options.from_execution_options(
            "_sa_orm_insert_options",
            {"dml_strategy", "autoflush", "populate_existing", "render_nulls"},
            execution_options,
            statement._execution_options,
        )
        bind_arguments["clause"] = statement
        try:
            plugin_subject = statement._propagate_attrs["plugin_subject"]
        except KeyError:
            assert False, "statement had 'orm' plugin but no plugin_subject"
        else:
            if plugin_subject:
                bind_arguments["mapper"] = plugin_subject.mapper
                insert_options += {"_subject_mapper": plugin_subject.mapper}

        if not params:
            if insert_options._dml_strategy == "auto":
                insert_options += {"_dml_strategy": "orm"}
            elif insert_options._dml_strategy == "bulk":
                raise sa_exc.InvalidRequestError(
                    'Can\'t use "bulk" ORM insert strategy without '
                    "passing separate parameters"
                )
        else:
            if insert_options._dml_strategy == "auto":
                insert_options += {"_dml_strategy": "bulk"}

        if insert_options._dml_strategy != "raw":
            # for ORM object loading, like ORMContext, we have to disable
            # result set adapt_to_context, because we will be generating a
            # new statement with specific columns that's cached inside of
            # an ORMFromStatementCompileState, which we will re-use for
            # each result.
            if not execution_options:
                execution_options = context._orm_load_exec_options
            else:
                execution_options = execution_options.union(
                    context._orm_load_exec_options
                )

        if not is_pre_event and insert_options._autoflush:
            session._autoflush()

        statement = statement._annotate(
            {"dml_strategy": insert_options._dml_strategy}
        )

        return (
            statement,
            util.immutabledict(execution_options).union(
                {"_sa_orm_insert_options": insert_options}
            ),
        )

    @classmethod
    def orm_execute_statement(
        cls,
        session: Session,
        statement: dml.Insert,
        params: _CoreAnyExecuteParams,
        execution_options: OrmExecuteOptionsParameter,
        bind_arguments: _BindArguments,
        conn: Connection,
    ) -> _result.Result:
        insert_options = execution_options.get(
            "_sa_orm_insert_options", cls.default_insert_options
        )

        if insert_options._dml_strategy not in (
            "raw",
            "bulk",
            "orm",
            "auto",
        ):
            raise sa_exc.ArgumentError(
                "Valid strategies for ORM insert strategy "
                "are 'raw', 'orm', 'bulk', 'auto'"
            )

        result: _result.Result[Unpack[TupleAny]]

        if insert_options._dml_strategy == "raw":
            result = conn.execute(
                statement, params or {}, execution_options=execution_options
            )
            return result

        if insert_options._dml_strategy == "bulk":
            mapper = insert_options._subject_mapper

            if (
                statement._post_values_clause is not None
                and mapper._multiple_persistence_tables
            ):
                raise sa_exc.InvalidRequestError(
                    "bulk INSERT with a 'post values' clause "
                    "(typically upsert) not supported for multi-table "
                    f"mapper {mapper}"
                )

            assert mapper is not None
            assert session._transaction is not None
            result = _bulk_insert(
                mapper,
                cast(
                    "Iterable[Dict[str, Any]]",
                    [params] if isinstance(params, dict) else params,
                ),
                session._transaction,
                isstates=False,
                return_defaults=insert_options._return_defaults,
                render_nulls=insert_options._render_nulls,
                use_orm_insert_stmt=statement,
                execution_options=execution_options,
            )
        elif insert_options._dml_strategy == "orm":
            result = conn.execute(
                statement, params or {}, execution_options=execution_options
            )
        else:
            raise AssertionError()

        if not bool(statement._returning):
            return result

        if insert_options._populate_existing:
            load_options = execution_options.get(
                "_sa_orm_load_options", QueryContext.default_load_options
            )
            load_options += {"_populate_existing": True}
            execution_options = execution_options.union(
                {"_sa_orm_load_options": load_options}
            )
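        # Illustrative only: "_populate_existing" above is set when the user
        # passes the "populate_existing" ORM execution option, so that rows
        # coming back from RETURNING refresh already-loaded objects; e.g.
        # (assuming ``stmt`` is an insert(User).returning(User))::
        #
        #     session.execute(
        #         stmt, execution_options={"populate_existing": True}
        #     )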

        return cls._return_orm_returning(
            session,
            statement,
            params,
            execution_options,
            bind_arguments,
            result,
        )

    @classmethod
    def create_for_statement(cls, statement, compiler, **kw) -> _BulkORMInsert:
        self = cast(
            _BulkORMInsert,
            super().create_for_statement(statement, compiler, **kw),
        )

        if compiler is not None:
            toplevel = not compiler.stack
        else:
            toplevel = True
        if not toplevel:
            return self

        mapper = statement._propagate_attrs["plugin_subject"]
        dml_strategy = statement._annotations.get("dml_strategy", "raw")
        if dml_strategy == "bulk":
            self._setup_for_bulk_insert(compiler)
        elif dml_strategy == "orm":
            self._setup_for_orm_insert(compiler, mapper)

        return self

    @classmethod
    def _resolved_keys_as_col_keys(cls, mapper, resolved_value_dict):
        return {
            col.key if col is not None else k: v
            for col, k, v in (
                (mapper.c.get(k), k, v) for k, v in resolved_value_dict.items()
            )
        }

    def _setup_for_orm_insert(self, compiler, mapper):
        statement = orm_level_statement = cast(dml.Insert, self.statement)

        statement = self._setup_orm_returning(
            compiler,
            orm_level_statement,
            statement,
            dml_mapper=mapper,
            use_supplemental_cols=False,
        )
        self.statement = statement

    def _setup_for_bulk_insert(self, compiler):
        """establish an INSERT statement within the context of
        bulk insert.

        This method is called within the "conn.execute()" call that is
        invoked by persistence._emit_insert_statement().

        """
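        # Illustrative only: bulk INSERT is what runs when a list of
        # parameter dictionaries is passed to Session.execute() along with
        # an insert() statement, e.g. (assuming a mapped class ``User``)::
        #
        #     session.execute(
        #         insert(User),
        #         [{"name": "patrick"}, {"name": "squidward"}],
        #     )
        #
        # For joined-table inheritance, persistence emits one INSERT per
        # table; the code below trims the parameter set and RETURNING down
        # to the table currently being emitted.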
        statement = orm_level_statement = cast(dml.Insert, self.statement)
        an = statement._annotations

        emit_insert_table, emit_insert_mapper = (
            an["_emit_insert_table"],
            an["_emit_insert_mapper"],
        )

        statement = statement._clone()

        statement.table = emit_insert_table
        if self._dict_parameters:
            self._dict_parameters = {
                col: val
                for col, val in self._dict_parameters.items()
                if col.table is emit_insert_table
            }

        statement = self._setup_orm_returning(
            compiler,
            orm_level_statement,
            statement,
            dml_mapper=emit_insert_mapper,
            use_supplemental_cols=True,
        )

        if (
            self.from_statement_ctx is not None
            and self.from_statement_ctx.compile_options._is_star
        ):
            raise sa_exc.CompileError(
                "Can't use RETURNING * with bulk ORM INSERT.  "
                "Please use a different INSERT form, such as INSERT..VALUES "
                "or INSERT with a Core Connection"
            )

        self.statement = statement


@CompileState.plugin_for("orm", "update")
class _BulkORMUpdate(_BulkUDCompileState, UpdateDMLState):
    @classmethod
    def create_for_statement(cls, statement, compiler, **kw):
        self = cls.__new__(cls)

        dml_strategy = statement._annotations.get(
            "dml_strategy", "unspecified"
        )

        toplevel = not compiler.stack

        if toplevel and dml_strategy == "bulk":
            self._setup_for_bulk_update(statement, compiler)
        elif (
            dml_strategy == "core_only"
            or dml_strategy == "unspecified"
            and "parententity" not in statement.table._annotations
        ):
            UpdateDMLState.__init__(self, statement, compiler, **kw)
        elif not toplevel or dml_strategy in ("orm", "unspecified"):
            self._setup_for_orm_update(statement, compiler)

        return self

    def _setup_for_orm_update(self, statement, compiler, **kw):
        orm_level_statement = statement

        toplevel = not compiler.stack

        ext_info = statement.table._annotations["parententity"]

        self.mapper = mapper = ext_info.mapper

        self._resolved_values = self._get_resolved_values(mapper, statement)

        self._init_global_attributes(
            statement,
            compiler,
            toplevel=toplevel,
            process_criteria_for_toplevel=toplevel,
        )

        if statement._values:
            self._resolved_values = dict(self._resolved_values)

        new_stmt = statement._clone()

        if new_stmt.table._annotations["parententity"] is mapper:
            new_stmt.table = mapper.local_table

        # note if the statement has _multi_values, these
        # are passed through to the new statement, which will then raise
        # InvalidRequestError because UPDATE doesn't support multi_values
        # right now.
        if statement._values:
            new_stmt._values = self._resolved_values

        new_crit = self._adjust_for_extra_criteria(
            self.global_attributes, mapper
        )
        if new_crit:
            new_stmt = new_stmt.where(*new_crit)

        # if we are against a lambda statement we might not be the
        # topmost object that received per-execute annotations

        # do this first, as we need to determine if there is
        # UPDATE..FROM

        UpdateDMLState.__init__(self, new_stmt, compiler, **kw)

        use_supplemental_cols = False

        if not toplevel:
            synchronize_session = None
        else:
            synchronize_session = compiler._annotations.get(
                "synchronize_session", None
            )
        can_use_returning = compiler._annotations.get(
            "can_use_returning", None
        )
        if can_use_returning is not False:
            # even though pre_exec has determined a basic
            # can_use_returning answer for the dialect, if we are to use
            # RETURNING we need to run can_use_returning() at this level
            # unconditionally, because flags such as is_update_from were
            # not known at the pre_exec level
            can_use_returning = (
                synchronize_session == "fetch"
                and self.can_use_returning(
                    compiler.dialect, mapper, is_multitable=self.is_multitable
                )
            )

        if synchronize_session == "fetch" and can_use_returning:
            use_supplemental_cols = True

            # NOTE: we might want to RETURN the actual columns to be
            # synchronized also.  However this is complicated and difficult
            # to align against the behavior of "evaluate".  Additionally,
            # in a large number (if not the majority) of cases, we have the
            # "evaluate" answer, usually a fixed value, in memory already,
            # and there's no need to re-fetch the same value over and over
            # again.  So perhaps we could RETURN just the elements that were
            # based on a SQL expression and not a constant.  For now it
            # doesn't quite seem worth it
            new_stmt = new_stmt.return_defaults(*new_stmt.table.primary_key)

        if toplevel:
            new_stmt = self._setup_orm_returning(
                compiler,
                orm_level_statement,
                new_stmt,
                dml_mapper=mapper,
                use_supplemental_cols=use_supplemental_cols,
            )

        self.statement = new_stmt

    def _setup_for_bulk_update(self, statement, compiler, **kw):
        """establish an UPDATE statement within the context of
        bulk update.

        This method is called within the "conn.execute()" call that is
        invoked by persistence._emit_update_statement().

        """
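        # Illustrative only: bulk UPDATE by primary key is what runs when a
        # list of parameter dictionaries, each containing the primary key,
        # is passed to Session.execute() along with an update() statement,
        # e.g. (assuming a mapped class ``User``)::
        #
        #     session.execute(
        #         update(User),
        #         [{"id": 1, "name": "spongebob"}, {"id": 2, "name": "sandy"}],
        #     )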
        statement = cast(dml.Update, statement)
        an = statement._annotations

        emit_update_table, _ = (
            an["_emit_update_table"],
            an["_emit_update_mapper"],
        )

        statement = statement._clone()
        statement.table = emit_update_table

        UpdateDMLState.__init__(self, statement, compiler, **kw)

        if self._maintain_values_ordering:
            raise sa_exc.InvalidRequestError(
                "bulk ORM UPDATE does not support ordered_values() for "
                "custom UPDATE statements with bulk parameter sets.  Use a "
                "non-bulk UPDATE statement or use values()."
            )
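        # Illustrative only: the unsupported form combines ordered_values()
        # with a list of parameter sets, e.g. (assuming a mapped class
        # ``User``)::
        #
        #     session.execute(
        #         update(User).ordered_values(("x", 1), ("y", 2)),
        #         [{"id": 1}, {"id": 2}],  # bulk parameter sets
        #     )
        #
        # Rewriting with plain values(), or executing one statement per
        # row, avoids the error.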

        if self._dict_parameters:
            self._dict_parameters = {
                col: val
                for col, val in self._dict_parameters.items()
                if col.table is emit_update_table
            }
        self.statement = statement

    @classmethod
    def orm_execute_statement(
        cls,
        session: Session,
        statement: dml.Update,
        params: _CoreAnyExecuteParams,
        execution_options: OrmExecuteOptionsParameter,
        bind_arguments: _BindArguments,
        conn: Connection,
    ) -> _result.Result:

        update_options = execution_options.get(
            "_sa_orm_update_options", cls.default_update_options
        )

        if update_options._populate_existing:
            load_options = execution_options.get(
                "_sa_orm_load_options", QueryContext.default_load_options
            )
            load_options += {"_populate_existing": True}
            execution_options = execution_options.union(
                {"_sa_orm_load_options": load_options}
            )

        if update_options._dml_strategy not in (
            "orm",
            "auto",
            "bulk",
            "core_only",
        ):
            raise sa_exc.ArgumentError(
                "Valid strategies for ORM UPDATE are "
                "'orm', 'auto', 'bulk', 'core_only'"
            )

        result: _result.Result[Unpack[TupleAny]]

        if update_options._dml_strategy == "bulk":
            enable_check_rowcount = not statement._where_criteria

            assert update_options._synchronize_session != "fetch"

            if (
                statement._where_criteria
                and update_options._synchronize_session == "evaluate"
            ):
                raise sa_exc.InvalidRequestError(
                    "bulk synchronize of persistent objects not supported "
                    "when using bulk update with additional WHERE "
                    "criteria right now.  Add the synchronize_session=None "
                    "execution option to bypass synchronize of persistent "
                    "objects."
                )
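            # Illustrative only: the bypass named in the error above, e.g.
            # (assuming a mapped class ``User`` and extra WHERE criteria)::
            #
            #     session.execute(
            #         update(User).where(User.name != None),
            #         [{"id": 1, "name": "x"}, {"id": 2, "name": "y"}],
            #         execution_options={"synchronize_session": None},
            #     )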
            mapper = update_options._subject_mapper
            assert mapper is not None
            assert session._transaction is not None
            result = _bulk_update(
                mapper,
                cast(
                    "Iterable[Dict[str, Any]]",
                    [params] if isinstance(params, dict) else params,
                ),
                session._transaction,
                isstates=False,
                update_changed_only=False,
                use_orm_update_stmt=statement,
                enable_check_rowcount=enable_check_rowcount,
            )
            return cls.orm_setup_cursor_result(
                session,
                statement,
                params,
                execution_options,
                bind_arguments,
                result,
            )
        else:
            return super().orm_execute_statement(
                session,
                statement,
                params,
                execution_options,
                bind_arguments,
                conn,
            )

    @classmethod
    def can_use_returning(
        cls,
        dialect: Dialect,
        mapper: Mapper[Any],
        *,
        is_multitable: bool = False,
        is_update_from: bool = False,
        is_delete_using: bool = False,
        is_executemany: bool = False,
    ) -> bool:
        # normal answer for "should we use RETURNING" at all.
        normal_answer = (
            dialect.update_returning and mapper.local_table.implicit_returning
        )
        if not normal_answer:
            return False

        if is_executemany:
            return dialect.update_executemany_returning

        # these workarounds are currently hypothetical for UPDATE,
        # unlike DELETE where they impact MariaDB
        if is_update_from:
            return dialect.update_returning_multifrom

        elif is_multitable and not dialect.update_returning_multifrom:
            raise sa_exc.CompileError(
                f'Dialect "{dialect.name}" does not support RETURNING '
                "with UPDATE..FROM; for synchronize_session='fetch', "
                "please add the additional execution option "
                "'is_update_from=True' to the statement to indicate that "
                "a separate SELECT should be used for this backend."
            )
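        # Illustrative only: the hint named in the error above is passed as
        # an ORM execution option, e.g. (assuming ``stmt`` is a multi-table
        # UPDATE against a mapped class)::
        #
        #     session.execute(
        #         stmt,
        #         execution_options={
        #             "synchronize_session": "fetch",
        #             "is_update_from": True,
        #         },
        #     )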

        return True

    @classmethod
    def _do_post_synchronize_bulk_evaluate(
        cls, session, params, result, update_options
    ):
        if not params:
            return

        mapper = update_options._subject_mapper
        pk_keys = [prop.key for prop in mapper._identity_key_props]

        identity_map = session.identity_map

        for param in params:
            identity_key = mapper.identity_key_from_primary_key(
                (param[key] for key in pk_keys),
                update_options._identity_token,
            )
            state = identity_map.fast_get_state(identity_key)
            if not state:
                continue

            evaluated_keys = set(param).difference(pk_keys)

            dict_ = state.dict
            # only evaluate unmodified attributes
            to_evaluate = state.unmodified.intersection(evaluated_keys)
            for key in to_evaluate:
                if key in dict_:
                    dict_[key] = param[key]

            state.manager.dispatch.refresh(state, None, to_evaluate)

            state._commit(dict_, list(to_evaluate))

            # attributes that were formerly modified instead get expired.
            # this only gets hit if the session had pending changes
            # and autoflush was set to False.
            to_expire = evaluated_keys.intersection(dict_).difference(
                to_evaluate
            )
            if to_expire:
                state._expire_attributes(dict_, to_expire)

    @classmethod
    def _do_post_synchronize_evaluate(
        cls, session, statement, result, update_options
    ):
        matched_objects = cls._get_matched_objects_on_criteria(
            update_options,
            session.identity_map.all_states(),
        )

        cls._apply_update_set_values_to_objects(
            session,
            update_options,
            statement,
            result.context.compiled_parameters[0],
            [(obj, state, dict_) for obj, state, dict_, _ in matched_objects],
            result.prefetch_cols(),
            result.postfetch_cols(),
        )

    @classmethod
    def _do_post_synchronize_fetch(
        cls, session, statement, result, update_options
    ):
        target_mapper = update_options._subject_mapper

        returned_defaults_rows = result.returned_defaults_rows
        if returned_defaults_rows:
            pk_rows = cls._interpret_returning_rows(
                result, target_mapper, returned_defaults_rows
            )
            matched_rows = [
                tuple(row) + (update_options._identity_token,)
                for row in pk_rows
            ]
        else:
            matched_rows = update_options._matched_rows

        objs = [
            session.identity_map[identity_key]
            for identity_key in [
                target_mapper.identity_key_from_primary_key(
                    list(primary_key),
                    identity_token=identity_token,
                )
                for primary_key, identity_token in [
                    (row[0:-1], row[-1]) for row in matched_rows
                ]
                if update_options._identity_token is None
                or identity_token == update_options._identity_token
            ]
            if identity_key in session.identity_map
        ]

        if not objs:
            return

        cls._apply_update_set_values_to_objects(
            session,
            update_options,
            statement,
            result.context.compiled_parameters[0],
            [
                (
                    obj,
                    attributes.instance_state(obj),
                    attributes.instance_dict(obj),
                )
                for obj in objs
            ],
            result.prefetch_cols(),
            result.postfetch_cols(),
        )

    @classmethod
    def _apply_update_set_values_to_objects(
        cls,
        session,
        update_options,
        statement,
        effective_params,
        matched_objects,
        prefetch_cols,
        postfetch_cols,
    ):
        """apply values to objects derived from an update statement, e.g.
        UPDATE..SET <values>

        """
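        # Illustrative only: for a statement such as (assuming a mapped
        # class ``User``)::
        #
        #     update(User).where(User.status == "active").values(
        #         status="archived"
        #     )
        #
        # each matched in-memory User gets ``status`` set to "archived"
        # via an in-Python evaluator when the SET value can be evaluated
        # in Python, rather than being expired and re-fetched.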

        mapper = update_options._subject_mapper
        target_cls = mapper.class_
        evaluator_compiler = evaluator._EvaluatorCompiler(target_cls)
        resolved_values = cls._get_resolved_values(mapper, statement)
        resolved_keys_as_propnames = cls._resolved_keys_as_propnames(
            mapper, resolved_values
        )
        value_evaluators = {}
        for key, value in resolved_keys_as_propnames:
            try:
                _evaluator = evaluator_compiler.process(
                    coercions.expect(roles.ExpressionElementRole, value)
                )
            except evaluator.UnevaluatableError:
                pass
            else:
                value_evaluators[key] = _evaluator

        evaluated_keys = list(value_evaluators.keys())
        attrib = {k for k, v in resolved_keys_as_propnames}

        states = set()

        to_prefetch = {
            c
            for c in prefetch_cols
            if c.key in effective_params
            and c in mapper._columntoproperty
            and c.key not in evaluated_keys
        }
        to_expire = {
            mapper._columntoproperty[c].key
            for c in postfetch_cols
            if c in mapper._columntoproperty
        }.difference(evaluated_keys)

        prefetch_transfer = [
            (mapper._columntoproperty[c].key, c.key) for c in to_prefetch
        ]

        for obj, state, dict_ in matched_objects:

            dict_.update(
                {
                    col_to_prop: effective_params[c_key]
                    for col_to_prop, c_key in prefetch_transfer
                }
            )

            state._expire_attributes(state.dict, to_expire)

            to_evaluate = state.unmodified.intersection(evaluated_keys)

            for key in to_evaluate:
                if key in dict_:
                    # only run eval for attributes that are present.
                    dict_[key] = value_evaluators[key](obj)

            state.manager.dispatch.refresh(state, None, to_evaluate)

            state._commit(dict_, list(to_evaluate))

            # attributes that were formerly modified instead get expired.
            # this only gets hit if the session had pending changes
            # and autoflush was set to False.
            to_expire = attrib.intersection(dict_).difference(to_evaluate)
            if to_expire:
                state._expire_attributes(dict_, to_expire)

            states.add(state)
        session._register_altered(states)


@CompileState.plugin_for("orm", "delete")
class _BulkORMDelete(_BulkUDCompileState, DeleteDMLState):
    @classmethod
    def create_for_statement(cls, statement, compiler, **kw):
        self = cls.__new__(cls)

        dml_strategy = statement._annotations.get(
            "dml_strategy", "unspecified"
        )

        if (
            dml_strategy == "core_only"
            or dml_strategy == "unspecified"
            and "parententity" not in statement.table._annotations
        ):
            DeleteDMLState.__init__(self, statement, compiler, **kw)
            return self

        toplevel = not compiler.stack

        orm_level_statement = statement

        ext_info = statement.table._annotations["parententity"]
        self.mapper = mapper = ext_info.mapper

        self._init_global_attributes(
            statement,
            compiler,
            toplevel=toplevel,
            process_criteria_for_toplevel=toplevel,
        )

        new_stmt = statement._clone()

        if new_stmt.table._annotations["parententity"] is mapper:
            new_stmt.table = mapper.local_table

        new_crit = cls._adjust_for_extra_criteria(
            self.global_attributes, mapper
        )
        if new_crit:
            new_stmt = new_stmt.where(*new_crit)

        # do this first, as we need to determine if there is
        # DELETE..FROM
        DeleteDMLState.__init__(self, new_stmt, compiler, **kw)

        use_supplemental_cols = False

        if not toplevel:
            synchronize_session = None
        else:
            synchronize_session = compiler._annotations.get(
                "synchronize_session", None
            )
        can_use_returning = compiler._annotations.get(
            "can_use_returning", None
        )
        if can_use_returning is not False:
            # even though pre_exec has determined a basic
            # can_use_returning answer for the dialect, if we are to use
            # RETURNING we need to run can_use_returning() at this level
            # unconditionally, because is_delete_using was not known
            # at the pre_exec level
            can_use_returning = (
                synchronize_session == "fetch"
                and self.can_use_returning(
                    compiler.dialect,
                    mapper,
                    is_multitable=self.is_multitable,
                    is_delete_using=compiler._annotations.get(
                        "is_delete_using", False
                    ),
                )
            )

        if can_use_returning:
            use_supplemental_cols = True

            new_stmt = new_stmt.return_defaults(*new_stmt.table.primary_key)

        if toplevel:
            new_stmt = self._setup_orm_returning(
                compiler,
                orm_level_statement,
                new_stmt,
                dml_mapper=mapper,
                use_supplemental_cols=use_supplemental_cols,
            )

        self.statement = new_stmt

        return self

    @classmethod
    def orm_execute_statement(
        cls,
        session: Session,
        statement: dml.Delete,
        params: _CoreAnyExecuteParams,
        execution_options: OrmExecuteOptionsParameter,
        bind_arguments: _BindArguments,
        conn: Connection,
    ) -> _result.Result:
        update_options = execution_options.get(
            "_sa_orm_update_options", cls.default_update_options
        )

        if update_options._dml_strategy == "bulk":
            raise sa_exc.InvalidRequestError(
                "Bulk ORM DELETE not supported right now. "
                "The statement may instead be invoked at the "
                "Core level using "
                "session.connection().execute(stmt, parameters)"
            )
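        # Illustrative only: the Core-level workaround named in the error
        # above, e.g. (assuming a mapped class ``User`` with mapped table
        # ``user_table``)::
        #
        #     session.connection().execute(
        #         delete(user_table).where(
        #             user_table.c.id == bindparam("id")
        #         ),
        #         [{"id": 1}, {"id": 2}],
        #     )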

        if update_options._dml_strategy not in ("orm", "auto", "core_only"):
            raise sa_exc.ArgumentError(
                "Valid strategies for ORM DELETE are 'orm', 'auto', "
                "'core_only'"
            )

        return super().orm_execute_statement(
            session, statement, params, execution_options, bind_arguments, conn
        )

    @classmethod
    def can_use_returning(
        cls,
        dialect: Dialect,
        mapper: Mapper[Any],
        *,
        is_multitable: bool = False,
        is_update_from: bool = False,
        is_delete_using: bool = False,
        is_executemany: bool = False,
    ) -> bool:
        # normal answer for "should we use RETURNING" at all.
        normal_answer = (
            dialect.delete_returning and mapper.local_table.implicit_returning
        )
        if not normal_answer:
            return False

        # now get into special workarounds because MariaDB supports
        # DELETE...RETURNING but not DELETE...USING...RETURNING.
        if is_delete_using:
            # is_delete_using hint was passed.  use the additional
            # dialect feature (True for PG, False for MariaDB)
            return dialect.delete_returning_multifrom

        elif is_multitable and not dialect.delete_returning_multifrom:
            # is_delete_using hint was not passed, but we determined
            # at compile time that this is in fact a DELETE..USING.
            # it's too late to continue since we did not pre-SELECT.
            # raise that we need that hint up front.

            raise sa_exc.CompileError(
                f'Dialect "{dialect.name}" does not support RETURNING '
                "with DELETE..USING; for synchronize_session='fetch', "
                "please add the additional execution option "
                "'is_delete_using=True' to the statement to indicate that "
                "a separate SELECT should be used for this backend."
            )
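        # Illustrative only: the hint named in the error above is passed as
        # an ORM execution option, e.g. (assuming ``stmt`` is a
        # DELETE..USING against a mapped class)::
        #
        #     session.execute(
        #         stmt,
        #         execution_options={
        #             "synchronize_session": "fetch",
        #             "is_delete_using": True,
        #         },
        #     )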

        return True

    @classmethod
    def _do_post_synchronize_evaluate(
        cls, session, statement, result, update_options
    ):
        matched_objects = cls._get_matched_objects_on_criteria(
            update_options,
            session.identity_map.all_states(),
        )

        to_delete = []

        for _, state, dict_, is_partially_expired in matched_objects:
            if is_partially_expired:
                state._expire(dict_, session.identity_map._modified)
            else:
                to_delete.append(state)

        if to_delete:
            session._remove_newly_deleted(to_delete)

    @classmethod
    def _do_post_synchronize_fetch(
        cls, session, statement, result, update_options
    ):
        target_mapper = update_options._subject_mapper

        returned_defaults_rows = result.returned_defaults_rows

        if returned_defaults_rows:
            pk_rows = cls._interpret_returning_rows(
                result, target_mapper, returned_defaults_rows
            )

            matched_rows = [
                tuple(row) + (update_options._identity_token,)
                for row in pk_rows
            ]
        else:
            matched_rows = update_options._matched_rows

        for row in matched_rows:
            primary_key = row[0:-1]
            identity_token = row[-1]

            # TODO: inline this and call remove_newly_deleted
            # once
            identity_key = target_mapper.identity_key_from_primary_key(
                list(primary_key),
                identity_token=identity_token,
            )
            if identity_key in session.identity_map:
                session._remove_newly_deleted(
                    [
                        attributes.instance_state(
                            session.identity_map[identity_key]
                        )
                    ]
                )