Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/dynamic.py: 45%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

128 statements  

1# orm/dynamic.py 

2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8 

9"""Dynamic collection API. 

10 

11Dynamic collections act like Query() objects for read operations and support 

12basic add/delete mutation. 

13 

14.. legacy:: the "dynamic" loader is a legacy feature, superseded by the 

15 "write_only" loader. 

16 

17 

18""" 

19 

20from __future__ import annotations 

21 

22from typing import Any 

23from typing import Iterable 

24from typing import Iterator 

25from typing import List 

26from typing import Optional 

27from typing import Tuple 

28from typing import Type 

29from typing import TYPE_CHECKING 

30from typing import TypeVar 

31from typing import Union 

32 

33from . import attributes 

34from . import exc as orm_exc 

35from . import relationships 

36from . import util as orm_util 

37from .base import PassiveFlag 

38from .query import Query 

39from .session import object_session 

40from .writeonly import AbstractCollectionWriter 

41from .writeonly import WriteOnlyAttributeImpl 

42from .writeonly import WriteOnlyHistory 

43from .writeonly import WriteOnlyLoader 

44from .. import util 

45from ..engine import result 

46 

47 

48if TYPE_CHECKING: 

49 from . import QueryableAttribute 

50 from .mapper import Mapper 

51 from .relationships import _RelationshipOrderByArg 

52 from .session import Session 

53 from .state import InstanceState 

54 from .util import AliasedClass 

55 from ..event import _Dispatch 

56 from ..sql.elements import ColumnElement 

57 

58_T = TypeVar("_T", bound=Any) 

59 

60 

61class DynamicCollectionHistory(WriteOnlyHistory[_T]): 

62 def __init__( 

63 self, 

64 attr: DynamicAttributeImpl, 

65 state: InstanceState[_T], 

66 passive: PassiveFlag, 

67 apply_to: Optional[DynamicCollectionHistory[_T]] = None, 

68 ) -> None: 

69 if apply_to: 

70 coll = AppenderQuery(attr, state).autoflush(False) 

71 self.unchanged_items = util.OrderedIdentitySet(coll) 

72 self.added_items = apply_to.added_items 

73 self.deleted_items = apply_to.deleted_items 

74 self._reconcile_collection = True 

75 else: 

76 self.deleted_items = util.OrderedIdentitySet() 

77 self.added_items = util.OrderedIdentitySet() 

78 self.unchanged_items = util.OrderedIdentitySet() 

79 self._reconcile_collection = False 

80 

81 

82class DynamicAttributeImpl(WriteOnlyAttributeImpl): 

83 _supports_dynamic_iteration = True 

84 collection_history_cls = DynamicCollectionHistory[Any] 

85 query_class: Type[AppenderMixin[Any]] # type: ignore[assignment] 

86 

87 def __init__( 

88 self, 

89 class_: Union[Type[Any], AliasedClass[Any]], 

90 key: str, 

91 dispatch: _Dispatch[QueryableAttribute[Any]], 

92 target_mapper: Mapper[_T], 

93 order_by: _RelationshipOrderByArg, 

94 query_class: Optional[Type[AppenderMixin[_T]]] = None, 

95 **kw: Any, 

96 ) -> None: 

97 attributes.AttributeImpl.__init__( 

98 self, class_, key, None, dispatch, **kw 

99 ) 

100 self.target_mapper = target_mapper 

101 if order_by: 

102 self.order_by = tuple(order_by) 

103 if not query_class: 

104 self.query_class = AppenderQuery 

105 elif AppenderMixin in query_class.mro(): 

106 self.query_class = query_class 

107 else: 

108 self.query_class = mixin_user_query(query_class) 

109 

110 

111@relationships.RelationshipProperty.strategy_for(lazy="dynamic") 

112class DynaLoader(WriteOnlyLoader): 

113 impl_class = DynamicAttributeImpl 

114 

115 

116class AppenderMixin(AbstractCollectionWriter[_T]): 

117 """A mixin that expects to be mixing in a Query class with 

118 AbstractAppender. 

119 

120 

121 """ 

122 

123 query_class: Optional[Type[Query[_T]]] = None 

124 _order_by_clauses: Tuple[ColumnElement[Any], ...] 

125 

126 def __init__( 

127 self, attr: DynamicAttributeImpl, state: InstanceState[_T] 

128 ) -> None: 

129 Query.__init__( 

130 self, # type: ignore[arg-type] 

131 attr.target_mapper, 

132 None, 

133 ) 

134 super().__init__(attr, state) 

135 

136 @property 

137 def session(self) -> Optional[Session]: 

138 sess = object_session(self.instance) 

139 if sess is not None and sess.autoflush and self.instance in sess: 

140 sess.flush() 

141 if not orm_util.has_identity(self.instance): 

142 return None 

143 else: 

144 return sess 

145 

146 @session.setter 

147 def session(self, session: Session) -> None: 

148 self.sess = session 

149 

150 def _iter(self) -> Union[result.ScalarResult[_T], result.Result[_T]]: 

151 sess = self.session 

152 if sess is None: 

153 state = attributes.instance_state(self.instance) 

154 if state.detached: 

