Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/dynamic.py: 46%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

134 statements  

1# orm/dynamic.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8 

9"""Dynamic collection API. 

10 

11Dynamic collections act like Query() objects for read operations and support 

12basic add/delete mutation. 

13 

14.. legacy:: the "dynamic" loader is a legacy feature, superseded by the 

15 "write_only" loader. 

16 

17 

18""" 

19 

20from __future__ import annotations 

21 

22from typing import Any 

23from typing import Iterable 

24from typing import Iterator 

25from typing import List 

26from typing import Optional 

27from typing import overload 

28from typing import Tuple 

29from typing import Type 

30from typing import TYPE_CHECKING 

31from typing import TypeVar 

32from typing import Union 

33 

34from . import attributes 

35from . import exc as orm_exc 

36from . import relationships 

37from . import util as orm_util 

38from .base import PassiveFlag 

39from .query import Query 

40from .session import object_session 

41from .writeonly import _AbstractCollectionWriter 

42from .writeonly import _WriteOnlyAttributeImpl 

43from .writeonly import _WriteOnlyLoader 

44from .writeonly import WriteOnlyHistory 

45from .. import util 

46from ..engine import result 

47 

48if TYPE_CHECKING: 

49 from . import QueryableAttribute 

50 from .mapper import Mapper 

51 from .relationships import _RelationshipOrderByArg 

52 from .session import Session 

53 from .state import InstanceState 

54 from .util import AliasedClass 

55 from ..event import _Dispatch 

56 from ..sql.elements import ColumnElement 

57 

58_T = TypeVar("_T", bound=Any) 

59 

60 

class DynamicCollectionHistory(WriteOnlyHistory[_T]):
    """Pending-change record for a "dynamic" relationship collection.

    Tracks three identity-ordered sets (``added_items``,
    ``deleted_items`` and ``unchanged_items``) plus a
    ``_reconcile_collection`` flag.
    """

    def __init__(
        self,
        attr: _DynamicAttributeImpl,
        state: InstanceState[_T],
        passive: PassiveFlag,
        apply_to: Optional[DynamicCollectionHistory[_T]] = None,
    ) -> None:
        """Create an empty history, or one derived from ``apply_to``.

        :param attr: attribute implementation handed to
         :class:`.AppenderQuery` when ``apply_to`` is given.
        :param state: instance state handed to :class:`.AppenderQuery`
         when ``apply_to`` is given.
        :param passive: accepted as part of the signature; not consulted
         by this initializer.
        :param apply_to: optional existing history.  When truthy, its
         ``added_items`` and ``deleted_items`` sets are shared (not
         copied) with the new history.
        """
        if not apply_to:
            # fresh history: nothing added, removed or loaded yet
            self.added_items = util.OrderedIdentitySet()
            self.deleted_items = util.OrderedIdentitySet()
            self.unchanged_items = util.OrderedIdentitySet()
            self._reconcile_collection = False
            return

        # iterate the collection with autoflush turned off on the query;
        # whatever it yields becomes the "unchanged" set
        existing = AppenderQuery(attr, state).autoflush(False)
        self.unchanged_items = util.OrderedIdentitySet(existing)
        self.added_items = apply_to.added_items
        self.deleted_items = apply_to.deleted_items
        self._reconcile_collection = True

80 

81 

