1# orm/writeonly.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8"""Write-only collection API.
9
10This is an alternate mapped attribute style that only supports single-item
11collection mutation operations. To read the collection, a select()
12object must be executed each time.
13
14.. versionadded:: 2.0
15
16
17"""
18
19from __future__ import annotations
20
21from typing import Any
22from typing import Collection
23from typing import Dict
24from typing import Generic
25from typing import Iterable
26from typing import Iterator
27from typing import List
28from typing import NoReturn
29from typing import Optional
30from typing import overload
31from typing import Tuple
32from typing import Type
33from typing import TYPE_CHECKING
34from typing import TypeVar
35from typing import Union
36
37from sqlalchemy.sql import bindparam
38from . import attributes
39from . import interfaces
40from . import relationships
41from . import strategies
42from .base import ATTR_EMPTY
43from .base import NEVER_SET
44from .base import object_mapper
45from .base import PassiveFlag
46from .base import RelationshipDirection
47from .. import exc
48from .. import inspect
49from .. import log
50from .. import util
51from ..sql import delete
52from ..sql import insert
53from ..sql import select
54from ..sql import update
55from ..sql.dml import Delete
56from ..sql.dml import Insert
57from ..sql.dml import Update
58from ..util.typing import Literal
59
60if TYPE_CHECKING:
61 from . import QueryableAttribute
62 from ._typing import _InstanceDict
63 from .attributes import AttributeEventToken
64 from .base import LoaderCallableStatus
65 from .collections import _AdaptedCollectionProtocol
66 from .collections import CollectionAdapter
67 from .mapper import Mapper
68 from .relationships import _RelationshipOrderByArg
69 from .state import InstanceState
70 from .util import AliasedClass
71 from ..event import _Dispatch
72 from ..sql.selectable import FromClause
73 from ..sql.selectable import Select
74
# Type variable shared by the generic classes and annotations in this module.
_T = TypeVar("_T", bound=Any)
76
77
class WriteOnlyHistory(Generic[_T]):
    """Change record for a write-only collection attribute.

    Three identity-ordered sets are maintained -- added, deleted and
    unchanged items -- and are updated directly through
    :meth:`add_added` and :meth:`add_removed`.

    """

    unchanged_items: util.OrderedIdentitySet
    added_items: util.OrderedIdentitySet
    deleted_items: util.OrderedIdentitySet
    _reconcile_collection: bool

    def __init__(
        self,
        attr: _WriteOnlyAttributeImpl,
        state: InstanceState[_T],
        passive: PassiveFlag,
        apply_to: Optional[WriteOnlyHistory[_T]] = None,
    ) -> None:
        """Create a history, optionally layered over an existing one.

        :param attr: the owning attribute implementation; only used in the
         error message.
        :param state: the parent object's state (not read here).
        :param passive: loader flags; combined with ``apply_to`` they decide
         whether the request is legal.
        :param apply_to: an existing history whose sets are adopted.
        :raises sqlalchemy.exc.InvalidRequestError: when ``apply_to`` is
         given and ``passive`` includes ``SQL_OK``.

        """
        if not apply_to:
            # nothing to build upon: begin with empty change sets
            self.unchanged_items = util.OrderedIdentitySet()
            self.added_items = util.OrderedIdentitySet()
            self.deleted_items = util.OrderedIdentitySet()
            self._reconcile_collection = False
            return

        if passive & PassiveFlag.SQL_OK:
            raise exc.InvalidRequestError(
                f"Attribute {attr} can't load the existing state from the "
                "database for this operation; full iteration is not "
                "permitted. If this is a delete operation, configure "
                f"passive_deletes=True on the {attr} relationship in "
                "order to resolve this error."
            )

        # the very same set objects are shared with ``apply_to``; they are
        # not copied
        self._reconcile_collection = apply_to._reconcile_collection
        self.deleted_items = apply_to.deleted_items
        self.added_items = apply_to.added_items
        self.unchanged_items = apply_to.unchanged_items

    @property
    def added_plus_unchanged(self) -> List[_T]:
        """List of the added items combined with the unchanged items."""
        present = self.added_items.union(self.unchanged_items)
        return list(present)

    @property
    def all_items(self) -> List[_T]:
        """List of every tracked item: added, unchanged and deleted."""
        present = self.added_items.union(self.unchanged_items)
        everything = present.union(self.deleted_items)
        return list(everything)

    def as_history(self) -> attributes.History:
        """Return the tracked sets as an ``attributes.History``.

        When ``_reconcile_collection`` is set, the added and deleted sets
        are first reconciled against the unchanged set.

        """
        if not self._reconcile_collection:
            return attributes.History(
                list(self.added_items),
                list(self.unchanged_items),
                list(self.deleted_items),
            )

        net_added = self.added_items.difference(self.unchanged_items)
        net_deleted = self.deleted_items.intersection(self.unchanged_items)
        # whatever was deleted can no longer be reported as unchanged
        net_unchanged = self.unchanged_items.difference(net_deleted)
        return attributes.History(
            list(net_added), list(net_unchanged), list(net_deleted)
        )

    def indexed(self, index: Union[int, slice]) -> Union[List[_T], _T]:
        """Index or slice into the added items, taken in set order."""
        pending = list(self.added_items)
        return pending[index]

    def add_added(self, value: _T) -> None:
        """Record ``value`` as added."""
        self.added_items.add(value)

    def add_removed(self, value: _T) -> None:
        """Record the removal of ``value``.

        A value currently recorded as added is simply taken out of the
        added set; any other value is recorded as deleted.

        """
        if value not in self.added_items:
            self.deleted_items.add(value)
            return
        self.added_items.remove(value)
149
150
class _WriteOnlyAttributeImpl(
    attributes._HasCollectionAdapter, attributes._AttributeImpl
):
    """Attribute implementation backing a write-only collection.

    Mutations are not applied to an in-memory collection.  They are
    recorded in a :class:`.WriteOnlyHistory` stored under this attribute's
    key in ``state.committed_state``, and the corresponding append /
    remove listeners are invoked.

    """

    # Class-level flags.  Apart from ``_supports_dynamic_iteration``
    # (checked in ``set()``) none of them is read inside this class;
    # presumably they are consumed by the base classes / attribute system.
    uses_objects: bool = True
    default_accepts_scalar_loader: bool = False
    supports_population: bool = False
    _supports_dynamic_iteration: bool = False
    collection: bool = False
    dynamic: bool = True
    # default ordering; replaced per instance in __init__ when one is given
    order_by: _RelationshipOrderByArg = ()
    # class instantiated whenever a history object has to be created
    collection_history_cls: Type[WriteOnlyHistory[Any]] = WriteOnlyHistory

    # class instantiated by get() when SQL is permitted; set in __init__
    query_class: Type[WriteOnlyCollection[Any]]

    def __init__(
        self,
        class_: Union[Type[Any], AliasedClass[Any]],
        key: str,
        dispatch: _Dispatch[QueryableAttribute[Any]],
        target_mapper: Mapper[_T],
        order_by: _RelationshipOrderByArg,
        **kw: Any,
    ):
        """Set up the attribute.

        ``class_``, ``key``, ``dispatch`` and ``**kw`` are forwarded to the
        base initializer, with ``None`` passed as its third positional
        argument.

        :param target_mapper: mapper stored as ``self.target_mapper``.
        :param order_by: when truthy, stored as a tuple in
         ``self.order_by``; otherwise the class-level empty default stays
         in effect.

        """
        super().__init__(class_, key, None, dispatch, **kw)
        self.target_mapper = target_mapper
        self.query_class = WriteOnlyCollection
        if order_by:
            self.order_by = tuple(order_by)

    def get(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
    ) -> Union[util.OrderedIdentitySet, WriteOnlyCollection[Any]]:
        """Return the value of the attribute for ``state``.

        Without ``SQL_OK`` in ``passive`` this is the set of items recorded
        as added in the collection history; with it, a new
        ``self.query_class`` object bound to this attribute and ``state``.
        ``dict_`` is not used by this implementation.

        """
        if not passive & PassiveFlag.SQL_OK:
            # the history is always looked up with PASSIVE_NO_INITIALIZE
            # here, regardless of the flags the caller passed
            return self._get_collection_history(
                state, PassiveFlag.PASSIVE_NO_INITIALIZE
            ).added_items
        else:
            return self.query_class(self, state)

    @overload
    def get_collection(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        user_data: Literal[None] = ...,
        passive: Literal[PassiveFlag.PASSIVE_OFF] = ...,
    ) -> CollectionAdapter: ...

    @overload
    def get_collection(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        user_data: _AdaptedCollectionProtocol = ...,
        passive: PassiveFlag = ...,
    ) -> CollectionAdapter: ...

    @overload
    def get_collection(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        user_data: Optional[_AdaptedCollectionProtocol] = ...,
        passive: PassiveFlag = ...,
    ) -> Union[
        Literal[LoaderCallableStatus.PASSIVE_NO_RESULT], CollectionAdapter
    ]: ...

    def get_collection(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        user_data: Optional[_AdaptedCollectionProtocol] = None,
        passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
    ) -> Union[
        Literal[LoaderCallableStatus.PASSIVE_NO_RESULT], CollectionAdapter
    ]:
        """Return a ``_DynamicCollectionAdapter`` over history items.

        Without ``SQL_OK`` the adapter wraps the added items only; with
        ``SQL_OK`` it wraps the added plus the unchanged items.  ``dict_``
        and ``user_data`` are not used by this implementation.

        """
        data: Collection[Any]
        if not passive & PassiveFlag.SQL_OK:
            data = self._get_collection_history(state, passive).added_items
        else:
            history = self._get_collection_history(state, passive)
            data = history.added_plus_unchanged
        # the lightweight adapter is not a real CollectionAdapter, hence
        # the type: ignore
        return _DynamicCollectionAdapter(data)  # type: ignore[return-value]

    @util.memoized_property
    def _append_token(self) -> attributes.AttributeEventToken:
        """Event token for append operations originating at this attribute."""
        return attributes.AttributeEventToken(self, attributes.OP_APPEND)

    @util.memoized_property
    def _remove_token(self) -> attributes.AttributeEventToken:
        """Event token for remove operations originating at this attribute."""
        return attributes.AttributeEventToken(self, attributes.OP_REMOVE)

    def fire_append_event(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        value: Any,
        initiator: Optional[AttributeEventToken],
        collection_history: Optional[WriteOnlyHistory[Any]] = None,
    ) -> None:
        """Record ``value`` as added and run the append listeners.

        :param collection_history: history to record into; when ``None``
         it is obtained from ``_modified_event()``.
        :param initiator: token handed to the listeners; ``_append_token``
         is substituted when it is falsy.

        """
        if collection_history is None:
            collection_history = self._modified_event(state, dict_)

        # the history receives the value as passed in, before any listener
        # has had the chance to replace it
        collection_history.add_added(value)

        # each listener's return value becomes the value seen by the next
        # listener and by the parent tracking below
        for fn in self.dispatch.append:
            value = fn(state, value, initiator or self._append_token)

        if self.trackparent and value is not None:
            self.sethasparent(attributes.instance_state(value), state, True)

    def fire_remove_event(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        value: Any,
        initiator: Optional[AttributeEventToken],
        collection_history: Optional[WriteOnlyHistory[Any]] = None,
    ) -> None:
        """Record the removal of ``value`` and run the remove listeners.

        :param collection_history: history to record into; when ``None``
         it is obtained from ``_modified_event()``.
        :param initiator: token handed to the listeners; ``_remove_token``
         is substituted when it is falsy.

        """
        if collection_history is None:
            collection_history = self._modified_event(state, dict_)

        collection_history.add_removed(value)

        # unlike the append case, parent tracking is updated before the
        # listeners run
        if self.trackparent and value is not None:
            self.sethasparent(attributes.instance_state(value), state, False)

        # return values of remove listeners are ignored
        for fn in self.dispatch.remove:
            fn(state, value, initiator or self._remove_token)

    def _modified_event(
        self, state: InstanceState[Any], dict_: _InstanceDict
    ) -> WriteOnlyHistory[Any]:
        """Flag the attribute as modified and return its history object.

        A new history is stored in ``state.committed_state`` under this
        attribute's key if none is present yet.

        """
        if self.key not in state.committed_state:
            state.committed_state[self.key] = self.collection_history_cls(
                self, state, PassiveFlag.PASSIVE_NO_FETCH
            )

        state._modified_event(dict_, self, NEVER_SET)

        # this is a hack to allow the entities.ComparableEntity fixture
        # to work
        dict_[self.key] = True
        return state.committed_state[self.key]  # type: ignore[no-any-return]

    def set(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        value: Any,
        initiator: Optional[AttributeEventToken] = None,
        passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
        check_old: Any = None,
        pop: bool = False,
        _adapt: bool = True,
    ) -> None:
        """Replace the collection contents with the items of ``value``.

        Append events are fired for items not in the old collection and
        remove events for old items missing from ``value``.  The call is a
        no-op when ``initiator`` carries this attribute's own
        ``parent_token``, or when ``pop`` is true and ``value`` is ``None``.
        ``check_old`` and ``_adapt`` are not used by this implementation.

        :raises sqlalchemy.exc.InvalidRequestError: if ``state`` has an
         identity and ``_supports_dynamic_iteration`` is false.

        """
        if initiator and initiator.parent_token is self.parent_token:
            return

        if pop and value is None:
            return

        iterable = value
        # materialized once; the values are traversed several times below
        new_values = list(iterable)
        if state.has_identity:
            if not self._supports_dynamic_iteration:
                raise exc.InvalidRequestError(
                    f'Collection "{self}" does not support implicit '
                    "iteration; collection replacement operations "
                    "can't be used"
                )
            old_collection = util.IdentitySet(
                self.get(state, dict_, passive=passive)
            )

        collection_history = self._modified_event(state, dict_)
        if not state.has_identity:
            # no identity: the old contents are just the pending additions
            old_collection = collection_history.added_items
        else:
            old_collection = old_collection.union(
                collection_history.added_items
            )

        # items present both before and after the replacement
        constants = old_collection.intersection(new_values)
        additions = util.IdentitySet(new_values).difference(constants)
        removals = old_collection.difference(constants)

        # iterate new_values rather than the set so that append events are
        # fired in the order the caller supplied the items
        for member in new_values:
            if member in additions:
                self.fire_append_event(
                    state,
                    dict_,
                    member,
                    None,
                    collection_history=collection_history,
                )

        for member in removals:
            self.fire_remove_event(
                state,
                dict_,
                member,
                None,
                collection_history=collection_history,
            )

    def delete(self, *args: Any, **kwargs: Any) -> NoReturn:
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError()

    def set_committed_value(
        self, state: InstanceState[Any], dict_: _InstanceDict, value: Any
    ) -> NoReturn:
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError(
            "Dynamic attributes don't support collection population."
        )

    def get_history(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        passive: PassiveFlag = PassiveFlag.PASSIVE_NO_FETCH,
    ) -> attributes.History:
        """Return the collection history as an ``attributes.History``.

        ``dict_`` is not used by this implementation.

        """
        c = self._get_collection_history(state, passive)
        return c.as_history()

    def get_all_pending(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        passive: PassiveFlag = PassiveFlag.PASSIVE_NO_INITIALIZE,
    ) -> List[Tuple[InstanceState[Any], Any]]:
        """Return ``(instance_state, object)`` pairs for all history items.

        Covers the added, unchanged and deleted items of the history.
        ``dict_`` is not used by this implementation.

        """
        c = self._get_collection_history(state, passive)
        return [(attributes.instance_state(x), x) for x in c.all_items]

    def _default_value(
        self, state: InstanceState[Any], dict_: _InstanceDict
    ) -> Any:
        """Return the value produced by the ``init_scalar`` listeners.

        Starts from ``None``; every listener result other than
        ``ATTR_EMPTY`` replaces the current value and is passed on to the
        next listener.

        """
        value = None
        for fn in self.dispatch.init_scalar:
            ret = fn(state, value, dict_)
            if ret is not ATTR_EMPTY:
                value = ret

        return value

    def _get_collection_history(
        self, state: InstanceState[Any], passive: PassiveFlag
    ) -> WriteOnlyHistory[Any]:
        """Return the history object for ``state``.

        The history stored in ``state.committed_state`` is used if present,
        otherwise a fresh one is created (it is not stored).  When ``state``
        has an identity and ``passive`` includes ``INIT_OK``, a new history
        is built on top of it via ``apply_to``; that constructor raises
        ``InvalidRequestError`` if ``passive`` also includes ``SQL_OK``.

        """
        c: WriteOnlyHistory[Any]
        if self.key in state.committed_state:
            c = state.committed_state[self.key]
        else:
            c = self.collection_history_cls(
                self, state, PassiveFlag.PASSIVE_NO_FETCH
            )

        if state.has_identity and (passive & PassiveFlag.INIT_OK):
            return self.collection_history_cls(
                self, state, passive, apply_to=c
            )
        else:
            return c

    def append(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        value: Any,
        initiator: Optional[AttributeEventToken],
        passive: PassiveFlag = PassiveFlag.PASSIVE_NO_FETCH,
    ) -> None:
        """Fire an append event for ``value`` unless ``initiator`` is self.

        ``passive`` is not used by this implementation.

        """
        if initiator is not self:
            self.fire_append_event(state, dict_, value, initiator)

    def remove(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        value: Any,
        initiator: Optional[AttributeEventToken],
        passive: PassiveFlag = PassiveFlag.PASSIVE_NO_FETCH,
    ) -> None:
        """Fire a remove event for ``value`` unless ``initiator`` is self.

        ``passive`` is not used by this implementation.

        """
        if initiator is not self:
            self.fire_remove_event(state, dict_, value, initiator)

    def pop(
        self,
        state: InstanceState[Any],
        dict_: _InstanceDict,
        value: Any,
        initiator: Optional[AttributeEventToken],
        passive: PassiveFlag = PassiveFlag.PASSIVE_NO_FETCH,
    ) -> None:
        """Delegate to :meth:`remove` with the same arguments."""
        self.remove(state, dict_, value, initiator, passive=passive)
449
450
@log.class_logger
@relationships.RelationshipProperty.strategy_for(lazy="write_only")
class _WriteOnlyLoader(strategies._AbstractRelationshipLoader, log.Identified):
    """Relationship loader strategy registered for ``lazy="write_only"``.

    Registers the relationship's class-level attribute using
    ``impl_class`` as the attribute implementation.

    """

    impl_class = _WriteOnlyAttributeImpl

    def init_class_attribute(self, mapper: Mapper[Any]) -> None:
        """Register the class-level attribute for the parent property.

        :param mapper: mapper passed through to
         ``strategies._register_attribute``.
        :raises sqlalchemy.exc.InvalidRequestError: if the relationship
         does not use a list, or its direction is neither one-to-many nor
         many-to-many.

        """
        self.is_class_level = True

        prop = self.parent_property
        collection_directions = (interfaces.ONETOMANY, interfaces.MANYTOMANY)
        # ``uselist`` is tested first so the direction is only consulted
        # for list-based relationships
        if not (self.uselist and prop.direction in collection_directions):
            raise exc.InvalidRequestError(
                "On relationship %s, 'dynamic' loaders cannot be used with "
                "many-to-one/one-to-one relationships and/or "
                "uselist=False." % prop
            )

        strategies._register_attribute(  # type: ignore[no-untyped-call]
            prop,
            mapper,
            useobject=True,
            impl_class=self.impl_class,
            target_mapper=prop.mapper,
            order_by=prop.order_by,
            query_class=prop.query_class,
        )
477
478
479class _DynamicCollectionAdapter:
480 """simplified CollectionAdapter for internal API consistency"""
481
482 data: Collection[Any]
483
484 def __init__(self, data: Collection[Any]):
485 self.data = data
486
487 def __iter__(self) -> Iterator[Any]:
488 return iter(self.data)
489
490 def _reset_empty(self) -> None:
491 pass
492
493 def __len__(self) -> int:
494 return len(self.data)
495
496 def __bool__(self) -> bool:
497 return True
498
499
class _AbstractCollectionWriter(Generic[_T]):
    """Virtual collection whose append/remove operations are routed into
    the attribute event system.

    """

    if not TYPE_CHECKING:
        __slots__ = ()

    instance: _T
    _from_obj: Tuple[FromClause, ...]

    def __init__(
        self, attr: _WriteOnlyAttributeImpl, state: InstanceState[_T]
    ):
        """Bind the writer to the object referred to by ``state``.

        Stores the instance and attribute, and prepares the FROM objects,
        WHERE criteria and ORDER BY clauses used for statements built
        against this collection.

        """
        owner = state.obj()
        if TYPE_CHECKING:
            assert owner
        self.instance = owner
        self.attr = attr

        prop = object_mapper(owner)._props[self.attr.key]

        # this is a hack right now.  The Query only knows how to
        # make subsequent joins() without a given left-hand side
        # from self._from_obj[0].  We need to ensure prop.secondary
        # is in the FROM.  So we purposely put the mapper selectable
        # in _from_obj[0] to ensure a user-defined join() later on
        # doesn't fail, and secondary is then in _from_obj[1].
        #
        # note also, we are using the official ORM-annotated selectable
        # from __clause_element__(), see #7868
        secondary = prop.secondary
        self._from_obj = (
            (prop.mapper.__clause_element__(), secondary)
            if secondary is not None
            else ()
        )

        parent_criterion = prop._with_parent(owner, alias_secondary=False)
        self._where_criteria = (parent_criterion,)

        # fall back to an empty tuple when the attribute has no ordering
        self._order_by_clauses = self.attr.order_by or ()

    def _add_all_impl(self, iterator: Iterable[_T]) -> None:
        """Append every item of ``iterator`` through the attribute impl."""
        for member in iterator:
            owner_state = attributes.instance_state(self.instance)
            owner_dict = attributes.instance_dict(self.instance)
            self.attr.append(owner_state, owner_dict, member, None)

    def _remove_impl(self, item: _T) -> None:
        """Remove ``item`` through the attribute impl."""
        owner_state = attributes.instance_state(self.instance)
        owner_dict = attributes.instance_dict(self.instance)
        self.attr.remove(owner_state, owner_dict, item, None)
563
564
class WriteOnlyCollection(_AbstractCollectionWriter[_T]):
    """Write-only collection which synchronizes changes into the
    attribute event system.

    A :class:`.WriteOnlyCollection` is put in place for a mapping that
    uses the ``"write_only"`` lazy loading strategy with
    :func:`_orm.relationship`.  For background on this configuration,
    see :ref:`write_only_relationship`.

    .. versionadded:: 2.0

    .. seealso::

        :ref:`write_only_relationship`

    """

    __slots__ = (
        "instance",
        "attr",
        "_where_criteria",
        "_from_obj",
        "_order_by_clauses",
    )

    def __iter__(self) -> NoReturn:
        """Always raise ``TypeError``; in-place iteration is not supported."""
        raise TypeError(
            "WriteOnly collections don't support iteration in-place; "
            "to query for collection items, use the select() method to "
            "produce a SQL statement and execute it with session.scalars()."
        )

    def select(self) -> Select[_T]:
        """Produce a :class:`_sql.Select` construct that represents the
        rows within this instance-local :class:`_orm.WriteOnlyCollection`.

        """
        stmt = select(self.attr.target_mapper)
        stmt = stmt.where(*self._where_criteria)
        if self._from_obj:
            stmt = stmt.select_from(*self._from_obj)
        if self._order_by_clauses:
            stmt = stmt.order_by(*self._order_by_clauses)
        return stmt

    def insert(self) -> Insert:
        """For one-to-many collections, produce a :class:`_dml.Insert` which
        will insert new rows in terms of this instance-local
        :class:`_orm.WriteOnlyCollection`.

        This construct is only supported for a :class:`_orm.Relationship`
        that does **not** include the :paramref:`_orm.relationship.secondary`
        parameter.  For relationships that refer to a many-to-many table,
        use ordinary bulk insert techniques to produce new objects, then
        use :meth:`_orm.WriteOnlyCollection.add_all` to associate them
        with the collection.

        """
        parent_state = inspect(self.instance)
        parent_mapper = parent_state.mapper
        prop = parent_mapper._props[self.attr.key]

        if prop.direction is not RelationshipDirection.ONETOMANY:
            raise exc.InvalidRequestError(
                "Write only bulk INSERT only supported for one-to-many "
                "collections; for many-to-many, use a separate bulk "
                "INSERT along with add_all()."
            )

        # for each synchronize pair, the value for the second column's key
        # is a bound parameter whose callable reads the first column's
        # attribute from the parent state
        values: Dict[str, Any] = {
            dest_col.key: bindparam(
                None,
                callable_=prop._get_attr_w_warn_on_none(
                    parent_mapper,
                    parent_state,
                    parent_state.dict,
                    src_col,
                ),
            )
            for src_col, dest_col in prop.synchronize_pairs
        }

        return insert(self.attr.target_mapper).values(**values)

    def update(self) -> Update:
        """Produce a :class:`_dml.Update` which will refer to rows in terms
        of this instance-local :class:`_orm.WriteOnlyCollection`.

        """
        stmt = update(self.attr.target_mapper)
        return stmt.where(*self._where_criteria)

    def delete(self) -> Delete:
        """Produce a :class:`_dml.Delete` which will refer to rows in terms
        of this instance-local :class:`_orm.WriteOnlyCollection`.

        """
        stmt = delete(self.attr.target_mapper)
        return stmt.where(*self._where_criteria)

    def add_all(self, iterator: Iterable[_T]) -> None:
        """Add an iterable of items to this :class:`_orm.WriteOnlyCollection`.

        The given items will be persisted to the database in terms of
        the parent instance's collection on the next flush.

        """
        self._add_all_impl(iterator)

    def add(self, item: _T) -> None:
        """Add an item to this :class:`_orm.WriteOnlyCollection`.

        The given item will be persisted to the database in terms of
        the parent instance's collection on the next flush.

        """
        single = [item]
        self._add_all_impl(single)

    def remove(self, item: _T) -> None:
        """Remove an item from this :class:`_orm.WriteOnlyCollection`.

        The given item will be removed from the parent instance's collection
        on the next flush.

        """
        self._remove_impl(item)