Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/loading.py: 9%

566 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# orm/loading.py 

2# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8"""private module containing functions used to convert database 

9rows into object instances and associated state. 

10 

11the functions here are called primarily by Query, Mapper, 

12as well as some of the attribute loading strategies. 

13 

14""" 

15from __future__ import absolute_import 

16 

17from . import attributes 

18from . import exc as orm_exc 

19from . import path_registry 

20from . import strategy_options 

21from .base import _DEFER_FOR_STATE 

22from .base import _RAISE_FOR_STATE 

23from .base import _SET_DEFERRED_EXPIRED 

24from .util import _none_set 

25from .util import state_str 

26from .. import exc as sa_exc 

27from .. import future 

28from .. import util 

29from ..engine import result_tuple 

30from ..engine.result import ChunkedIteratorResult 

31from ..engine.result import FrozenResult 

32from ..engine.result import SimpleResultMetaData 

33from ..sql import util as sql_util 

34from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL 

35from ..sql.selectable import SelectState 

36 

37_new_runid = util.counter() 

38 

39 

40def instances(cursor, context): 

41 """Return a :class:`.Result` given an ORM query context. 

42 

43 :param cursor: a :class:`.CursorResult`, generated by a statement 

44 which came from :class:`.ORMCompileState` 

45 

46 :param context: a :class:`.QueryContext` object 

47 

48 :return: a :class:`.Result` object representing ORM results 

49 

50 .. versionchanged:: 1.4 The instances() function now uses 

51 :class:`.Result` objects and has an all new interface. 

52 

53 """ 

54 

55 context.runid = _new_runid() 

56 context.post_load_paths = {} 

57 

58 compile_state = context.compile_state 

59 filtered = compile_state._has_mapper_entities 

60 single_entity = ( 

61 not context.load_options._only_return_tuples 

62 and len(compile_state._entities) == 1 

63 and compile_state._entities[0].supports_single_entity 

64 ) 

65 

66 try: 

67 (process, labels, extra) = list( 

68 zip( 

69 *[ 

70 query_entity.row_processor(context, cursor) 

71 for query_entity in context.compile_state._entities 

72 ] 

73 ) 

74 ) 

75 

76 if context.yield_per and ( 

77 context.loaders_require_buffering 

78 or context.loaders_require_uniquing 

79 ): 

80 raise sa_exc.InvalidRequestError( 

81 "Can't use yield_per with eager loaders that require uniquing " 

82 "or row buffering, e.g. joinedload() against collections " 

83 "or subqueryload(). Consider the selectinload() strategy " 

84 "for better flexibility in loading objects." 

85 ) 

86 

87 except Exception: 

88 with util.safe_reraise(): 

89 cursor.close() 

90 

91 def _no_unique(entry): 

92 raise sa_exc.InvalidRequestError( 

93 "Can't use the ORM yield_per feature in conjunction with unique()" 

94 ) 

95 

96 def _not_hashable(datatype): 

97 def go(obj): 

98 raise sa_exc.InvalidRequestError( 

99 "Can't apply uniqueness to row tuple containing value of " 

100 "type %r; this datatype produces non-hashable values" 

101 % datatype 

102 ) 

103 

104 return go 

105 

106 if context.load_options._legacy_uniquing: 

107 unique_filters = [ 

108 _no_unique 

109 if context.yield_per 

110 else id 

111 if ( 

112 ent.use_id_for_hash 

113 or ent._non_hashable_value 

114 or ent._null_column_type 

115 ) 

116 else None 

117 for ent in context.compile_state._entities 

118 ] 

119 else: 

120 unique_filters = [ 

121 _no_unique 

122 if context.yield_per 

123 else _not_hashable(ent.column.type) 

124 if (not ent.use_id_for_hash and ent._non_hashable_value) 

125 else id 

126 if ent.use_id_for_hash 

127 else None 

128 for ent in context.compile_state._entities 

129 ] 

130 

131 row_metadata = SimpleResultMetaData( 

132 labels, extra, _unique_filters=unique_filters 

133 ) 

134 

135 def chunks(size): 

136 while True: 

137 yield_per = size 

138 

139 context.partials = {} 

140 

141 if yield_per: 

142 fetch = cursor.fetchmany(yield_per) 

143 

144 if not fetch: 

145 break 

146 else: 

147 fetch = cursor._raw_all_rows() 

148 

149 if single_entity: 

150 proc = process[0] 

151 rows = [proc(row) for row in fetch] 

152 else: 

153 rows = [ 

154 tuple([proc(row) for proc in process]) for row in fetch 

155 ] 

156 

157 for path, post_load in context.post_load_paths.items(): 

158 post_load.invoke(context, path) 

159 

160 yield rows 

161 

162 if not yield_per: 

163 break 

164 

165 if context.execution_options.get("prebuffer_rows", False): 

166 # this is a bit of a hack at the moment. 

167 # I would rather have some option in the result to pre-buffer 

168 # internally. 

169 _prebuffered = list(chunks(None)) 

170 

171 def chunks(size): 

172 return iter(_prebuffered) 

173 

174 result = ChunkedIteratorResult( 

175 row_metadata, 

176 chunks, 

177 source_supports_scalars=single_entity, 

178 raw=cursor, 

179 dynamic_yield_per=cursor.context._is_server_side, 

180 ) 

181 

182 # filtered and single_entity are used to indicate to legacy Query that the 

183 # query has ORM entities, so legacy deduping and scalars should be called 

184 # on the result. 

185 result._attributes = result._attributes.union( 

186 dict(filtered=filtered, is_single_entity=single_entity) 

187 ) 

188 

189 # multi_row_eager_loaders OTOH is specific to joinedload. 

190 if context.compile_state.multi_row_eager_loaders: 

191 

192 def require_unique(obj): 

193 raise sa_exc.InvalidRequestError( 

194 "The unique() method must be invoked on this Result, " 

195 "as it contains results that include joined eager loads " 

196 "against collections" 

197 ) 

198 

199 result._unique_filter_state = (None, require_unique) 

200 

201 if context.yield_per: 

202 result.yield_per(context.yield_per) 

203 

204 return result 

205 

206 

207@util.preload_module("sqlalchemy.orm.context") 

208def merge_frozen_result(session, statement, frozen_result, load=True): 

209 """Merge a :class:`_engine.FrozenResult` back into a :class:`_orm.Session`, 