class _DynamicAttributeImpl(_WriteOnlyAttributeImpl):
    """Attribute implementation used for "dynamic" relationships.

    Records the target mapper, an optional ordering, and the query class
    used to build the collection object for the attribute.
    """

    _supports_dynamic_iteration = True
    collection_history_cls = DynamicCollectionHistory[Any]
    query_class: Type[_AppenderMixin[Any]]  # type: ignore[assignment]

    def __init__(
        self,
        class_: Union[Type[Any], AliasedClass[Any]],
        key: str,
        dispatch: _Dispatch[QueryableAttribute[Any]],
        target_mapper: Mapper[_T],
        order_by: _RelationshipOrderByArg,
        query_class: Optional[Type[_AppenderMixin[_T]]] = None,
        **kw: Any,
    ) -> None:
        """Set up the attribute implementation.

        :param class_: owning class or aliased class, forwarded to
         ``_AttributeImpl.__init__``.
        :param key: attribute name, forwarded to
         ``_AttributeImpl.__init__``.
        :param dispatch: event dispatcher, forwarded to
         ``_AttributeImpl.__init__``.
        :param target_mapper: mapper stored as ``self.target_mapper``.
        :param order_by: ordering criteria; stored as a tuple on the
         instance only when truthy.
        :param query_class: optional query class.  When falsy,
         :class:`.AppenderQuery` is used; when it already has
         ``_AppenderMixin`` in its MRO it is used as is; otherwise it is
         wrapped via :func:`.mixin_user_query`.
        :param \**kw: forwarded to ``_AttributeImpl.__init__``.
        """
        # the base _AttributeImpl initializer is invoked directly rather
        # than through super(); None is passed as its third positional
        # argument
        base_init = attributes._AttributeImpl.__init__
        base_init(self, class_, key, None, dispatch, **kw)

        self.target_mapper = target_mapper

        if order_by:
            self.order_by = tuple(order_by)

        if query_class:
            already_appender = _AppenderMixin in query_class.mro()
            self.query_class = (
                query_class
                if already_appender
                else mixin_user_query(query_class)
            )
        else:
            self.query_class = AppenderQuery

109 

110 

@relationships.RelationshipProperty.strategy_for(lazy="dynamic")
class _DynaLoader(_WriteOnlyLoader):
    """Loader strategy registered for relationships using ``lazy="dynamic"``.

    Differs from its base class only in naming
    :class:`._DynamicAttributeImpl` as the attribute implementation class.
    """

    impl_class = _DynamicAttributeImpl

114 

115 

