1# orm/writeonly.py 
    2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 
    3# <see AUTHORS file> 
    4# 
    5# This module is part of SQLAlchemy and is released under 
    6# the MIT License: https://www.opensource.org/licenses/mit-license.php 
    7 
    8"""Write-only collection API. 
    9 
    10This is an alternate mapped attribute style that only supports single-item 
    11collection mutation operations.   To read the collection, a select() 
    12object must be executed each time. 
    13 
    14.. versionadded:: 2.0 
    15 
    16 
    17""" 
    18 
    19from __future__ import annotations 
    20 
    21from typing import Any 
    22from typing import Collection 
    23from typing import Dict 
    24from typing import Generic 
    25from typing import Iterable 
    26from typing import Iterator 
    27from typing import List 
    28from typing import Literal 
    29from typing import NoReturn 
    30from typing import Optional 
    31from typing import overload 
    32from typing import Tuple 
    33from typing import Type 
    34from typing import TYPE_CHECKING 
    35from typing import TypeVar 
    36from typing import Union 
    37 
    38from sqlalchemy.sql import bindparam 
    39from . import attributes 
    40from . import interfaces 
    41from . import relationships 
    42from . import strategies 
    43from .base import ATTR_EMPTY 
    44from .base import NEVER_SET 
    45from .base import object_mapper 
    46from .base import PassiveFlag 
    47from .base import RelationshipDirection 
    48from .. import exc 
    49from .. import inspect 
    50from .. import log 
    51from .. import util 
    52from ..sql import delete 
    53from ..sql import insert 
    54from ..sql import select 
    55from ..sql import update 
    56from ..sql.dml import Delete 
    57from ..sql.dml import Insert 
    58from ..sql.dml import Update 
    59 
    60if TYPE_CHECKING: 
    61    from . import QueryableAttribute 
    62    from ._typing import _InstanceDict 
    63    from .attributes import AttributeEventToken 
    64    from .base import LoaderCallableStatus 
    65    from .collections import _AdaptedCollectionProtocol 
    66    from .collections import CollectionAdapter 
    67    from .mapper import Mapper 
    68    from .relationships import _RelationshipOrderByArg 
    69    from .state import InstanceState 
    70    from .util import AliasedClass 
    71    from ..event import _Dispatch 
    72    from ..sql.selectable import FromClause 
    73    from ..sql.selectable import Select 
    74 
# Type variable shared by the generic classes defined in this module
# (WriteOnlyHistory, _AbstractCollectionWriter, WriteOnlyCollection).
_T = TypeVar("_T", bound=Any)
    76 
    77 
    78class WriteOnlyHistory(Generic[_T]): 
    79    """Overrides AttributeHistory to receive append/remove events directly.""" 
    80 
    81    unchanged_items: util.OrderedIdentitySet 
    82    added_items: util.OrderedIdentitySet 
    83    deleted_items: util.OrderedIdentitySet 
    84    _reconcile_collection: bool 
    85 
    86    def __init__( 
    87        self, 
    88        attr: _WriteOnlyAttributeImpl, 
    89        state: InstanceState[_T], 
    90        passive: PassiveFlag, 
    91        apply_to: Optional[WriteOnlyHistory[_T]] = None, 
    92    ) -> None: 
    93        if apply_to: 
    94            if passive & PassiveFlag.SQL_OK: 
    95                raise exc.InvalidRequestError( 
    96                    f"Attribute {attr} can't load the existing state from the " 
    97                    "database for this operation; full iteration is not " 
    98                    "permitted.  If this is a delete operation, configure " 
    99                    f"passive_deletes=True on the {attr} relationship in " 
    100                    "order to resolve this error." 
    101                ) 
    102 
    103            self.unchanged_items = apply_to.unchanged_items 
    104            self.added_items = apply_to.added_items 
    105            self.deleted_items = apply_to.deleted_items 
    106            self._reconcile_collection = apply_to._reconcile_collection 
    107        else: 
    108            self.deleted_items = util.OrderedIdentitySet() 
    109            self.added_items = util.OrderedIdentitySet() 
    110            self.unchanged_items = util.OrderedIdentitySet() 
    111            self._reconcile_collection = False 
    112 
    113    @property 
    114    def added_plus_unchanged(self) -> List[_T]: 
    115        return list(self.added_items.union(self.unchanged_items)) 
    116 
    117    @property 
    118    def all_items(self) -> List[_T]: 
    119        return list( 
    120            self.added_items.union(self.unchanged_items).union( 
    121                self.deleted_items 
    122            ) 
    123        ) 
    124 
    125    def as_history(self) -> attributes.History: 
    126        if self._reconcile_collection: 
    127            added = self.added_items.difference(self.unchanged_items) 
    128            deleted = self.deleted_items.intersection(self.unchanged_items) 
    129            unchanged = self.unchanged_items.difference(deleted) 
    130        else: 
    131            added, unchanged, deleted = ( 
    132                self.added_items, 
    133                self.unchanged_items, 
    134                self.deleted_items, 
    135            ) 
    136        return attributes.History(list(added), list(unchanged), list(deleted)) 
    137 
    138    def indexed(self, index: Union[int, slice]) -> Union[List[_T], _T]: 
    139        return list(self.added_items)[index] 
    140 
    141    def add_added(self, value: _T) -> None: 
    142        self.added_items.add(value) 
    143 
    144    def add_removed(self, value: _T) -> None: 
    145        if value in self.added_items: 
    146            self.added_items.remove(value) 
    147        else: 
    148            self.deleted_items.add(value) 
    149 
    150 
