1# orm/dynamic.py 
    2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 
    3# <see AUTHORS file> 
    4# 
    5# This module is part of SQLAlchemy and is released under 
    6# the MIT License: https://www.opensource.org/licenses/mit-license.php 
    7 
    8 
    9"""Dynamic collection API. 
    10 
    11Dynamic collections act like Query() objects for read operations and support 
    12basic add/delete mutation. 
    13 
    14.. legacy:: the "dynamic" loader is a legacy feature, superseded by the 
    15 "write_only" loader. 
    16 
    17 
    18""" 
    19 
    20from __future__ import annotations 
    21 
    22from typing import Any 
    23from typing import Iterable 
    24from typing import Iterator 
    25from typing import List 
    26from typing import Optional 
    27from typing import Tuple 
    28from typing import Type 
    29from typing import TYPE_CHECKING 
    30from typing import TypeVar 
    31from typing import Union 
    32 
    33from . import attributes 
    34from . import exc as orm_exc 
    35from . import relationships 
    36from . import util as orm_util 
    37from .base import PassiveFlag 
    38from .query import Query 
    39from .session import object_session 
    40from .writeonly import _AbstractCollectionWriter 
    41from .writeonly import _WriteOnlyAttributeImpl 
    42from .writeonly import _WriteOnlyLoader 
    43from .writeonly import WriteOnlyHistory 
    44from .. import util 
    45from ..engine import result 
    46 
    47 
    48if TYPE_CHECKING: 
    49    from . import QueryableAttribute 
    50    from .mapper import Mapper 
    51    from .relationships import _RelationshipOrderByArg 
    52    from .session import Session 
    53    from .state import InstanceState 
    54    from .util import AliasedClass 
    55    from ..event import _Dispatch 
    56    from ..sql.elements import ColumnElement 
    57 
    58_T = TypeVar("_T", bound=Any) 
    59 
    60 
    61class DynamicCollectionHistory(WriteOnlyHistory[_T]): 
    62    def __init__( 
    63        self, 
    64        attr: _DynamicAttributeImpl, 
    65        state: InstanceState[_T], 
    66        passive: PassiveFlag, 
    67        apply_to: Optional[DynamicCollectionHistory[_T]] = None, 
    68    ) -> None: 
    69        if apply_to: 
    70            coll = AppenderQuery(attr, state).autoflush(False) 
    71            self.unchanged_items = util.OrderedIdentitySet(coll) 
    72            self.added_items = apply_to.added_items 
    73            self.deleted_items = apply_to.deleted_items 
    74            self._reconcile_collection = True 
    75        else: 
    76            self.deleted_items = util.OrderedIdentitySet() 
    77            self.added_items = util.OrderedIdentitySet() 
    78            self.unchanged_items = util.OrderedIdentitySet() 
    79            self._reconcile_collection = False 
    80 
    81 
    82class _DynamicAttributeImpl(_WriteOnlyAttributeImpl): 
    83    _supports_dynamic_iteration = True 
    84    collection_history_cls = DynamicCollectionHistory[Any] 
    85    query_class: Type[_AppenderMixin[Any]]  # type: ignore[assignment] 
    86 
    87    def __init__( 
    88        self, 
    89        class_: Union[Type[Any], AliasedClass[Any]], 
    90        key: str, 
    91        dispatch: _Dispatch[QueryableAttribute[Any]], 
    92        target_mapper: Mapper[_T], 
    93        order_by: _RelationshipOrderByArg, 
    94        query_class: Optional[Type[_AppenderMixin[_T]]] = None, 
    95        **kw: Any, 
    96    ) -> None: 
    97        attributes._AttributeImpl.__init__( 
    98            self, class_, key, None, dispatch, **kw 
    99        ) 
    100        self.target_mapper = target_mapper 
    101        if order_by: 
    102            self.order_by = tuple(order_by) 
    103        if not query_class: 
    104            self.query_class = AppenderQuery 
    105        elif _AppenderMixin in query_class.mro(): 
    106            self.query_class = query_class 
    107        else: 
    108            self.query_class = mixin_user_query(query_class) 
    109 
    110 