class _AppenderMixin(_AbstractCollectionWriter[_T]):
    """Mixin adding collection-writer behavior on top of a Query class.

    Intended to be combined with a :class:`_orm.Query` class (see
    :class:`.AppenderQuery` and :func:`.mixin_user_query`); the
    initializer calls ``Query.__init__`` explicitly before the
    collection-writer initializer.

    """

    query_class: Optional[Type[Query[_T]]] = None
    _order_by_clauses: Tuple[ColumnElement[Any], ...]

    def __init__(
        self, attr: _DynamicAttributeImpl, state: InstanceState[_T]
    ) -> None:
        """Initialize the Query side first, then the collection writer."""
        Query.__init__(
            self,  # type: ignore[arg-type]
            attr.target_mapper,
            None,
        )
        super().__init__(attr, state)

    @property
    def session(self) -> Optional[Session]:
        """Return the session of the parent instance, or ``None``.

        If the parent instance belongs to a session that has autoflush
        enabled and contains the instance, that session is flushed
        first.  ``None`` is returned when the parent instance has no
        identity.
        """
        owner = self.instance
        active = object_session(owner)
        if active is not None and active.autoflush and owner in active:
            active.flush()
        return active if orm_util.has_identity(owner) else None

    @session.setter
    def session(self, session: Session) -> None:
        # stores the value under ``sess``; the getter above does not
        # read it back
        self.sess = session

    def _iter(self) -> Union[result.ScalarResult[_T], result.Result[_T]]:
        """Return a result object for iterating this collection.

        With a usable session the work is delegated to a generated
        query.  Without one, only the pending ``added_items`` of the
        collection history are returned, and a warning is emitted if
        the parent instance is detached.
        """
        active = self.session
        if active is not None:
            return self._generate(active)._iter()

        owner_state = attributes.instance_state(self.instance)
        if owner_state.detached:
            template = (
                "Instance %s is detached, dynamic relationship cannot "
                "return a correct result. This warning will become "
                "a DetachedInstanceError in a future release."
            )
            util.warn(template % orm_util.state_str(owner_state))

        metadata = result.SimpleResultMetaData([self.attr.class_.__name__])
        pending = self.attr._get_collection_history(
            owner_state,
            PassiveFlag.PASSIVE_NO_INITIALIZE,
        ).added_items
        fallback = result.IteratorResult(
            metadata,
            iter(pending),
            _source_supports_scalars=True,
        )
        return fallback.scalars()

    if TYPE_CHECKING:

        def __iter__(self) -> Iterator[_T]: ...

    @overload
    def __getitem__(self, index: int) -> _T: ...

    @overload
    def __getitem__(self, index: slice) -> List[_T]: ...

    def __getitem__(self, index: Union[int, slice]) -> Union[_T, List[_T]]:
        """Return the item or slice at ``index``.

        With a usable session the lookup is delegated to a generated
        query; otherwise the collection history's ``indexed()`` method
        answers it.
        """
        active = self.session
        if active is not None:
            return self._generate(active).__getitem__(index)

        history = self.attr._get_collection_history(
            attributes.instance_state(self.instance),
            PassiveFlag.PASSIVE_NO_INITIALIZE,
        )
        return history.indexed(index)

    def count(self) -> int:
        """Return the number of items in the collection.

        With a usable session the count comes from a generated query;
        otherwise it is the number of pending ``added_items`` in the
        collection history.
        """
        active = self.session
        if active is not None:
            return self._generate(active).count()

        history = self.attr._get_collection_history(
            attributes.instance_state(self.instance),
            PassiveFlag.PASSIVE_NO_INITIALIZE,
        )
        return len(history.added_items)

    def _generate(
        self,
        sess: Optional[Session] = None,
    ) -> Query[_T]:
        """Build a new query carrying this collection's criteria.

        :param sess: session to bind the query to; when ``None`` the
         session of the parent instance is looked up.
        :raises DetachedInstanceError: if no session is given and the
         parent instance is not associated with one.
        """
        # note we're returning an entirely new Query class instance
        # here without any assignment capabilities; the class of this
        # query is determined by the session.
        owner = self.instance
        if sess is None:
            sess = object_session(owner)
        if sess is None:
            raise orm_exc.DetachedInstanceError(
                "Parent instance %s is not bound to a Session, and no "
                "contextual session is established; lazy load operation "
                "of attribute '%s' cannot proceed"
                % (orm_util.instance_str(owner), self.attr.key)
            )

        target = self.attr.target_mapper
        factory = self.query_class
        query = factory(target, session=sess) if factory else sess.query(target)

        # carry over the criteria accumulated on this appender
        query._where_criteria = self._where_criteria
        query._from_obj = self._from_obj
        query._order_by_clauses = self._order_by_clauses

        return query

    def add_all(self, iterator: Iterable[_T]) -> None:
        """Add every item of an iterable to this
        :class:`_orm.AppenderQuery`.

        The items are persisted as members of the parent instance's
        collection on the next flush.

        Provided for forwards-compatibility with the
        :class:`_orm.WriteOnlyCollection` collection class.

        .. versionadded:: 2.0

        """
        self._add_all_impl(iterator)

    def add(self, item: _T) -> None:
        """Add a single item to this :class:`_orm.AppenderQuery`.

        The item is persisted as a member of the parent instance's
        collection on the next flush.

        Provided for forwards-compatibility with the
        :class:`_orm.WriteOnlyCollection` collection class.

        .. versionadded:: 2.0

        """
        self._add_all_impl([item])

    def extend(self, iterator: Iterable[_T]) -> None:
        """Add every item of an iterable to this
        :class:`_orm.AppenderQuery`.

        The items are persisted as members of the parent instance's
        collection on the next flush.

        """
        self._add_all_impl(iterator)

    def append(self, item: _T) -> None:
        """Append a single item to this :class:`_orm.AppenderQuery`.

        The item is persisted as a member of the parent instance's
        collection on the next flush.

        """
        self._add_all_impl([item])

    def remove(self, item: _T) -> None:
        """Remove a single item from this :class:`_orm.AppenderQuery`.

        The item is removed from the parent instance's collection on
        the next flush.

        """
        self._remove_impl(item)

290 

291 

class AppenderQuery(_AppenderMixin[_T], Query[_T]):  # type: ignore[misc]
    """A dynamic query that supports basic collection storage operations.

    Combines ``_AppenderMixin`` with :class:`_orm.Query`: methods on
    :class:`.AppenderQuery` include all methods of :class:`_orm.Query`,
    plus the additional methods used for collection persistence
    (``add()``, ``add_all()``, ``append()``, ``extend()`` and
    ``remove()``).

    """

301 

302 

def mixin_user_query(cls: Any) -> type[_AppenderMixin[Any]]:
    """Return a new class with AppenderQuery functionality layered over.

    The generated class is named ``"Appender" + cls.__name__``, lists
    ``_AppenderMixin`` ahead of ``cls`` in its bases, and records ``cls``
    as its ``query_class`` attribute.
    """
    generated_name = "Appender" + cls.__name__
    bases = (_AppenderMixin, cls)
    namespace = {"query_class": cls}
    return type(generated_name, bases, namespace)