class _WriteOnlyAttributeImpl(
    attributes._HasCollectionAdapter, attributes._AttributeImpl
):
    """Attribute implementation behind the write-only collection style.

    Reading the attribute with SQL permitted returns an instance of
    ``query_class`` (``WriteOnlyCollection`` as assigned in ``__init__``)
    instead of loaded data.  Mutations are recorded in a
    ``collection_history_cls`` object stored under ``self.key`` in
    ``state.committed_state``.

    """

    uses_objects: bool = True
    default_accepts_scalar_loader: bool = False
    supports_population: bool = False
    # when False, set() refuses collection replacement on a state that
    # has an identity
    _supports_dynamic_iteration: bool = False
    collection: bool = False
    dynamic: bool = True
    # default ordering; replaced with tuple(order_by) in __init__ when a
    # non-empty order_by is given
    order_by: _RelationshipOrderByArg = ()
    # class used to construct the per-state change history
    collection_history_cls: Type[WriteOnlyHistory[Any]] = WriteOnlyHistory

    query_class: Type[WriteOnlyCollection[Any]]

    def __init__(
        self,
        class_: Union[Type[Any], AliasedClass[Any]],
        key: str,
        dispatch: _Dispatch[QueryableAttribute[Any]],
        target_mapper: Mapper[_T],
        order_by: _RelationshipOrderByArg,
        **kw: Any,
    ):
        """Set up the attribute implementation.

        ``class_``, ``key``, ``dispatch`` and any extra keyword arguments
        are forwarded to the superclass constructor, with ``None`` passed
        in the third positional slot.  ``query_class`` is always assigned
        ``WriteOnlyCollection`` here.

        """
        super().__init__(class_, key, None, dispatch, **kw)
        self.target_mapper = target_mapper
        self.query_class = WriteOnlyCollection
        if order_by:
            self.order_by = tuple(order_by)

    def get(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
    ) -> Union[util.OrderedIdentitySet, WriteOnlyCollection[Any]]:
        """Return the attribute's value for ``state``.

        Without ``SQL_OK`` in ``passive`` this is the set of pending added
        items; otherwise it is a new ``query_class`` instance bound to this
        attribute and ``state``.  ``dict_`` is not used.

        """
        if not passive & PassiveFlag.SQL_OK:
            return self._get_collection_history(
                state, PassiveFlag.PASSIVE_NO_INITIALIZE
            ).added_items
        else:
            return self.query_class(self, state)

    @overload
    def get_collection(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        user_data: Literal[None] = ...,
        passive: Literal[PassiveFlag.PASSIVE_OFF] = ...,
    ) -> CollectionAdapter: ...

    @overload
    def get_collection(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        user_data: _AdaptedCollectionProtocol = ...,
        passive: PassiveFlag = ...,
    ) -> CollectionAdapter: ...

    @overload
    def get_collection(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        user_data: Optional[_AdaptedCollectionProtocol] = ...,
        passive: PassiveFlag = ...,
    ) -> Union[
        Literal[LoaderCallableStatus.PASSIVE_NO_RESULT], CollectionAdapter
    ]: ...

    def get_collection(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        user_data: Optional[_AdaptedCollectionProtocol] = None,
        passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
    ) -> Union[
        Literal[LoaderCallableStatus.PASSIVE_NO_RESULT], CollectionAdapter
    ]:
        """Return a ``_DynamicCollectionAdapter`` over the history data.

        Without ``SQL_OK`` the adapter wraps only the added items; with
        ``SQL_OK`` it wraps added plus unchanged items.  ``dict_`` and
        ``user_data`` are not referenced in this method.

        """
        data: Collection[Any]
        if not passive & PassiveFlag.SQL_OK:
            data = self._get_collection_history(state, passive).added_items
        else:
            # NOTE: _get_collection_history() builds a history with
            # apply_to= when the state has an identity and INIT_OK is set;
            # WriteOnlyHistory raises InvalidRequestError in that case when
            # SQL_OK is also present.
            history = self._get_collection_history(state, passive)
            data = history.added_plus_unchanged
        return _DynamicCollectionAdapter(data)  # type: ignore[return-value]

    @util.memoized_property
    def _append_token(self) -> attributes.AttributeEventToken:
        """Event token for append operations originating at this attribute."""
        return attributes.AttributeEventToken(self, attributes.OP_APPEND)

    @util.memoized_property
    def _remove_token(self) -> attributes.AttributeEventToken:
        """Event token for remove operations originating at this attribute."""
        return attributes.AttributeEventToken(self, attributes.OP_REMOVE)

    def fire_append_event(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        value: Any,
        initiator: Optional[AttributeEventToken],
        collection_history: Optional[WriteOnlyHistory[Any]] = None,
    ) -> None:
        """Record ``value`` as added and run the append listeners.

        When ``collection_history`` is not given it is obtained from
        ``_modified_event()``, which also flags the state as modified.

        """
        if collection_history is None:
            collection_history = self._modified_event(state, dict_)

        # the value is recorded before the listeners run, so a replacement
        # value returned by a listener is not what lands in the history
        collection_history.add_added(value)

        for fn in self.dispatch.append:
            value = fn(state, value, initiator or self._append_token)

        if self.trackparent and value is not None:
            self.sethasparent(attributes.instance_state(value), state, True)

    def fire_remove_event(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        value: Any,
        initiator: Optional[AttributeEventToken],
        collection_history: Optional[WriteOnlyHistory[Any]] = None,
    ) -> None:
        """Record ``value`` as removed and run the remove listeners.

        When ``collection_history`` is not given it is obtained from
        ``_modified_event()``.  The parent link is cleared before the
        listeners are invoked; listener return values are ignored.

        """
        if collection_history is None:
            collection_history = self._modified_event(state, dict_)

        collection_history.add_removed(value)

        if self.trackparent and value is not None:
            self.sethasparent(attributes.instance_state(value), state, False)

        for fn in self.dispatch.remove:
            fn(state, value, initiator or self._remove_token)

    def _modified_event(
        self, state: InstanceState[Any], dict_: _InstanceDict
    ) -> WriteOnlyHistory[Any]:
        """Flag ``state`` as modified and return its history for this key.

        A new ``collection_history_cls`` object is stored in
        ``state.committed_state`` the first time this is called for the
        key.

        """
        if self.key not in state.committed_state:
            state.committed_state[self.key] = self.collection_history_cls(
                self, state, PassiveFlag.PASSIVE_NO_FETCH
            )

        state._modified_event(dict_, self, NEVER_SET)

        # this is a hack to allow the entities.ComparableEntity fixture
        # to work
        dict_[self.key] = True
        return state.committed_state[self.key]  # type: ignore[no-any-return]

    def set(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        value: Any,
        initiator: Optional[AttributeEventToken] = None,
        passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
        check_old: Any = None,
        pop: bool = False,
        _adapt: bool = True,
    ) -> None:
        """Replace the collection contents with the items in ``value``.

        Does nothing when ``initiator`` carries this attribute's
        ``parent_token``, or when ``pop`` is true and ``value`` is None.
        Otherwise the new items are compared to the old collection; append
        events are fired for additions (in the order of ``value``), then
        remove events for the items no longer present.

        ``check_old`` and ``_adapt`` are not referenced in this method.

        :raises InvalidRequestError: if ``state`` has an identity and
         ``_supports_dynamic_iteration`` is False.

        """
        if initiator and initiator.parent_token is self.parent_token:
            return

        if pop and value is None:
            return

        iterable = value
        new_values = list(iterable)
        if state.has_identity:
            if not self._supports_dynamic_iteration:
                raise exc.InvalidRequestError(
                    f'Collection "{self}" does not support implicit '
                    "iteration; collection replacement operations "
                    "can't be used"
                )
            # iterates whatever get() returns in order to snapshot the
            # existing contents
            old_collection = util.IdentitySet(
                self.get(state, dict_, passive=passive)
            )

        collection_history = self._modified_event(state, dict_)
        if not state.has_identity:
            # no identity: only the pending additions can exist
            old_collection = collection_history.added_items
        else:
            old_collection = old_collection.union(
                collection_history.added_items
            )

        # items present in both old and new collections stay untouched
        constants = old_collection.intersection(new_values)
        additions = util.IdentitySet(new_values).difference(constants)
        removals = old_collection.difference(constants)

        for member in new_values:
            if member in additions:
                self.fire_append_event(
                    state,
                    dict_,
                    member,
                    None,
                    collection_history=collection_history,
                )

        for member in removals:
            self.fire_remove_event(
                state,
                dict_,
                member,
                None,
                collection_history=collection_history,
            )

    def delete(self, *args: Any, **kwargs: Any) -> NoReturn:
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError()

    def set_committed_value(
        self, state: InstanceState[Any], dict_: _InstanceDict, value: Any
    ) -> NoReturn:
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError(
            "Dynamic attributes don't support collection population."
        )

    def get_history(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        passive: PassiveFlag = PassiveFlag.PASSIVE_NO_FETCH,
    ) -> attributes.History:
        """Return the pending changes for ``state`` as a ``History``.

        ``dict_`` is not used.

        """
        c = self._get_collection_history(state, passive)
        return c.as_history()

    def get_all_pending(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        passive: PassiveFlag = PassiveFlag.PASSIVE_NO_INITIALIZE,
    ) -> List[Tuple[InstanceState[Any], Any]]:
        """Return ``(instance_state, object)`` pairs for every item in the
        history (added, unchanged and deleted).

        ``dict_`` is not used.

        """
        c = self._get_collection_history(state, passive)
        return [(attributes.instance_state(x), x) for x in c.all_items]

    def _default_value(
        self, state: InstanceState[Any], dict_: _InstanceDict
    ) -> Any:
        """Produce a default value by running the ``init_scalar`` listeners.

        Starts from ``None``; each listener result other than
        ``ATTR_EMPTY`` replaces the running value, which is also what the
        next listener receives.

        """
        value = None
        for fn in self.dispatch.init_scalar:
            ret = fn(state, value, dict_)
            if ret is not ATTR_EMPTY:
                value = ret

        return value

    def _get_collection_history(
        self, state: InstanceState[Any], passive: PassiveFlag
    ) -> WriteOnlyHistory[Any]:
        """Return the history object for this attribute on ``state``.

        The stored history from ``state.committed_state`` is used when
        present; otherwise a fresh, empty one is created (it is not stored
        by this method).  For a state with an identity and ``INIT_OK`` in
        ``passive``, the result is wrapped in a new history built with
        ``apply_to=``.

        """
        c: WriteOnlyHistory[Any]
        if self.key in state.committed_state:
            c = state.committed_state[self.key]
        else:
            c = self.collection_history_cls(
                self, state, PassiveFlag.PASSIVE_NO_FETCH
            )

        if state.has_identity and (passive & PassiveFlag.INIT_OK):
            return self.collection_history_cls(
                self, state, passive, apply_to=c
            )
        else:
            return c

    def append(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        value: Any,
        initiator: Optional[AttributeEventToken],
        passive: PassiveFlag = PassiveFlag.PASSIVE_NO_FETCH,
    ) -> None:
        """Fire an append event for ``value`` unless ``initiator`` is this
        attribute implementation itself.

        ``passive`` is not referenced in this method.

        """
        if initiator is not self:  # type: ignore[comparison-overlap]
            self.fire_append_event(state, dict_, value, initiator)

    def remove(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        value: Any,
        initiator: Optional[AttributeEventToken],
        passive: PassiveFlag = PassiveFlag.PASSIVE_NO_FETCH,
    ) -> None:
        """Fire a remove event for ``value`` unless ``initiator`` is this
        attribute implementation itself.

        ``passive`` is not referenced in this method.

        """
        if initiator is not self:  # type: ignore[comparison-overlap]
            self.fire_remove_event(state, dict_, value, initiator)

    def pop(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        value: Any,
        initiator: Optional[AttributeEventToken],
        passive: PassiveFlag = PassiveFlag.PASSIVE_NO_FETCH,
    ) -> None:
        """Delegate to :meth:`remove` with the same arguments."""
        self.remove(state, dict_, value, initiator, passive=passive)
    449 
    450 
    451@log.class_logger 
    452@relationships.RelationshipProperty.strategy_for(lazy="write_only") 
    453class _WriteOnlyLoader(strategies._AbstractRelationshipLoader, log.Identified): 
    454    impl_class = _WriteOnlyAttributeImpl 
    455 
    456    def init_class_attribute(self, mapper: Mapper[Any]) -> None: 
    457        self.is_class_level = True 
    458        if not self.uselist or self.parent_property.direction not in ( 
    459            interfaces.ONETOMANY, 
    460            interfaces.MANYTOMANY, 
    461        ): 
    462            raise exc.InvalidRequestError( 
    463                "On relationship %s, 'dynamic' loaders cannot be used with " 
    464                "many-to-one/one-to-one relationships and/or " 
    465                "uselist=False." % self.parent_property 
    466            ) 
    467 
    468        strategies._register_attribute(  # type: ignore[no-untyped-call] 
    469            self.parent_property, 
    470            mapper, 
    471            useobject=True, 
    472            impl_class=self.impl_class, 
    473            target_mapper=self.parent_property.mapper, 
    474            order_by=self.parent_property.order_by, 
    475            query_class=self.parent_property.query_class, 
    476        ) 
    477 
    478 
    479class _DynamicCollectionAdapter: 
    480    """simplified CollectionAdapter for internal API consistency""" 
    481 
    482    data: Collection[Any] 
    483 
    484    def __init__(self, data: Collection[Any]): 
    485        self.data = data 
    486 
    487    def __iter__(self) -> Iterator[Any]: 
    488        return iter(self.data) 
    489 
    490    def _reset_empty(self) -> None: 
    491        pass 
    492 
    493    def __len__(self) -> int: 
    494        return len(self.data) 
    495 
    496    def __bool__(self) -> bool: 
    497        return True 
    498 
    499 
    500class _AbstractCollectionWriter(Generic[_T]): 
    501    """Virtual collection which includes append/remove methods that synchronize 
    502    into the attribute event system. 
    503 
    504    """ 
    505 
    506    if not TYPE_CHECKING: 
    507        __slots__ = () 
    508 
    509    instance: _T 
    510    _from_obj: Tuple[FromClause, ...] 
    511 
    512    def __init__( 
    513        self, attr: _WriteOnlyAttributeImpl, state: InstanceState[_T] 
    514    ): 
    515        instance = state.obj() 
    516        if TYPE_CHECKING: 
    517            assert instance 
    518        self.instance = instance 
    519        self.attr = attr 
    520 
    521        mapper = object_mapper(instance) 
    522        prop = mapper._props[self.attr.key] 
    523 
    524        if prop.secondary is not None: 
    525            # this is a hack right now.  The Query only knows how to 
    526            # make subsequent joins() without a given left-hand side 
    527            # from self._from_obj[0].  We need to ensure prop.secondary 
    528            # is in the FROM.  So we purposely put the mapper selectable 
    529            # in _from_obj[0] to ensure a user-defined join() later on 
    530            # doesn't fail, and secondary is then in _from_obj[1]. 
    531 
    532            # note also, we are using the official ORM-annotated selectable 
    533            # from __clause_element__(), see #7868 
    534            self._from_obj = (prop.mapper.__clause_element__(), prop.secondary) 
    535        else: 
    536            self._from_obj = () 
    537 
    538        self._where_criteria = ( 
    539            prop._with_parent(instance, alias_secondary=False), 
    540        ) 
    541 
    542        if self.attr.order_by: 
    543            self._order_by_clauses = self.attr.order_by 
    544        else: 
    545            self._order_by_clauses = () 
    546 
    547    def _add_all_impl(self, iterator: Iterable[_T]) -> None: 
    548        for item in iterator: 
    549            self.attr.append( 
    550                attributes.instance_state(self.instance), 
    551                attributes.instance_dict(self.instance), 
    552                item, 
    553                None, 
    554            ) 
    555 
    556    def _remove_impl(self, item: _T) -> None: 
    557        self.attr.remove( 
    558            attributes.instance_state(self.instance), 
    559            attributes.instance_dict(self.instance), 
    560            item, 
    561            None, 
    562        ) 
    563 
    564 
    565class WriteOnlyCollection(_AbstractCollectionWriter[_T]): 
    566    """Write-only collection which can synchronize changes into the 
    567    attribute event system. 
    568 
    569    The :class:`.WriteOnlyCollection` is used in a mapping by 
    570    using the ``"write_only"`` lazy loading strategy with 
    571    :func:`_orm.relationship`.     For background on this configuration, 
    572    see :ref:`write_only_relationship`. 
    573 
    574    .. versionadded:: 2.0 
    575 
    576    .. seealso:: 
    577 
    578        :ref:`write_only_relationship` 
    579 
    580    """ 
    581 
    582    __slots__ = ( 
    583        "instance", 
    584        "attr", 
    585        "_where_criteria", 
    586        "_from_obj", 
    587        "_order_by_clauses", 
    588    ) 
    589 
    590    def __iter__(self) -> NoReturn: 
    591        raise TypeError( 
    592            "WriteOnly collections don't support iteration in-place; " 
    593            "to query for collection items, use the select() method to " 
    594            "produce a SQL statement and execute it with session.scalars()." 
    595        ) 
    596 
    597    def select(self) -> Select[_T]: 
    598        """Produce a :class:`_sql.Select` construct that represents the 
    599        rows within this instance-local :class:`_orm.WriteOnlyCollection`. 
    600 
    601        """ 
    602        stmt = select(self.attr.target_mapper).where(*self._where_criteria) 
    603        if self._from_obj: 
    604            stmt = stmt.select_from(*self._from_obj) 
    605        if self._order_by_clauses: 
    606            stmt = stmt.order_by(*self._order_by_clauses) 
    607        return stmt 
    608 
    609    def insert(self) -> Insert: 
    610        """For one-to-many collections, produce a :class:`_dml.Insert` which 
    611        will insert new rows in terms of this this instance-local 
    612        :class:`_orm.WriteOnlyCollection`. 
    613 
    614        This construct is only supported for a :class:`_orm.Relationship` 
    615        that does **not** include the :paramref:`_orm.relationship.secondary` 
    616        parameter.  For relationships that refer to a many-to-many table, 
    617        use ordinary bulk insert techniques to produce new objects, then 
    618        use :meth:`_orm.AbstractCollectionWriter.add_all` to associate them 
    619        with the collection. 
    620 
    621 
    622        """ 
    623 
    624        state = inspect(self.instance) 
    625        mapper = state.mapper 
    626        prop = mapper._props[self.attr.key] 
    627 
    628        if prop.direction is not RelationshipDirection.ONETOMANY: 
    629            raise exc.InvalidRequestError( 
    630                "Write only bulk INSERT only supported for one-to-many " 
    631                "collections; for many-to-many, use a separate bulk " 
    632                "INSERT along with add_all()." 
    633            ) 
    634 
    635        dict_: Dict[str, Any] = {} 
    636 
    637        for l, r in prop.synchronize_pairs: 
    638            fn = prop._get_attr_w_warn_on_none( 
    639                mapper, 
    640                state, 
    641                state.dict, 
    642                l, 
    643            ) 
    644 
    645            dict_[r.key] = bindparam(None, callable_=fn) 
    646 
    647        return insert(self.attr.target_mapper).values(**dict_) 
    648 
    649    def update(self) -> Update: 
    650        """Produce a :class:`_dml.Update` which will refer to rows in terms 
    651        of this instance-local :class:`_orm.WriteOnlyCollection`. 
    652 
    653        """ 
    654        return update(self.attr.target_mapper).where(*self._where_criteria) 
    655 
    656    def delete(self) -> Delete: 
    657        """Produce a :class:`_dml.Delete` which will refer to rows in terms 
    658        of this instance-local :class:`_orm.WriteOnlyCollection`. 
    659 
    660        """ 
    661        return delete(self.attr.target_mapper).where(*self._where_criteria) 
    662 
    663    def add_all(self, iterator: Iterable[_T]) -> None: 
    664        """Add an iterable of items to this :class:`_orm.WriteOnlyCollection`. 
    665 
    666        The given items will be persisted to the database in terms of 
    667        the parent instance's collection on the next flush. 
    668 
    669        """ 
    670        self._add_all_impl(iterator) 
    671 
    672    def add(self, item: _T) -> None: 
    673        """Add an item to this :class:`_orm.WriteOnlyCollection`. 
    674 
    675        The given item will be persisted to the database in terms of 
    676        the parent instance's collection on the next flush. 
    677 
    678        """ 
    679        self._add_all_impl([item]) 
    680 
    681    def remove(self, item: _T) -> None: 
    682        """Remove an item from this :class:`_orm.WriteOnlyCollection`. 
    683 
    684        The given item will be removed from the parent instance's collection on 
    685        the next flush. 
    686 
    687        """ 
    688        self._remove_impl(item)