210 returning a new :class:`_engine.Result` object with :term:`persistent` 

211 objects. 

212 

213 See the section :ref:`do_orm_execute_re_executing` for an example. 

214 

215 .. seealso:: 

216 

217 :ref:`do_orm_execute_re_executing` 

218 

219 :meth:`_engine.Result.freeze` 

220 

221 :class:`_engine.FrozenResult` 

222 

223 """ 

224 querycontext = util.preloaded.orm_context 

225 

226 if load: 

227 # flush current contents if we expect to load data 

228 session._autoflush() 

229 

230 ctx = querycontext.ORMSelectCompileState._create_entities_collection( 

231 statement, legacy=False 

232 ) 

233 

234 autoflush = session.autoflush 

235 try: 

236 session.autoflush = False 

237 mapped_entities = [ 

238 i 

239 for i, e in enumerate(ctx._entities) 

240 if isinstance(e, querycontext._MapperEntity) 

241 ] 

242 keys = [ent._label_name for ent in ctx._entities] 

243 

244 keyed_tuple = result_tuple( 

245 keys, [ent._extra_entities for ent in ctx._entities] 

246 ) 

247 

248 result = [] 

249 for newrow in frozen_result.rewrite_rows(): 

250 for i in mapped_entities: 

251 if newrow[i] is not None: 

252 newrow[i] = session._merge( 

253 attributes.instance_state(newrow[i]), 

254 attributes.instance_dict(newrow[i]), 

255 load=load, 

256 _recursive={}, 

257 _resolve_conflict_map={}, 

258 ) 

259 

260 result.append(keyed_tuple(newrow)) 

261 

262 return frozen_result.with_new_rows(result) 

263 finally: 

264 session.autoflush = autoflush 

265 

266 

267@util.deprecated_20( 

268 ":func:`_orm.merge_result`", 

269 alternative="The function as well as the method on :class:`_orm.Query` " 

270 "is superseded by the :func:`_orm.merge_frozen_result` function.", 

271 becomes_legacy=True, 

272) 

273@util.preload_module("sqlalchemy.orm.context") 

274def merge_result(query, iterator, load=True): 

275 """Merge a result into the given :class:`.Query` object's Session. 

276 

277 See :meth:`_orm.Query.merge_result` for top-level documentation on this 

278 function. 

279 

280 """ 

281 

282 querycontext = util.preloaded.orm_context 

283 

284 session = query.session 

285 if load: 

286 # flush current contents if we expect to load data 

287 session._autoflush() 

288 

289 # TODO: need test coverage and documentation for the FrozenResult 

290 # use case. 

291 if isinstance(iterator, FrozenResult): 

292 frozen_result = iterator 

293 iterator = iter(frozen_result.data) 

294 else: 

295 frozen_result = None 

296 

297 ctx = querycontext.ORMSelectCompileState._create_entities_collection( 

298 query, legacy=True 

299 ) 

300 

301 autoflush = session.autoflush 

302 try: 

303 session.autoflush = False 

304 single_entity = not frozen_result and len(ctx._entities) == 1 

305 

306 if single_entity: 

307 if isinstance(ctx._entities[0], querycontext._MapperEntity): 

308 result = [ 

309 session._merge( 

310 attributes.instance_state(instance), 

311 attributes.instance_dict(instance), 

312 load=load, 

313 _recursive={}, 

314 _resolve_conflict_map={}, 

315 ) 

316 for instance in iterator 

317 ] 

318 else: 

319 result = list(iterator) 

320 else: 

321 mapped_entities = [ 

322 i 

323 for i, e in enumerate(ctx._entities) 

324 if isinstance(e, querycontext._MapperEntity) 

325 ] 

326 result = [] 

327 keys = [ent._label_name for ent in ctx._entities] 

328 

329 keyed_tuple = result_tuple( 

330 keys, [ent._extra_entities for ent in ctx._entities] 

331 ) 

332 

333 for row in iterator: 

334 newrow = list(row) 

335 for i in mapped_entities: 

336 if newrow[i] is not None: 

337 newrow[i] = session._merge( 

338 attributes.instance_state(newrow[i]), 

339 attributes.instance_dict(newrow[i]), 

340 load=load, 

341 _recursive={}, 

342 _resolve_conflict_map={}, 

343 ) 

344 result.append(keyed_tuple(newrow)) 

345 

346 if frozen_result: 

347 return frozen_result.with_data(result) 

348 else: 

349 return iter(result) 

350 finally: 

351 session.autoflush = autoflush 

352 

353 

354def get_from_identity(session, mapper, key, passive): 

355 """Look up the given key in the given session's identity map, 

356 check the object for expired state if found. 

357 