@relationships.RelationshipProperty.strategy_for(lazy="dynamic")
class _DynaLoader(_WriteOnlyLoader):
    """Loader strategy registered for ``lazy="dynamic"`` relationships.

    Differs from its :class:`._WriteOnlyLoader` base only in
    ``impl_class``, which names :class:`._DynamicAttributeImpl`.

    """

    # presumably consumed by the base loader when it installs the
    # attribute implementation -- confirm against _WriteOnlyLoader
    impl_class = _DynamicAttributeImpl
    114 
    115 
    116class _AppenderMixin(_AbstractCollectionWriter[_T]): 
    117    """A mixin that expects to be mixing in a Query class with 
    118    AbstractAppender. 
    119 
    120 
    121    """ 
    122 
    123    query_class: Optional[Type[Query[_T]]] = None 
    124    _order_by_clauses: Tuple[ColumnElement[Any], ...] 
    125 
    126    def __init__( 
    127        self, attr: _DynamicAttributeImpl, state: InstanceState[_T] 
    128    ) -> None: 
    129        Query.__init__( 
    130            self,  # type: ignore[arg-type] 
    131            attr.target_mapper, 
    132            None, 
    133        ) 
    134        super().__init__(attr, state) 
    135 
    136    @property 
    137    def session(self) -> Optional[Session]: 
    138        sess = object_session(self.instance) 
    139        if sess is not None and sess.autoflush and self.instance in sess: 
    140            sess.flush() 
    141        if not orm_util.has_identity(self.instance): 
    142            return None 
    143        else: 
    144            return sess 
    145 
    146    @session.setter 
    147    def session(self, session: Session) -> None: 
    148        self.sess = session 
    149 
    150    def _iter(self) -> Union[result.ScalarResult[_T], result.Result[_T]]: 
    151        sess = self.session 
    152        if sess is None: 
    153            state = attributes.instance_state(self.instance) 
    154            if state.detached: 
    155                util.warn( 
    156                    "Instance %s is detached, dynamic relationship cannot " 
    157                    "return a correct result.   This warning will become " 
    158                    "a DetachedInstanceError in a future release." 
    159                    % (orm_util.state_str(state)) 
    160                ) 
    161 
    162            return result.IteratorResult( 
    163                result.SimpleResultMetaData([self.attr.class_.__name__]), 
    164                iter( 
    165                    self.attr._get_collection_history( 
    166                        attributes.instance_state(self.instance), 
    167                        PassiveFlag.PASSIVE_NO_INITIALIZE, 
    168                    ).added_items 
    169                ), 
    170                _source_supports_scalars=True, 
    171            ).scalars() 
    172        else: 
    173            return self._generate(sess)._iter() 
    174 
    175    if TYPE_CHECKING: 
    176 
    177        def __iter__(self) -> Iterator[_T]: ... 
    178 
    179    def __getitem__(self, index: Any) -> Union[_T, List[_T]]: 
    180        sess = self.session 
    181        if sess is None: 
    182            return self.attr._get_collection_history( 
    183                attributes.instance_state(self.instance), 
    184                PassiveFlag.PASSIVE_NO_INITIALIZE, 
    185            ).indexed(index) 
    186        else: 
    187            return self._generate(sess).__getitem__(index)  # type: ignore[no-any-return] # noqa: E501 
    188 
    189    def count(self) -> int: 
    190        sess = self.session 
    191        if sess is None: 
    192            return len( 
    193                self.attr._get_collection_history( 
    194                    attributes.instance_state(self.instance), 
    195                    PassiveFlag.PASSIVE_NO_INITIALIZE, 
    196                ).added_items 
    197            ) 
    198        else: 
    199            return self._generate(sess).count() 
    200 
    201    def _generate( 
    202        self, 
    203        sess: Optional[Session] = None, 
    204    ) -> Query[_T]: 
    205        # note we're returning an entirely new Query class instance 
    206        # here without any assignment capabilities; the class of this 
    207        # query is determined by the session. 
    208        instance = self.instance 
    209        if sess is None: 
    210            sess = object_session(instance) 
    211            if sess is None: 
    212                raise orm_exc.DetachedInstanceError( 
    213                    "Parent instance %s is not bound to a Session, and no " 
    214                    "contextual session is established; lazy load operation " 
    215                    "of attribute '%s' cannot proceed" 
    216                    % (orm_util.instance_str(instance), self.attr.key) 
    217                ) 
    218 
    219        if self.query_class: 
    220            query = self.query_class(self.attr.target_mapper, session=sess) 
    221        else: 
    222            query = sess.query(self.attr.target_mapper) 
    223 
    224        query._where_criteria = self._where_criteria 
    225        query._from_obj = self._from_obj 
    226        query._order_by_clauses = self._order_by_clauses 
    227 
    228        return query 
    229 
    230    def add_all(self, iterator: Iterable[_T]) -> None: 
    231        """Add an iterable of items to this :class:`_orm.AppenderQuery`. 
    232 
    233        The given items will be persisted to the database in terms of 
    234        the parent instance's collection on the next flush. 
    235 
    236        This method is provided to assist in delivering forwards-compatibility 
    237        with the :class:`_orm.WriteOnlyCollection` collection class. 
    238 
    239        .. versionadded:: 2.0 
    240 
    241        """ 
    242        self._add_all_impl(iterator) 
    243 
    244    def add(self, item: _T) -> None: 
    245        """Add an item to this :class:`_orm.AppenderQuery`. 
    246 
    247        The given item will be persisted to the database in terms of 
    248        the parent instance's collection on the next flush. 
    249 
    250        This method is provided to assist in delivering forwards-compatibility 
    251        with the :class:`_orm.WriteOnlyCollection` collection class. 
    252 
    253        .. versionadded:: 2.0 
    254 
    255        """ 
    256        self._add_all_impl([item]) 
    257 
    258    def extend(self, iterator: Iterable[_T]) -> None: 
    259        """Add an iterable of items to this :class:`_orm.AppenderQuery`. 
    260 
    261        The given items will be persisted to the database in terms of 
    262        the parent instance's collection on the next flush. 
    263 
    264        """ 
    265        self._add_all_impl(iterator) 
    266 
    267    def append(self, item: _T) -> None: 
    268        """Append an item to this :class:`_orm.AppenderQuery`. 
    269 
    270        The given item will be persisted to the database in terms of 
    271        the parent instance's collection on the next flush. 
    272 
    273        """ 
    274        self._add_all_impl([item]) 
    275 
    276    def remove(self, item: _T) -> None: 
    277        """Remove an item from this :class:`_orm.AppenderQuery`. 
    278 
    279        The given item will be removed from the parent instance's collection on 
    280        the next flush. 
    281 
    282        """ 
    283        self._remove_impl(item) 
    284 
    285 
class AppenderQuery(_AppenderMixin[_T], Query[_T]):  # type: ignore[misc]
    """A dynamic query that supports basic collection storage operations.

    Methods on :class:`.AppenderQuery` include all methods of
    :class:`_orm.Query`, plus the collection persistence methods
    contributed by the appender mixin: ``add()``, ``add_all()``,
    ``append()``, ``extend()`` and ``remove()``.

    The class defines no members of its own; it only combines the mixin
    with :class:`_orm.Query`.

    """
    295 
    296 
    297def mixin_user_query(cls: Any) -> type[_AppenderMixin[Any]]: 
    298    """Return a new class with AppenderQuery functionality layered over.""" 
    299    name = "Appender" + cls.__name__ 
    300    return type(name, (_AppenderMixin, cls), {"query_class": cls})