Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/dynamic.py: 46%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

134 statements  

1# orm/dynamic.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8 

9"""Dynamic collection API. 

10 

11Dynamic collections act like Query() objects for read operations and support 

12basic add/delete mutation. 

13 

14.. legacy:: the "dynamic" loader is a legacy feature, superseded by the 

15 "write_only" loader. 

16 

17 

18""" 

19 

20from __future__ import annotations 

21 

22from typing import Any 

23from typing import Iterable 

24from typing import Iterator 

25from typing import List 

26from typing import Optional 

27from typing import overload 

28from typing import Tuple 

29from typing import Type 

30from typing import TYPE_CHECKING 

31from typing import TypeVar 

32from typing import Union 

33 

34from . import attributes 

35from . import exc as orm_exc 

36from . import relationships 

37from . import util as orm_util 

38from .base import PassiveFlag 

39from .query import Query 

40from .session import object_session 

41from .writeonly import _AbstractCollectionWriter 

42from .writeonly import _WriteOnlyAttributeImpl 

43from .writeonly import _WriteOnlyLoader 

44from .writeonly import WriteOnlyHistory 

45from .. import util 

46from ..engine import result 

47 

48 

49if TYPE_CHECKING: 

50 from . import QueryableAttribute 

51 from .mapper import Mapper 

52 from .relationships import _RelationshipOrderByArg 

53 from .session import Session 

54 from .state import InstanceState 

55 from .util import AliasedClass 

56 from ..event import _Dispatch 

57 from ..sql.elements import ColumnElement 

58 

59_T = TypeVar("_T", bound=Any) 

60 

61 

class DynamicCollectionHistory(WriteOnlyHistory[_T]):
    """Added / deleted / unchanged item sets for a dynamic collection.

    The constructor populates every attribute itself and does not invoke
    the base class constructor.

    """

    def __init__(
        self,
        attr: _DynamicAttributeImpl,
        state: InstanceState[_T],
        passive: PassiveFlag,
        apply_to: Optional[DynamicCollectionHistory[_T]] = None,
    ) -> None:
        """Create an empty history, or one layered over ``apply_to``.

        :param attr: attribute implementation; used to build the
         :class:`.AppenderQuery` when ``apply_to`` is given.
        :param state: instance state of the parent object.
        :param passive: part of the signature; not consulted here.
        :param apply_to: optional existing history.  When truthy, its
         ``added_items`` and ``deleted_items`` sets are shared (not
         copied) with the new history.

        """
        if not apply_to:
            # nothing to layer over: start with three empty sets and no
            # reconciliation step
            self.deleted_items = util.OrderedIdentitySet()
            self.added_items = util.OrderedIdentitySet()
            self.unchanged_items = util.OrderedIdentitySet()
            self._reconcile_collection = False
            return

        # the "unchanged" set is seeded from a query over the collection
        # with autoflush switched off
        existing = AppenderQuery(attr, state).autoflush(False)
        self.unchanged_items = util.OrderedIdentitySet(existing)
        self.added_items = apply_to.added_items
        self.deleted_items = apply_to.deleted_items
        self._reconcile_collection = True

81 

82 

class _DynamicAttributeImpl(_WriteOnlyAttributeImpl):
    """Attribute implementation used by the ``lazy="dynamic"`` loader."""

    _supports_dynamic_iteration = True
    collection_history_cls = DynamicCollectionHistory[Any]
    query_class: Type[_AppenderMixin[Any]]  # type: ignore[assignment]

    def __init__(
        self,
        class_: Union[Type[Any], AliasedClass[Any]],
        key: str,
        dispatch: _Dispatch[QueryableAttribute[Any]],
        target_mapper: Mapper[_T],
        order_by: _RelationshipOrderByArg,
        query_class: Optional[Type[_AppenderMixin[_T]]] = None,
        **kw: Any,
    ) -> None:
        """Set up the attribute and decide which query class it hands out.

        :param class_: owning class, forwarded to ``_AttributeImpl``.
        :param key: attribute name, forwarded to ``_AttributeImpl``.
        :param dispatch: event dispatcher, forwarded to ``_AttributeImpl``.
        :param target_mapper: mapper of the related entity; stored on
         ``self.target_mapper``.
        :param order_by: ordering criteria; stored as a tuple only when
         truthy.
        :param query_class: optional query class.  Used as-is when
         :class:`._AppenderMixin` is in its MRO, otherwise wrapped by
         :func:`.mixin_user_query`; :class:`.AppenderQuery` is used when
         omitted.
        :param kw: extra keyword arguments forwarded to ``_AttributeImpl``.

        """
        # the base-most initializer is called directly, with None passed
        # in its third positional slot
        attributes._AttributeImpl.__init__(
            self, class_, key, None, dispatch, **kw
        )
        self.target_mapper = target_mapper

        # self.order_by is only assigned for a non-empty ordering
        if order_by:
            self.order_by = tuple(order_by)

        if not query_class:
            resolved = AppenderQuery
        elif _AppenderMixin in query_class.mro():
            # already appender-capable
            resolved = query_class
        else:
            # plain user-supplied class: layer the mixin on top of it
            resolved = mixin_user_query(query_class)
        self.query_class = resolved

