1# orm/dynamic.py
2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8
9"""Dynamic collection API.
10
11Dynamic collections act like Query() objects for read operations and support
12basic add/delete mutation.
13
14.. legacy:: the "dynamic" loader is a legacy feature, superseded by the
15 "write_only" loader.
16
17
18"""
19
20from __future__ import annotations
21
22from typing import Any
23from typing import Iterable
24from typing import Iterator
25from typing import List
26from typing import Optional
27from typing import Tuple
28from typing import Type
29from typing import TYPE_CHECKING
30from typing import TypeVar
31from typing import Union
32
33from . import attributes
34from . import exc as orm_exc
35from . import relationships
36from . import util as orm_util
37from .base import PassiveFlag
38from .query import Query
39from .session import object_session
40from .writeonly import AbstractCollectionWriter
41from .writeonly import WriteOnlyAttributeImpl
42from .writeonly import WriteOnlyHistory
43from .writeonly import WriteOnlyLoader
44from .. import util
45from ..engine import result
46
47
48if TYPE_CHECKING:
49 from . import QueryableAttribute
50 from .mapper import Mapper
51 from .relationships import _RelationshipOrderByArg
52 from .session import Session
53 from .state import InstanceState
54 from .util import AliasedClass
55 from ..event import _Dispatch
56 from ..sql.elements import ColumnElement
57
58_T = TypeVar("_T", bound=Any)
59
60
61class DynamicCollectionHistory(WriteOnlyHistory[_T]):
62 def __init__(
63 self,
64 attr: DynamicAttributeImpl,
65 state: InstanceState[_T],
66 passive: PassiveFlag,
67 apply_to: Optional[DynamicCollectionHistory[_T]] = None,
68 ) -> None:
69 if apply_to:
70 coll = AppenderQuery(attr, state).autoflush(False)
71 self.unchanged_items = util.OrderedIdentitySet(coll)
72 self.added_items = apply_to.added_items
73 self.deleted_items = apply_to.deleted_items
74 self._reconcile_collection = True
75 else:
76 self.deleted_items = util.OrderedIdentitySet()
77 self.added_items = util.OrderedIdentitySet()
78 self.unchanged_items = util.OrderedIdentitySet()
79 self._reconcile_collection = False
80
81
82class DynamicAttributeImpl(WriteOnlyAttributeImpl):
83 _supports_dynamic_iteration = True
84 collection_history_cls = DynamicCollectionHistory[Any]
85 query_class: Type[AppenderMixin[Any]] # type: ignore[assignment]
86
87 def __init__(
88 self,
89 class_: Union[Type[Any], AliasedClass[Any]],
90 key: str,
91 dispatch: _Dispatch[QueryableAttribute[Any]],
92 target_mapper: Mapper[_T],
93 order_by: _RelationshipOrderByArg,
94 query_class: Optional[Type[AppenderMixin[_T]]] = None,
95 **kw: Any,
96 ) -> None:
97 attributes.AttributeImpl.__init__(
98 self, class_, key, None, dispatch, **kw
99 )
100 self.target_mapper = target_mapper
101 if order_by:
102 self.order_by = tuple(order_by)
103 if not query_class:
104 self.query_class = AppenderQuery
105 elif AppenderMixin in query_class.mro():
106 self.query_class = query_class
107 else:
108 self.query_class = mixin_user_query(query_class)
109
110
111@relationships.RelationshipProperty.strategy_for(lazy="dynamic")
112class DynaLoader(WriteOnlyLoader):
113 impl_class = DynamicAttributeImpl
114
115
116class AppenderMixin(AbstractCollectionWriter[_T]):
117 """A mixin that expects to be mixing in a Query class with
118 AbstractAppender.
119
120
121 """
122
123 query_class: Optional[Type[Query[_T]]] = None
124 _order_by_clauses: Tuple[ColumnElement[Any], ...]
125
126 def __init__(
127 self, attr: DynamicAttributeImpl, state: InstanceState[_T]
128 ) -> None:
129 Query.__init__(
130 self, # type: ignore[arg-type]
131 attr.target_mapper,
132 None,
133 )
134 super().__init__(attr, state)
135
136 @property
137 def session(self) -> Optional[Session]:
138 sess = object_session(self.instance)
139 if sess is not None and sess.autoflush and self.instance in sess:
140 sess.flush()
141 if not orm_util.has_identity(self.instance):
142 return None
143 else:
144 return sess
145
146 @session.setter
147 def session(self, session: Session) -> None:
148 self.sess = session
149
150 def _iter(self) -> Union[result.ScalarResult[_T], result.Result[_T]]:
151 sess = self.session
152 if sess is None:
153 state = attributes.instance_state(self.instance)
154 if state.detached:
155 util.warn(
156 "Instance %s is detached, dynamic relationship cannot "
157 "return a correct result. This warning will become "
158 "a DetachedInstanceError in a future release."
159 % (orm_util.state_str(state))
160 )
161
162 return result.IteratorResult(
163 result.SimpleResultMetaData([self.attr.class_.__name__]),
164 iter(
165 self.attr._get_collection_history(
166 attributes.instance_state(self.instance),
167 PassiveFlag.PASSIVE_NO_INITIALIZE,
168 ).added_items
169 ),
170 _source_supports_scalars=True,
171 ).scalars()
172 else:
173 return self._generate(sess)._iter()
174
175 if TYPE_CHECKING:
176
177 def __iter__(self) -> Iterator[_T]: ...
178
179 def __getitem__(self, index: Any) -> Union[_T, List[_T]]:
180 sess = self.session
181 if sess is None:
182 return self.attr._get_collection_history(
183 attributes.instance_state(self.instance),
184 PassiveFlag.PASSIVE_NO_INITIALIZE,
185 ).indexed(index)
186 else:
187 return self._generate(sess).__getitem__(index) # type: ignore[no-any-return] # noqa: E501
188
189 def count(self) -> int:
190 sess = self.session
191 if sess is None:
192 return len(
193 self.attr._get_collection_history(
194 attributes.instance_state(self.instance),
195 PassiveFlag.PASSIVE_NO_INITIALIZE,
196 ).added_items
197 )
198 else:
199 return self._generate(sess).count()
200
201 def _generate(
202 self,
203 sess: Optional[Session] = None,
204 ) -> Query[_T]:
205 # note we're returning an entirely new Query class instance
206 # here without any assignment capabilities; the class of this
207 # query is determined by the session.
208 instance = self.instance
209 if sess is None:
210 sess = object_session(instance)
211 if sess is None:
212 raise orm_exc.DetachedInstanceError(
213 "Parent instance %s is not bound to a Session, and no "
214 "contextual session is established; lazy load operation "
215 "of attribute '%s' cannot proceed"
216 % (orm_util.instance_str(instance), self.attr.key)
217 )
218
219 if self.query_class:
220 query = self.query_class(self.attr.target_mapper, session=sess)
221 else:
222 query = sess.query(self.attr.target_mapper)
223
224 query._where_criteria = self._where_criteria
225 query._from_obj = self._from_obj
226 query._order_by_clauses = self._order_by_clauses
227
228 return query
229
230 def add_all(self, iterator: Iterable[_T]) -> None:
231 """Add an iterable of items to this :class:`_orm.AppenderQuery`.
232
233 The given items will be persisted to the database in terms of
234 the parent instance's collection on the next flush.
235
236 This method is provided to assist in delivering forwards-compatibility
237 with the :class:`_orm.WriteOnlyCollection` collection class.
238
239 .. versionadded:: 2.0
240
241 """
242 self._add_all_impl(iterator)
243
244 def add(self, item: _T) -> None:
245 """Add an item to this :class:`_orm.AppenderQuery`.
246
247 The given item will be persisted to the database in terms of
248 the parent instance's collection on the next flush.
249
250 This method is provided to assist in delivering forwards-compatibility
251 with the :class:`_orm.WriteOnlyCollection` collection class.
252
253 .. versionadded:: 2.0
254
255 """
256 self._add_all_impl([item])
257
258 def extend(self, iterator: Iterable[_T]) -> None:
259 """Add an iterable of items to this :class:`_orm.AppenderQuery`.
260
261 The given items will be persisted to the database in terms of
262 the parent instance's collection on the next flush.
263
264 """
265 self._add_all_impl(iterator)
266
267 def append(self, item: _T) -> None:
268 """Append an item to this :class:`_orm.AppenderQuery`.
269
270 The given item will be persisted to the database in terms of
271 the parent instance's collection on the next flush.
272
273 """
274 self._add_all_impl([item])
275
276 def remove(self, item: _T) -> None:
277 """Remove an item from this :class:`_orm.AppenderQuery`.
278
279 The given item will be removed from the parent instance's collection on
280 the next flush.
281
282 """
283 self._remove_impl(item)
284
285
286class AppenderQuery(AppenderMixin[_T], Query[_T]): # type: ignore[misc]
287 """A dynamic query that supports basic collection storage operations.
288
289 Methods on :class:`.AppenderQuery` include all methods of
290 :class:`_orm.Query`, plus additional methods used for collection
291 persistence.
292
293
294 """
295
296
297def mixin_user_query(cls: Any) -> type[AppenderMixin[Any]]:
298 """Return a new class with AppenderQuery functionality layered over."""
299 name = "Appender" + cls.__name__
300 return type(name, (AppenderMixin, cls), {"query_class": cls})