358 """ 

359 instance = session.identity_map.get(key) 

360 if instance is not None: 

361 

362 state = attributes.instance_state(instance) 

363 

364 if mapper.inherits and not state.mapper.isa(mapper): 

365 return attributes.PASSIVE_CLASS_MISMATCH 

366 

367 # expired - ensure it still exists 

368 if state.expired: 

369 if not passive & attributes.SQL_OK: 

370 # TODO: no coverage here 

371 return attributes.PASSIVE_NO_RESULT 

372 elif not passive & attributes.RELATED_OBJECT_OK: 

373 # this mode is used within a flush and the instance's 

374 # expired state will be checked soon enough, if necessary. 

375 # also used by immediateloader for a mutually-dependent 

376 # o2m->m2m load, :ticket:`6301` 

377 return instance 

378 try: 

379 state._load_expired(state, passive) 

380 except orm_exc.ObjectDeletedError: 

381 session._remove_newly_deleted([state]) 

382 return None 

383 return instance 

384 else: 

385 return None 

386 

387 

388def load_on_ident( 

389 session, 

390 statement, 

391 key, 

392 load_options=None, 

393 refresh_state=None, 

394 with_for_update=None, 

395 only_load_props=None, 

396 no_autoflush=False, 

397 bind_arguments=util.EMPTY_DICT, 

398 execution_options=util.EMPTY_DICT, 

399): 

400 """Load the given identity key from the database.""" 

401 if key is not None: 

402 ident = key[1] 

403 identity_token = key[2] 

404 else: 

405 ident = identity_token = None 

406 

407 return load_on_pk_identity( 

408 session, 

409 statement, 

410 ident, 

411 load_options=load_options, 

412 refresh_state=refresh_state, 

413 with_for_update=with_for_update, 

414 only_load_props=only_load_props, 

415 identity_token=identity_token, 

416 no_autoflush=no_autoflush, 

417 bind_arguments=bind_arguments, 

418 execution_options=execution_options, 

419 ) 

420 

421 

422def load_on_pk_identity( 

423 session, 

424 statement, 

425 primary_key_identity, 

426 load_options=None, 

427 refresh_state=None, 

428 with_for_update=None, 

429 only_load_props=None, 

430 identity_token=None, 

431 no_autoflush=False, 

432 bind_arguments=util.EMPTY_DICT, 

433 execution_options=util.EMPTY_DICT, 

434): 

435 

436 """Load the given primary key identity from the database.""" 

437 

438 query = statement 

439 q = query._clone() 

440 

441 assert not q._is_lambda_element 

442 

443 # TODO: fix these imports .... 

444 from .context import QueryContext, ORMCompileState 

445 

446 if load_options is None: 

447 load_options = QueryContext.default_load_options 

448 

449 if ( 

450 statement._compile_options 

451 is SelectState.default_select_compile_options 

452 ): 

453 compile_options = ORMCompileState.default_compile_options 

454 else: 

455 compile_options = statement._compile_options 

456 

457 if primary_key_identity is not None: 

458 mapper = query._propagate_attrs["plugin_subject"] 

459 

460 (_get_clause, _get_params) = mapper._get_clause 

461 

462 # None present in ident - turn those comparisons 

463 # into "IS NULL" 

464 if None in primary_key_identity: 

465 nones = set( 

466 [ 

467 _get_params[col].key 

468 for col, value in zip( 

469 mapper.primary_key, primary_key_identity 

470 ) 

471 if value is None 

472 ] 

473 ) 

474 

475 _get_clause = sql_util.adapt_criterion_to_null(_get_clause, nones) 

476 

477 if len(nones) == len(primary_key_identity): 

478 util.warn( 

479 "fully NULL primary key identity cannot load any " 

480 "object. This condition may raise an error in a future " 

481 "release." 

482 ) 

483 

484 q._where_criteria = ( 

485 sql_util._deep_annotate(_get_clause, {"_orm_adapt": True}), 

486 ) 

487 

488 params = dict( 

489 [ 

490 (_get_params[primary_key].key, id_val) 

491 for id_val, primary_key in zip( 

492 primary_key_identity, mapper.primary_key 

493 ) 

494 ] 

495 ) 

496 else: 

497 params = None 

498 

499 if with_for_update is not None: 

500 version_check = True 

501 q._for_update_arg = with_for_update 

502 elif query._for_update_arg is not None: 

503 version_check = True 

504 q._for_update_arg = query._for_update_arg 

505 else: 

506 version_check = False 

507 

508 if refresh_state and refresh_state.load_options: 

509 compile_options += {"_current_path": refresh_state.load_path.parent} 

510 q = q.options(*refresh_state.load_options) 

511 

512 new_compile_options, load_options = _set_get_options( 

513 compile_options, 

514 load_options, 

515 version_check=version_check, 

516 only_load_props=only_load_props, 

517 refresh_state=refresh_state, 

518 identity_token=identity_token, 

519 ) 

520 q._compile_options = new_compile_options 

521 q._order_by = None 

522 

523 if no_autoflush: 

524 load_options += {"_autoflush": False} 

525 

526 execution_options = util.EMPTY_DICT.merge_with( 

527 execution_options, {"_sa_orm_load_options": load_options} 

528 ) 

529 result = ( 

530 session.execute( 

531 q, 

532 params=params, 

533 execution_options=execution_options, 

534 bind_arguments=bind_arguments, 

535 ) 

536 .unique() 

537 .scalars() 

538 ) 

539 

540 try: 

541 return result.one() 

542 except orm_exc.NoResultFound: 

543 return None 

544 

545 

546def _set_get_options( 

547 compile_opt, 

548 load_opt, 

549 populate_existing=None, 

550 version_check=None, 

551 only_load_props=None, 

552 refresh_state=None, 

553 identity_token=None, 

554): 

555 

556 compile_options = {} 

557 load_options = {} 

558 if version_check: 

559 load_options["_version_check"] = version_check 

560 if populate_existing: 

561 load_options["_populate_existing"] = populate_existing 

562 if refresh_state: 

563 load_options["_refresh_state"] = refresh_state 

564 compile_options["_for_refresh_state"] = True 

565 if only_load_props: 

566 compile_options["_only_load_props"] = frozenset(only_load_props) 

567 if identity_token: 

568 load_options["_refresh_identity_token"] = identity_token 

569 

570 if load_options: 

571 load_opt += load_options 

572 if compile_options: 

573 compile_opt += compile_options 

574 

575 return compile_opt, load_opt 

576 

577 

578def _setup_entity_query( 

579 compile_state, 

580 mapper, 

581 query_entity, 

582 path, 

583 adapter, 

584 column_collection, 

585 with_polymorphic=None, 

586 only_load_props=None, 

587 polymorphic_discriminator=None, 

588 **kw 

589): 

590 

591 if with_polymorphic: 

592 poly_properties = mapper._iterate_polymorphic_properties( 

593 with_polymorphic 

594 ) 

595 else: 

596 poly_properties = mapper._polymorphic_properties 

597 

598 quick_populators = {} 

599 

600 path.set(compile_state.attributes, "memoized_setups", quick_populators) 

601 

602 # for the lead entities in the path, e.g. not eager loads, and 

603 # assuming a user-passed aliased class, e.g. not a from_self() or any 

604 # implicit aliasing, don't add columns to the SELECT that aren't 

605 # in the thing that's aliased. 

606 check_for_adapt = adapter and len(path) == 1 and path[-1].is_aliased_class 

607 

608 for value in poly_properties: 

609 if only_load_props and value.key not in only_load_props: 

610 continue 

611 

612 value.setup( 

613 compile_state, 

614 query_entity, 

615 path, 

616 adapter, 

617 only_load_props=only_load_props, 

618 column_collection=column_collection, 

619 memoized_populators=quick_populators, 

620 check_for_adapt=check_for_adapt, 

621 **kw 

622 ) 

623 

624 if ( 

625 polymorphic_discriminator is not None 

626 and polymorphic_discriminator is not mapper.polymorphic_on 

627 ): 

628 

629 if adapter: 

630 pd = adapter.columns[polymorphic_discriminator] 

631 else: 

632 pd = polymorphic_discriminator 

633 column_collection.append(pd) 

634 

635 

636def _warn_for_runid_changed(state): 

637 util.warn( 

638 "Loading context for %s has changed within a load/refresh " 

639 "handler, suggesting a row refresh operation took place. If this " 

640 "event handler is expected to be " 

641 "emitting row refresh operations within an existing load or refresh " 

642 "operation, set restore_load_context=True when establishing the " 

643 "listener to ensure the context remains unchanged when the event " 

644 "handler completes." % (state_str(state),) 

645 ) 

646 

647 

648def _instance_processor( 

649 query_entity, 

650 mapper, 

651 context, 

652 result, 

653 path, 

654 adapter, 

655 only_load_props=None, 

656 refresh_state=None, 

657 polymorphic_discriminator=None, 

658 _polymorphic_from=None, 

659): 

660 """Produce a mapper level row processor callable 