155 util.warn( 

156 "Instance %s is detached, dynamic relationship cannot " 

157 "return a correct result. This warning will become " 

158 "a DetachedInstanceError in a future release." 

159 % (orm_util.state_str(state)) 

160 ) 

161 

162 return result.IteratorResult( 

163 result.SimpleResultMetaData([self.attr.class_.__name__]), 

164 iter( 

165 self.attr._get_collection_history( 

166 attributes.instance_state(self.instance), 

167 PassiveFlag.PASSIVE_NO_INITIALIZE, 

168 ).added_items 

169 ), 

170 _source_supports_scalars=True, 

171 ).scalars() 

172 else: 

173 return self._generate(sess)._iter() 

174 

175 if TYPE_CHECKING: 

176 

177 def __iter__(self) -> Iterator[_T]: ... 

178 

179 def __getitem__(self, index: Any) -> Union[_T, List[_T]]: 

180 sess = self.session 

181 if sess is None: 

182 return self.attr._get_collection_history( 

183 attributes.instance_state(self.instance), 

184 PassiveFlag.PASSIVE_NO_INITIALIZE, 

185 ).indexed(index) 

186 else: 

187 return self._generate(sess).__getitem__(index) # type: ignore[no-any-return] # noqa: E501 

188 

189 def count(self) -> int: 

190 sess = self.session 

191 if sess is None: 

192 return len( 

193 self.attr._get_collection_history( 

194 attributes.instance_state(self.instance), 

195 PassiveFlag.PASSIVE_NO_INITIALIZE, 

196 ).added_items 

197 ) 

198 else: 

199 return self._generate(sess).count() 

200 

201 def _generate( 

202 self, 

203 sess: Optional[Session] = None, 

204 ) -> Query[_T]: 

205 # note we're returning an entirely new Query class instance 

206 # here without any assignment capabilities; the class of this 

207 # query is determined by the session. 

208 instance = self.instance 

209 if sess is None: 

210 sess = object_session(instance) 

211 if sess is None: 

212 raise orm_exc.DetachedInstanceError( 

213 "Parent instance %s is not bound to a Session, and no " 

214 "contextual session is established; lazy load operation " 

215 "of attribute '%s' cannot proceed" 

216 % (orm_util.instance_str(instance), self.attr.key) 

217 ) 

218 

219 if self.query_class: 

220 query = self.query_class(self.attr.target_mapper, session=sess) 

221 else: 

222 query = sess.query(self.attr.target_mapper) 

223 

224 query._where_criteria = self._where_criteria 

225 query._from_obj = self._from_obj 

226 query._order_by_clauses = self._order_by_clauses 

227 

228 return query 

229 

230 def add_all(self, iterator: Iterable[_T]) -> None: 

231 """Add an iterable of items to this :class:`_orm.AppenderQuery`. 

232 

233 The given items will be persisted to the database in terms of 

234 the parent instance's collection on the next flush. 

235 

236 This method is provided to assist in delivering forwards-compatibility 

237 with the :class:`_orm.WriteOnlyCollection` collection class. 

238 

239 .. versionadded:: 2.0 

240 

241 """ 

242 self._add_all_impl(iterator) 

243 

244 def add(self, item: _T) -> None: 

245 """Add an item to this :class:`_orm.AppenderQuery`. 

246 

247 The given item will be persisted to the database in terms of 

248 the parent instance's collection on the next flush. 

249 

250 This method is provided to assist in delivering forwards-compatibility 

251 with the :class:`_orm.WriteOnlyCollection` collection class. 

252 

253 .. versionadded:: 2.0 

254 

255 """ 

256 self._add_all_impl([item]) 

257 

258 def extend(self, iterator: Iterable[_T]) -> None: 

259 """Add an iterable of items to this :class:`_orm.AppenderQuery`. 

260 

261 The given items will be persisted to the database in terms of 

262 the parent instance's collection on the next flush. 

263 

264 """ 

265 self._add_all_impl(iterator) 

266 

267 def append(self, item: _T) -> None: 

268 """Append an item to this :class:`_orm.AppenderQuery`. 

269 

270 The given item will be persisted to the database in terms of 

271 the parent instance's collection on the next flush. 

272 

273 """ 

274 self._add_all_impl([item]) 

275 

276 def remove(self, item: _T) -> None: 

277 """Remove an item from this :class:`_orm.AppenderQuery`. 

278 

279 The given item will be removed from the parent instance's collection on 

280 the next flush. 

281 

282 """ 

283 self._remove_impl(item) 

284 

285 

286class AppenderQuery(AppenderMixin[_T], Query[_T]): # type: ignore[misc] 

287 """A dynamic query that supports basic collection storage operations. 

288 

289 Methods on :class:`.AppenderQuery` include all methods of 

290 :class:`_orm.Query`, plus additional methods used for collection 

291 persistence. 

292 

293 

294 """ 

295 

296 

297def mixin_user_query(cls: Any) -> type[AppenderMixin[Any]]: 

298 """Return a new class with AppenderQuery functionality layered over.""" 

299 name = "Appender" + cls.__name__ 

300 return type(name, (AppenderMixin, cls), {"query_class": cls})