110 

111 

@relationships.RelationshipProperty.strategy_for(lazy="dynamic")
class _DynaLoader(_WriteOnlyLoader):
    """Loader strategy registered for ``lazy="dynamic"`` relationships.

    Overrides only ``impl_class``, pointing it at
    :class:`._DynamicAttributeImpl`.

    """

    impl_class = _DynamicAttributeImpl

115 

116 

class _AppenderMixin(_AbstractCollectionWriter[_T]):
    """Mixin layering collection add/remove operations over a Query class.

    Meant to be combined with a :class:`_orm.Query` class, as done by
    :class:`.AppenderQuery` and :func:`.mixin_user_query`.

    """

    query_class: Optional[Type[Query[_T]]] = None
    _order_by_clauses: Tuple[ColumnElement[Any], ...]

    def __init__(
        self, attr: _DynamicAttributeImpl, state: InstanceState[_T]
    ) -> None:
        """Initialize the Query side, then the collection-writer side.

        :param attr: the dynamic attribute implementation.
        :param state: instance state of the parent object.

        """
        # Query is initialized explicitly against the target mapper, with
        # None as its second argument, before the super() chain runs
        Query.__init__(
            self,  # type: ignore[arg-type]
            attr.target_mapper,
            None,
        )
        super().__init__(attr, state)

    @property
    def session(self) -> Optional[Session]:
        """Session of the parent instance, or ``None``.

        When the parent's session has autoflush enabled and contains the
        parent, the session is flushed first.  ``None`` is returned when
        ``orm_util.has_identity()`` is false for the parent.

        """
        parent = self.instance
        sess = object_session(parent)
        if sess is not None and sess.autoflush and parent in sess:
            sess.flush()
        return sess if orm_util.has_identity(parent) else None

    @session.setter
    def session(self, session: Session) -> None:
        # stored on ``self.sess``; the getter above does not read it
        self.sess = session

    def _iter(self) -> Union[result.ScalarResult[_T], result.Result[_T]]:
        """Iterate the collection.

        With a session available the work is delegated to the query from
        :meth:`_generate`.  Without one, only the items in the history's
        ``added_items`` are produced, and a warning is emitted if the
        parent is detached.

        """
        sess = self.session
        if sess is not None:
            return self._generate(sess)._iter()

        state = attributes.instance_state(self.instance)
        if state.detached:
            util.warn(
                "Instance %s is detached, dynamic relationship cannot "
                "return a correct result.   This warning will become "
                "a DetachedInstanceError in a future release."
                % (orm_util.state_str(state))
            )

        metadata = result.SimpleResultMetaData([self.attr.class_.__name__])
        pending = self.attr._get_collection_history(
            state,
            PassiveFlag.PASSIVE_NO_INITIALIZE,
        ).added_items
        in_memory = result.IteratorResult(
            metadata,
            iter(pending),
            _source_supports_scalars=True,
        )
        return in_memory.scalars()

    if TYPE_CHECKING:

        def __iter__(self) -> Iterator[_T]: ...

    @overload
    def __getitem__(self, index: int) -> _T: ...

    @overload
    def __getitem__(self, index: slice) -> List[_T]: ...

    def __getitem__(self, index: Union[int, slice]) -> Union[_T, List[_T]]:
        """Return the item or slice at ``index``.

        Uses the query from :meth:`_generate` when a session is available;
        otherwise the lookup goes through the collection history's
        ``indexed()`` method.

        """
        sess = self.session
        if sess is not None:
            return self._generate(sess).__getitem__(index)

        history = self.attr._get_collection_history(
            attributes.instance_state(self.instance),
            PassiveFlag.PASSIVE_NO_INITIALIZE,
        )
        return history.indexed(index)

    def count(self) -> int:
        """Return the number of items.

        Uses the query's ``count()`` when a session is available;
        otherwise the size of the history's ``added_items``.

        """
        sess = self.session
        if sess is not None:
            return self._generate(sess).count()

        history = self.attr._get_collection_history(
            attributes.instance_state(self.instance),
            PassiveFlag.PASSIVE_NO_INITIALIZE,
        )
        return len(history.added_items)

    def _generate(
        self,
        sess: Optional[Session] = None,
    ) -> Query[_T]:
        """Build a Query carrying this object's criteria.

        :param sess: session to use; when omitted, the parent instance's
         session is looked up.
        :raises DetachedInstanceError: if no session was given and the
         parent instance has none.

        """
        parent = self.instance
        bound = object_session(parent) if sess is None else sess
        if bound is None:
            raise orm_exc.DetachedInstanceError(
                "Parent instance %s is not bound to a Session, and no "
                "contextual session is established; lazy load operation "
                "of attribute '%s' cannot proceed"
                % (orm_util.instance_str(parent), self.attr.key)
            )

        # the returned object is a new query, not this appender: it is
        # made by ``query_class`` when one is set, else by the session
        factory = self.query_class
        if factory:
            query = factory(self.attr.target_mapper, session=bound)
        else:
            query = bound.query(self.attr.target_mapper)

        # carry over the criteria held on this object
        query._where_criteria = self._where_criteria
        query._from_obj = self._from_obj
        query._order_by_clauses = self._order_by_clauses

        return query

    def add_all(self, iterator: Iterable[_T]) -> None:
        """Add every item of an iterable to this
        :class:`_orm.AppenderQuery`.

        The items are persisted as members of the parent instance's
        collection on the next flush.

        Provided for forwards-compatibility with the
        :class:`_orm.WriteOnlyCollection` collection class.

        .. versionadded:: 2.0

        """
        self._add_all_impl(iterator)

    def add(self, item: _T) -> None:
        """Add a single item to this :class:`_orm.AppenderQuery`.

        The item is persisted as a member of the parent instance's
        collection on the next flush.

        Provided for forwards-compatibility with the
        :class:`_orm.WriteOnlyCollection` collection class.

        .. versionadded:: 2.0

        """
        self._add_all_impl([item])

    def extend(self, iterator: Iterable[_T]) -> None:
        """Add every item of an iterable to this
        :class:`_orm.AppenderQuery`.

        The items are persisted as members of the parent instance's
        collection on the next flush.

        """
        self._add_all_impl(iterator)

    def append(self, item: _T) -> None:
        """Append a single item to this :class:`_orm.AppenderQuery`.

        The item is persisted as a member of the parent instance's
        collection on the next flush.

        """
        self._add_all_impl([item])

    def remove(self, item: _T) -> None:
        """Remove a single item from this :class:`_orm.AppenderQuery`.

        The item is removed from the parent instance's collection on the
        next flush.

        """
        self._remove_impl(item)

291 

292 

class AppenderQuery(_AppenderMixin[_T], Query[_T]):  # type: ignore[misc]
    """A dynamic :class:`_orm.Query` that also supports basic collection
    storage operations.

    Every method of :class:`_orm.Query` is available on
    :class:`.AppenderQuery`, together with the additional methods used
    for collection persistence.

    """

302 

303 

def mixin_user_query(cls: Any) -> type[_AppenderMixin[Any]]:
    """Build a class combining :class:`._AppenderMixin` with ``cls``.

    The new class is named ``"Appender"`` followed by ``cls.__name__``,
    lists the mixin ahead of ``cls`` in its bases, and records ``cls``
    as its ``query_class`` attribute.

    """
    name = "Appender" + cls.__name__
    bases = (_AppenderMixin, cls)
    namespace = {"query_class": cls}
    return type(name, bases, namespace)