661 which processes rows into mapped instances.""" 

662 

663 # note that this method, most of which exists in a closure 

664 # called _instance(), resists being broken out, as 

665 # attempts to do so tend to add significant function 

666 # call overhead. _instance() is the most 

667 # performance-critical section in the whole ORM. 

668 

669 identity_class = mapper._identity_class 

670 compile_state = context.compile_state 

671 

672 # look for "row getter" functions that have been assigned along 

673 # with the compile state that were cached from a previous load. 

674 # these are operator.itemgetter() objects that each will extract a 

675 # particular column from each row. 

676 

677 getter_key = ("getters", mapper) 

678 getters = path.get(compile_state.attributes, getter_key, None) 

679 

680 if getters is None: 

681 # no getters, so go through a list of attributes we are loading for, 

682 # and the ones that are column based will have already put information 

683 # for us in another collection "memoized_setups", which represents the 

684 # output of the LoaderStrategy.setup_query() method. We can just as 

685 # easily call LoaderStrategy.create_row_processor for each, but by 

686 # getting it all at once from setup_query we save another method call 

687 # per attribute. 

688 props = mapper._prop_set 

689 if only_load_props is not None: 

690 props = props.intersection( 

691 mapper._props[k] for k in only_load_props 

692 ) 

693 

694 quick_populators = path.get( 

695 context.attributes, "memoized_setups", _none_set 

696 ) 

697 

698 todo = [] 

699 cached_populators = { 

700 "new": [], 

701 "quick": [], 

702 "deferred": [], 

703 "expire": [], 

704 "delayed": [], 

705 "existing": [], 

706 "eager": [], 

707 } 

708 

709 if refresh_state is None: 

710 # we can also get the "primary key" tuple getter function 

711 pk_cols = mapper.primary_key 

712 

713 if adapter: 

714 pk_cols = [adapter.columns[c] for c in pk_cols] 

715 primary_key_getter = result._tuple_getter(pk_cols) 

716 else: 

717 primary_key_getter = None 

718 

719 getters = { 

720 "cached_populators": cached_populators, 

721 "todo": todo, 

722 "primary_key_getter": primary_key_getter, 

723 } 

724 for prop in props: 

725 if prop in quick_populators: 

726 # this is an inlined path just for column-based attributes. 

727 col = quick_populators[prop] 

728 if col is _DEFER_FOR_STATE: 

729 cached_populators["new"].append( 

730 (prop.key, prop._deferred_column_loader) 

731 ) 

732 elif col is _SET_DEFERRED_EXPIRED: 

733 # note that in this path, we are no longer 

734 # searching in the result to see if the column might 

735 # be present in some unexpected way. 

736 cached_populators["expire"].append((prop.key, False)) 

737 elif col is _RAISE_FOR_STATE: 

738 cached_populators["new"].append( 

739 (prop.key, prop._raise_column_loader) 

740 ) 

741 else: 

742 getter = None 

743 if adapter: 

744 # this logic had been removed for all 1.4 releases 

745 # up until 1.4.18; the adapter here is particularly 

746 # the compound eager adapter which isn't accommodated 

747 # in the quick_populators right now. The "fallback" 

748 # logic below instead took over in many more cases 

749 # until issue #6596 was identified. 

750 

751 # note there is still an issue where this codepath 

752 # produces no "getter" for cases where a joined-inh 

753 # mapping includes a labeled column property, meaning 

754 # KeyError is caught internally and we fall back to 

755 # _getter(col), which works anyway. The adapter 

756 # here for joined inh without any aliasing might not 

757 # be useful. Tests which see this include 

758 # test.orm.inheritance.test_basic -> 

759 # EagerTargetingTest.test_adapt_stringency 

760 # OptimizedLoadTest.test_column_expression_joined 

761 # PolymorphicOnNotLocalTest.test_polymorphic_on_column_prop # noqa: E501 

762 # 

763 

764 adapted_col = adapter.columns[col] 

765 if adapted_col is not None: 

766 getter = result._getter(adapted_col, False) 

767 if not getter: 

768 getter = result._getter(col, False) 

769 if getter: 

770 cached_populators["quick"].append((prop.key, getter)) 

771 else: 

772 # fall back to the ColumnProperty itself, which 

773 # will iterate through all of its columns 

774 # to see if one fits 

775 prop.create_row_processor( 

776 context, 

777 query_entity, 

778 path, 

779 mapper, 

780 result, 

781 adapter, 

782 cached_populators, 

783 ) 

784 else: 

785 # loader strategies like subqueryload, selectinload, 

786 # joinedload, basically relationships, these need to interact 

787 # with the context each time to work correctly. 

788 todo.append(prop) 

789 

790 path.set(compile_state.attributes, getter_key, getters) 

791 

792 cached_populators = getters["cached_populators"] 

793 

794 populators = {key: list(value) for key, value in cached_populators.items()} 

795 for prop in getters["todo"]: 

796 prop.create_row_processor( 

797 context, query_entity, path, mapper, result, adapter, populators 

798 ) 

799 

800 propagated_loader_options = context.propagated_loader_options 

801 load_path = ( 

802 context.compile_state.current_path + path 

803 if context.compile_state.current_path.path 

804 else path 

805 ) 

806 

807 session_identity_map = context.session.identity_map 

808 

809 populate_existing = context.populate_existing or mapper.always_refresh 

810 load_evt = bool(mapper.class_manager.dispatch.load) 

811 refresh_evt = bool(mapper.class_manager.dispatch.refresh) 

812 persistent_evt = bool(context.session.dispatch.loaded_as_persistent) 

813 if persistent_evt: 

814 loaded_as_persistent = context.session.dispatch.loaded_as_persistent 

815 instance_state = attributes.instance_state 

816 instance_dict = attributes.instance_dict 

817 session_id = context.session.hash_key 

818 runid = context.runid 

819 identity_token = context.identity_token 

820 

821 version_check = context.version_check 

822 if version_check: 

823 version_id_col = mapper.version_id_col 

824 if version_id_col is not None: 

825 if adapter: 

826 version_id_col = adapter.columns[version_id_col] 

827 version_id_getter = result._getter(version_id_col) 

828 else: 

829 version_id_getter = None 

830 

831 if not refresh_state and _polymorphic_from is not None: 

832 key = ("loader", path.path) 

833 if key in context.attributes and context.attributes[key].strategy == ( 

834 ("selectinload_polymorphic", True), 

835 ): 

836 selectin_load_via = mapper._should_selectin_load( 

837 context.attributes[key].local_opts["entities"], 

838 _polymorphic_from, 

839 ) 

840 else: 

841 selectin_load_via = mapper._should_selectin_load( 

842 None, _polymorphic_from 

843 ) 

844 

845 if selectin_load_via and selectin_load_via is not _polymorphic_from: 

846 # only_load_props goes w/ refresh_state only, and in a refresh 

847 # we are a single row query for the exact entity; polymorphic 

848 # loading does not apply 

849 assert only_load_props is None 

850 

851 callable_ = _load_subclass_via_in(context, path, selectin_load_via) 

852 

853 PostLoad.callable_for_path( 

854 context, 

855 load_path, 

856 selectin_load_via.mapper, 

857 selectin_load_via, 

858 callable_, 

859 selectin_load_via, 

860 ) 

861 

862 post_load = PostLoad.for_context(context, load_path, only_load_props) 

863 

864 if refresh_state: 

865 refresh_identity_key = refresh_state.key 

866 if refresh_identity_key is None: 

867 # super-rare condition; a refresh is being called 

868 # on a non-instance-key instance; this is meant to only 

869 # occur within a flush() 

870 refresh_identity_key = mapper._identity_key_from_state( 

871 refresh_state 

872 ) 

873 else: 

874 refresh_identity_key = None 

875 

876 primary_key_getter = getters["primary_key_getter"] 

877 

878 if mapper.allow_partial_pks: 

879 is_not_primary_key = _none_set.issuperset 

880 else: 

881 is_not_primary_key = _none_set.intersection 

882 

883 def _instance(row): 

884 

885 # determine the state that we'll be populating 

886 if refresh_identity_key: 

887 # fixed state that we're refreshing 

888 state = refresh_state 

889 instance = state.obj() 

890 dict_ = instance_dict(instance) 

891 isnew = state.runid != runid 

892 currentload = True 

893 loaded_instance = False 

894 else: 

895 # look at the row, see if that identity is in the 

896 # session, or we have to create a new one 

897 identitykey = ( 

898 identity_class, 

899 primary_key_getter(row), 

900 identity_token, 

901 ) 

902 

903 instance = session_identity_map.get(identitykey) 

904 

905 if instance is not None: 

906 # existing instance 

907 state = instance_state(instance) 

908 dict_ = instance_dict(instance) 

909 

910 isnew = state.runid != runid 

911 currentload = not isnew 

912 loaded_instance = False 

913 

914 if version_check and version_id_getter and not currentload: 

915 _validate_version_id( 

916 mapper, state, dict_, row, version_id_getter 

917 ) 

918 

919 else: 

920 # create a new instance 

921 

922 # check for non-NULL values in the primary key columns, 

923 # else no entity is returned for the row 

924 if is_not_primary_key(identitykey[1]): 

925 return None 

926 

927 isnew = True 

928 currentload = True 

929 loaded_instance = True 

930 

931 instance = mapper.class_manager.new_instance() 

932 

933 dict_ = instance_dict(instance) 

934 state = instance_state(instance) 

935 state.key = identitykey 

936 state.identity_token = identity_token 

937 

938 # attach instance to session. 

939 state.session_id = session_id 

940 session_identity_map._add_unpresent(state, identitykey) 

941 

942 effective_populate_existing = populate_existing 

943 if refresh_state is state: 

944 effective_populate_existing = True 

945 

946 # populate. this looks at whether this state is new 

947 # for this load or was existing, and whether or not this 

948 # row is the first row with this identity. 

949 if currentload or effective_populate_existing: 

950 # full population routines. Objects here are either 

951 # just created, or we are doing a populate_existing 

952 

953 # be conservative about setting load_path when populate_existing 

954 # is in effect; want to maintain options from the original 

955 # load. see test_expire->test_refresh_maintains_deferred_options 

956 if isnew and ( 

957 propagated_loader_options or not effective_populate_existing 

958 ): 

959 state.load_options = propagated_loader_options 

960 state.load_path = load_path 

961 

962 _populate_full( 

963 context, 

964 row, 

965 state, 

966 dict_, 

967 isnew, 

968 load_path, 

969 loaded_instance, 

970 effective_populate_existing, 

971 populators, 

972 ) 

973 

974 if isnew: 

975 # state.runid should be equal to context.runid / runid 

976 # here, however for event checks we are being more conservative 

977 # and checking against existing run id 

978 # assert state.runid == runid 

979 

980 existing_runid = state.runid 

981 

982 if loaded_instance: 

983 if load_evt: 

984 state.manager.dispatch.load(state, context) 

985 if state.runid != existing_runid: 

986 _warn_for_runid_changed(state) 

987 if persistent_evt: 

988 loaded_as_persistent(context.session, state) 

989 if state.runid != existing_runid: 

990 _warn_for_runid_changed(state) 

991 elif refresh_evt: 

992 state.manager.dispatch.refresh( 

993 state, context, only_load_props 

994 ) 

995 if state.runid != runid: 

996 _warn_for_runid_changed(state) 

997 

998 if effective_populate_existing or state.modified: 

999 if refresh_state and only_load_props: 

1000 state._commit(dict_, only_load_props) 

1001 else: 

1002 state._commit_all(dict_, session_identity_map) 

1003 

1004 if post_load: 

1005 post_load.add_state(state, True) 

1006 

1007 else: 

1008 # partial population routines, for objects that were already 

1009 # in the Session, but a row matches them; apply eager loaders 

1010 # on existing objects, etc. 

1011 unloaded = state.unloaded 

1012 isnew = state not in context.partials 

1013 

1014 if not isnew or unloaded or populators["eager"]: 

1015 # state is having a partial set of its attributes 

1016 # refreshed. Populate those attributes, 

1017 # and add to the "context.partials" collection. 

1018 

1019 to_load = _populate_partial( 

1020 context, 

1021 row, 

1022 state, 

1023 dict_, 

1024 isnew, 

1025 load_path, 

1026 unloaded, 

1027 populators, 

1028 ) 

1029 

1030 if isnew: 

1031 if refresh_evt: 

1032 existing_runid = state.runid 

1033 state.manager.dispatch.refresh(state, context, to_load) 

1034 if state.runid != existing_runid: 

1035 _warn_for_runid_changed(state) 

1036 

1037 state._commit(dict_, to_load) 

1038 

1039 if post_load and context.invoke_all_eagers: 

1040 post_load.add_state(state, False) 

1041 

1042 return instance 

1043 

1044 if mapper.polymorphic_map and not _polymorphic_from and not refresh_state: 

1045 # if we are doing polymorphic, dispatch to a different _instance() 

1046 # method specific to the subclass mapper 

1047 def ensure_no_pk(row): 

1048 identitykey = ( 

1049 identity_class, 

1050 primary_key_getter(row), 

1051 identity_token, 

1052 ) 

1053 if not is_not_primary_key(identitykey[1]): 

1054 return identitykey 

1055 else: 

1056 return None 

1057 

1058 _instance = _decorate_polymorphic_switch( 

1059 _instance, 

1060 context, 

1061 query_entity, 

1062 mapper, 

1063 result, 

1064 path, 

1065 polymorphic_discriminator, 

1066 adapter, 

1067 ensure_no_pk, 

1068 ) 

1069 

1070 return _instance 

1071 

1072 

1073def _load_subclass_via_in(context, path, entity): 

1074 mapper = entity.mapper 

1075 

1076 zero_idx = len(mapper.base_mapper.primary_key) == 1 

1077 

1078 if entity.is_aliased_class: 

1079 q, enable_opt, disable_opt = mapper._subclass_load_via_in(entity) 

1080 else: 

1081 q, enable_opt, disable_opt = mapper._subclass_load_via_in_mapper 

1082 

1083 def do_load(context, path, states, load_only, effective_entity): 

1084 orig_query = context.query 

1085 

1086 options = (enable_opt,) + orig_query._with_options + (disable_opt,) 

1087 q2 = q.options(*options) 

1088 

1089 q2._compile_options = context.compile_state.default_compile_options 

1090 q2._compile_options += {"_current_path": path.parent} 

1091 

1092 if context.populate_existing: 

1093 q2 = q2.execution_options(populate_existing=True) 

1094 

1095 context.session.execute( 

1096 q2, 

1097 dict( 

1098 primary_keys=[ 

1099 state.key[1][0] if zero_idx else state.key[1] 

1100 for state, load_attrs in states 

1101 ] 

1102 ), 

1103 ).unique().scalars().all() 

1104 

1105 return do_load 

1106 

1107 

1108def _populate_full( 

1109 context, 

1110 row, 

1111 state, 

1112 dict_, 

1113 isnew, 

1114 load_path, 

1115 loaded_instance, 

1116 populate_existing, 

1117 populators, 

1118): 

1119 if isnew: 

1120 # first time we are seeing a row with this identity. 

1121 state.runid = context.runid 

1122 

1123 for key, getter in populators["quick"]: 

1124 dict_[key] = getter(row) 

1125 if populate_existing: 

1126 for key, set_callable in populators["expire"]: 

1127 dict_.pop(key, None) 

1128 if set_callable: 

1129 state.expired_attributes.add(key) 

1130 else: 

1131 for key, set_callable in populators["expire"]: 

1132 if set_callable: 

1133 state.expired_attributes.add(key) 

1134 

1135 for key, populator in populators["new"]: 

1136 populator(state, dict_, row) 

1137 for key, populator in populators["delayed"]: 

1138 populator(state, dict_, row) 

1139 elif load_path != state.load_path: 

1140 # new load path, e.g. object is present in more than one 

1141 # column position in a series of rows 

1142 state.load_path = load_path 

1143 

1144 # if we have data, and the data isn't in the dict, OK, let's put 

1145 # it in. 

1146 for key, getter in populators["quick"]: 

1147 if key not in dict_: 

1148 dict_[key] = getter(row) 

1149 

1150 # otherwise treat like an "already seen" row 

1151 for key, populator in populators["existing"]: 

1152 populator(state, dict_, row) 

1153 # TODO: allow "existing" populator to know this is 

1154 # a new path for the state: 

1155 # populator(state, dict_, row, new_path=True) 

1156 

1157 else: 

1158 # have already seen rows with this identity in this same path. 

1159 for key, populator in populators["existing"]: 

1160 populator(state, dict_, row) 

1161 

1162 # TODO: same path 

1163 # populator(state, dict_, row, new_path=False) 

1164 

1165 

1166def _populate_partial( 

1167 context, row, state, dict_, isnew, load_path, unloaded, populators 

1168): 

1169 

1170 if not isnew: 

1171 to_load = context.partials[state] 

1172 for key, populator in populators["existing"]: 

1173 if key in to_load: 

1174 populator(state, dict_, row) 

1175 else: 

1176 to_load = unloaded 

1177 context.partials[state] = to_load 

1178 

1179 for key, getter in populators["quick"]: 

1180 if key in to_load: 

1181 dict_[key] = getter(row) 

1182 for key, set_callable in populators["expire"]: 

1183 if key in to_load: 

1184 dict_.pop(key, None) 

1185 if set_callable: 

1186 state.expired_attributes.add(key) 

1187 for key, populator in populators["new"]: 

1188 if key in to_load: 

1189 populator(state, dict_, row) 

1190 for key, populator in populators["delayed"]: 

1191 if key in to_load: 

1192 populator(state, dict_, row) 

1193 for key, populator in populators["eager"]: 

1194 if key not in unloaded: 

1195 populator(state, dict_, row) 

1196 

1197 return to_load 

1198 

1199 

1200def _validate_version_id(mapper, state, dict_, row, getter): 

1201 

1202 if mapper._get_state_attr_by_column( 

1203 state, dict_, mapper.version_id_col 

1204 ) != getter(row): 

1205 raise orm_exc.StaleDataError( 

1206 "Instance '%s' has version id '%s' which " 

1207 "does not match database-loaded version id '%s'." 

1208 % ( 

1209 state_str(state), 

1210 mapper._get_state_attr_by_column( 

1211 state, dict_, mapper.version_id_col 

1212 ), 

1213 getter(row), 

1214 ) 

1215 ) 

1216 

1217 

1218def _decorate_polymorphic_switch( 

1219 instance_fn, 

1220 context, 

1221 query_entity, 

1222 mapper, 

1223 result, 

1224 path, 

1225 polymorphic_discriminator, 

1226 adapter, 

1227 ensure_no_pk, 

1228): 

1229 if polymorphic_discriminator is not None: 

1230 polymorphic_on = polymorphic_discriminator 

1231 else: 

1232 polymorphic_on = mapper.polymorphic_on 

1233 if polymorphic_on is None: 

1234 return instance_fn 

1235 

1236 if adapter: 

1237 polymorphic_on = adapter.columns[polymorphic_on] 

1238 

1239 def configure_subclass_mapper(discriminator): 

1240 try: 

1241 sub_mapper = mapper.polymorphic_map[discriminator] 

1242 except KeyError: 

1243 raise AssertionError( 

1244 "No such polymorphic_identity %r is defined" % discriminator 

1245 ) 

1246 else: 

1247 if sub_mapper is mapper: 

1248 return None 

1249 elif not sub_mapper.isa(mapper): 

1250 return False 

1251 

1252 return _instance_processor( 

1253 query_entity, 

1254 sub_mapper, 

1255 context, 

1256 result, 

1257 path, 

1258 adapter, 

1259 _polymorphic_from=mapper, 

1260 ) 

1261 

1262 polymorphic_instances = util.PopulateDict(configure_subclass_mapper) 

1263 

1264 getter = result._getter(polymorphic_on) 

1265 

1266 def polymorphic_instance(row): 

1267 discriminator = getter(row) 

1268 if discriminator is not None: 

1269 _instance = polymorphic_instances[discriminator] 

1270 if _instance: 

1271 return _instance(row) 

1272 elif _instance is False: 

1273 identitykey = ensure_no_pk(row) 

1274 

1275 if identitykey: 

1276 raise sa_exc.InvalidRequestError( 

1277 "Row with identity key %s can't be loaded into an " 

1278 "object; the polymorphic discriminator column '%s' " 

1279 "refers to %s, which is not a sub-mapper of " 

1280 "the requested %s" 

1281 % ( 

1282 identitykey, 

1283 polymorphic_on, 

1284 mapper.polymorphic_map[discriminator], 

1285 mapper, 

1286 ) 

1287 ) 

1288 else: 

1289 return None 

1290 else: 

1291 return instance_fn(row) 

1292 else: 

1293 identitykey = ensure_no_pk(row) 

1294 

1295 if identitykey: 

1296 raise sa_exc.InvalidRequestError( 

1297 "Row with identity key %s can't be loaded into an " 

1298 "object; the polymorphic discriminator column '%s' is " 

1299 "NULL" % (identitykey, polymorphic_on) 

1300 ) 

1301 else: 

1302 return None 

1303 

1304 return polymorphic_instance 

1305 

1306 

1307class PostLoad(object): 

1308 """Track loaders and states for "post load" operations.""" 

1309 

1310 __slots__ = "loaders", "states", "load_keys" 

1311 

1312 def __init__(self): 

1313 self.loaders = {} 

1314 self.states = util.OrderedDict() 

1315 self.load_keys = None 

1316 

1317 def add_state(self, state, overwrite): 

1318 # the states for a polymorphic load here are all shared 

1319 # within a single PostLoad object among multiple subtypes. 

1320 # Filtering of callables on a per-subclass basis needs to be done at 

1321 # the invocation level 

1322 self.states[state] = overwrite 

1323 

1324 def invoke(self, context, path): 

1325 if not self.states: 

1326 return 

1327 path = path_registry.PathRegistry.coerce(path) 

1328 for token, limit_to_mapper, loader, arg, kw in self.loaders.values(): 

1329 states = [ 

1330 (state, overwrite) 

1331 for state, overwrite in self.states.items() 

1332 if state.manager.mapper.isa(limit_to_mapper) 

1333 ] 

1334 if states: 

1335 loader(context, path, states, self.load_keys, *arg, **kw) 

1336 self.states.clear() 

1337 

1338 @classmethod 

1339 def for_context(cls, context, path, only_load_props): 

1340 pl = context.post_load_paths.get(path.path) 

1341 if pl is not None and only_load_props: 

1342 pl.load_keys = only_load_props 

1343 return pl 

1344 

1345 @classmethod 

1346 def path_exists(self, context, path, key): 

1347 return ( 

1348 path.path in context.post_load_paths 

1349 and key in context.post_load_paths[path.path].loaders 

1350 ) 

1351 

1352 @classmethod 

1353 def callable_for_path( 

1354 cls, context, path, limit_to_mapper, token, loader_callable, *arg, **kw 

1355 ): 

1356 if path.path in context.post_load_paths: 

1357 pl = context.post_load_paths[path.path] 

1358 else: 

1359 pl = context.post_load_paths[path.path] = PostLoad() 

1360 pl.loaders[token] = (token, limit_to_mapper, loader_callable, arg, kw) 

1361 

1362 

1363def load_scalar_attributes(mapper, state, attribute_names, passive): 

1364 """initiate a column-based attribute refresh operation.""" 

1365 

1366 # assert mapper is _state_mapper(state) 

1367 session = state.session 

1368 if not session: 

1369 raise orm_exc.DetachedInstanceError( 

1370 "Instance %s is not bound to a Session; " 

1371 "attribute refresh operation cannot proceed" % (state_str(state)) 

1372 ) 

1373 

1374 has_key = bool(state.key) 

1375 

1376 result = False 

1377 

1378 no_autoflush = ( 

1379 bool(passive & attributes.NO_AUTOFLUSH) or state.session.autocommit 

1380 ) 

1381 

1382 # in the case of inheritance, particularly concrete and abstract 

1383 # concrete inheritance, the class manager might have some keys 

1384 # of attributes on the superclass that we didn't actually map. 

1385 # These could be mapped as "concrete, don't load" or could be completely 

1386 # excluded from the mapping and we know nothing about them. Filter them 

1387 # here to prevent them from coming through. 

1388 if attribute_names: 

1389 attribute_names = attribute_names.intersection(mapper.attrs.keys()) 

1390 

1391 if mapper.inherits and not mapper.concrete: 

1392 # because we are using Core to produce a select() that we 

1393 # pass to the Query, we aren't calling setup() for mapped 

1394 # attributes; in 1.0 this means deferred attrs won't get loaded 

1395 # by default 

1396 statement = mapper._optimized_get_statement(state, attribute_names) 

1397 if statement is not None: 

1398 # this was previously aliased(mapper, statement), however, 

1399 # statement is a select() and Query's coercion now raises for this 

1400 # since you can't "select" from a "SELECT" statement. only 

1401 # from_statement() allows this. 

1402 # note: using from_statement() here means there is an adaption 

1403 # with adapt_on_names set up. the other option is to make the 

1404 # aliased() against a subquery which affects the SQL. 

1405 

1406 from .query import FromStatement 

1407 

1408 stmt = FromStatement(mapper, statement).options( 

1409 strategy_options.Load(mapper).undefer("*") 

1410 ) 

1411 

1412 result = load_on_ident( 

1413 session, 

1414 stmt, 

1415 None, 

1416 only_load_props=attribute_names, 

1417 refresh_state=state, 

1418 no_autoflush=no_autoflush, 

1419 ) 

1420 

1421 if result is False: 

1422 if has_key: 

1423 identity_key = state.key 

1424 else: 

1425 # this codepath is rare - only valid when inside a flush, and the 

1426 # object is becoming persistent but hasn't yet been assigned 

1427 # an identity_key. 

1428 # check here to ensure we have the attrs we need. 

1429 pk_attrs = [ 

1430 mapper._columntoproperty[col].key for col in mapper.primary_key 

1431 ] 

1432 if state.expired_attributes.intersection(pk_attrs): 

1433 raise sa_exc.InvalidRequestError( 

1434 "Instance %s cannot be refreshed - it's not " 

1435 " persistent and does not " 

1436 "contain a full primary key." % state_str(state) 

1437 ) 

1438 identity_key = mapper._identity_key_from_state(state) 

1439 

1440 if ( 

1441 _none_set.issubset(identity_key) and not mapper.allow_partial_pks 

1442 ) or _none_set.issuperset(identity_key): 

1443 util.warn_limited( 

1444 "Instance %s to be refreshed doesn't " 

1445 "contain a full primary key - can't be refreshed " 

1446 "(and shouldn't be expired, either).", 

1447 state_str(state), 

1448 ) 

1449 return 

1450 

1451 result = load_on_ident( 

1452 session, 

1453 future.select(mapper).set_label_style( 

1454 LABEL_STYLE_TABLENAME_PLUS_COL 

1455 ), 

1456 identity_key, 

1457 refresh_state=state, 

1458 only_load_props=attribute_names, 

1459 no_autoflush=no_autoflush, 

1460 ) 

1461 

1462 # if instance is pending, a refresh operation 

1463 # may not complete (even if PK attributes are assigned) 

1464 if has_key and result is None: 

1465 raise orm_exc.ObjectDeletedError(state)