Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/loading.py: 11%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

662 statements  

1# orm/loading.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: ignore-errors 

8 

9 

10"""private module containing functions used to convert database 

11rows into object instances and associated state. 

12 

13the functions here are called primarily by Query, Mapper, 

14as well as some of the attribute loading strategies. 

15 

16""" 

17 

18from __future__ import annotations 

19 

20from typing import Any 

21from typing import Dict 

22from typing import Iterable 

23from typing import List 

24from typing import Mapping 

25from typing import Optional 

26from typing import Sequence 

27from typing import Tuple 

28from typing import TYPE_CHECKING 

29from typing import TypeVar 

30from typing import Union 

31 

32from . import attributes 

33from . import exc as orm_exc 

34from . import path_registry 

35from .base import _DEFER_FOR_STATE 

36from .base import _RAISE_FOR_STATE 

37from .base import _SET_DEFERRED_EXPIRED 

38from .base import PassiveFlag 

39from .context import _ORMCompileState 

40from .context import FromStatement 

41from .context import QueryContext 

42from .strategies import _SelectInLoader 

43from .util import _none_set 

44from .util import state_str 

45from .. import exc as sa_exc 

46from .. import util 

47from ..engine import result_tuple 

48from ..engine.result import ChunkedIteratorResult 

49from ..engine.result import FrozenResult 

50from ..engine.result import SimpleResultMetaData 

51from ..sql import select 

52from ..sql import util as sql_util 

53from ..sql.selectable import ForUpdateArg 

54from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL 

55from ..sql.selectable import SelectState 

56from ..util import EMPTY_DICT 

57from ..util.typing import TupleAny 

58from ..util.typing import Unpack 

59 

60if TYPE_CHECKING: 

61 from ._typing import _IdentityKeyType 

62 from .base import LoaderCallableStatus 

63 from .interfaces import ORMOption 

64 from .mapper import Mapper 

65 from .query import Query 

66 from .session import Session 

67 from .state import InstanceState 

68 from ..engine.cursor import CursorResult 

69 from ..engine.interfaces import _ExecuteOptions 

70 from ..engine.result import Result 

71 from ..sql import Select 

72 

73_T = TypeVar("_T", bound=Any) 

74_O = TypeVar("_O", bound=object) 

75_new_runid = util.counter() 

76 

77 

78_PopulatorDict = Dict[str, List[Tuple[str, Any]]] 

79 

80 

81def instances( 

82 cursor: CursorResult[Unpack[TupleAny]], context: QueryContext 

83) -> Result[Unpack[TupleAny]]: 

84 """Return a :class:`.Result` given an ORM query context. 

85 

86 :param cursor: a :class:`.CursorResult`, generated by a statement 

87 which came from :class:`.ORMCompileState` 

88 

89 :param context: a :class:`.QueryContext` object 

90 

91 :return: a :class:`.Result` object representing ORM results 

92 

93 .. versionchanged:: 1.4 The instances() function now uses 

94 :class:`.Result` objects and has an all new interface. 

95 

96 """ 

97 

98 context.runid = _new_runid() 

99 

100 if context.top_level_context: 

101 is_top_level = False 

102 context.post_load_paths = context.top_level_context.post_load_paths 

103 else: 

104 is_top_level = True 

105 context.post_load_paths = {} 

106 

107 compile_state = context.compile_state 

108 filtered = compile_state._has_mapper_entities 

109 single_entity = ( 

110 not context.load_options._only_return_tuples 

111 and len(compile_state._entities) == 1 

112 and compile_state._entities[0].supports_single_entity 

113 ) 

114 

115 try: 

116 (process, labels, extra) = list( 

117 zip( 

118 *[ 

119 query_entity.row_processor(context, cursor) 

120 for query_entity in context.compile_state._entities 

121 ] 

122 ) 

123 ) 

124 

125 if context.yield_per and ( 

126 context.loaders_require_buffering 

127 or context.loaders_require_uniquing 

128 ): 

129 raise sa_exc.InvalidRequestError( 

130 "Can't use yield_per with eager loaders that require uniquing " 

131 "or row buffering, e.g. joinedload() against collections " 

132 "or subqueryload(). Consider the selectinload() strategy " 

133 "for better flexibility in loading objects." 

134 ) 

135 

136 except Exception: 

137 with util.safe_reraise(): 

138 cursor.close() 

139 

140 def _no_unique(entry): 

141 raise sa_exc.InvalidRequestError( 

142 "Can't use the ORM yield_per feature in conjunction with unique()" 

143 ) 

144 

145 def _not_hashable(datatype, *, legacy=False, uncertain=False): 

146 if not legacy: 

147 

148 def go(obj): 

149 if uncertain: 

150 try: 

151 return hash(obj) 

152 except: 

153 pass 

154 

155 raise sa_exc.InvalidRequestError( 

156 "Can't apply uniqueness to row tuple containing value of " 

157 f"""type {datatype!r}; { 

158 'the values returned appear to be' 

159 if uncertain 

160 else 'this datatype produces' 

161 } non-hashable values""" 

162 ) 

163 

164 return go 

165 elif not uncertain: 

166 return id 

167 else: 

168 _use_id = False 

169 

170 def go(obj): 

171 nonlocal _use_id 

172 

173 if not _use_id: 

174 try: 

175 return hash(obj) 

176 except: 

177 pass 

178 

179 # in #10459, we considered using a warning here, however 

180 # as legacy query uses result.unique() in all cases, this 

181 # would lead to too many warning cases. 

182 _use_id = True 

183 

184 return id(obj) 

185 

186 return go 

187 

188 unique_filters = [ 

189 ( 

190 _no_unique 

191 if context.yield_per 

192 else ( 

193 _not_hashable( 

194 ent.column.type, # type: ignore 

195 legacy=context.load_options._legacy_uniquing, 

196 uncertain=ent._null_column_type, 

197 ) 

198 if ( 

199 not ent.use_id_for_hash 

200 and (ent._non_hashable_value or ent._null_column_type) 

201 ) 

202 else id if ent.use_id_for_hash else None 

203 ) 

204 ) 

205 for ent in context.compile_state._entities 

206 ] 

207 

208 row_metadata = SimpleResultMetaData( 

209 labels, extra, _unique_filters=unique_filters 

210 ) 

211 

212 def chunks(size): # type: ignore 

213 while True: 

214 yield_per = size 

215 

216 context.partials = {} 

217 

218 if yield_per: 

219 fetch = cursor.fetchmany(yield_per) 

220 

221 if not fetch: 

222 break 

223 else: 

224 fetch = cursor._raw_all_rows() 

225 

226 if single_entity: 

227 proc = process[0] 

228 rows = [proc(row) for row in fetch] 

229 else: 

230 rows = [ 

231 tuple([proc(row) for proc in process]) for row in fetch 

232 ] 

233 

234 # if we are the originating load from a query, meaning we 

235 # aren't being called as a result of a nested "post load", 

236 # iterate through all the collected post loaders and fire them 

237 # off. Previously this used to work recursively, however that 

238 # prevented deeply nested structures from being loadable 

239 if is_top_level: 

240 if yield_per: 

241 # if using yield per, memoize the state of the 

242 # collection so that it can be restored 

243 top_level_post_loads = list( 

244 context.post_load_paths.items() 

245 ) 

246 

247 while context.post_load_paths: 

248 post_loads = list(context.post_load_paths.items()) 

249 context.post_load_paths.clear() 

250 for path, post_load in post_loads: 

251 post_load.invoke(context, path) 

252 

253 if yield_per: 

254 context.post_load_paths.clear() 

255 context.post_load_paths.update(top_level_post_loads) 

256 

257 yield rows 

258 

259 if not yield_per: 

260 break 

261 

262 if context.execution_options.get("prebuffer_rows", False): 

263 # this is a bit of a hack at the moment. 

264 # I would rather have some option in the result to pre-buffer 

265 # internally. 

266 _prebuffered = list(chunks(None)) 

267 

268 def chunks(size): 

269 return iter(_prebuffered) 

270 

271 result = ChunkedIteratorResult( 

272 row_metadata, 

273 chunks, 

274 source_supports_scalars=single_entity, 

275 raw=cursor, 

276 dynamic_yield_per=cursor.context._is_server_side, 

277 ) 

278 

279 # filtered and single_entity are used to indicate to legacy Query that the 

280 # query has ORM entities, so legacy deduping and scalars should be called 

281 # on the result. 

282 result._attributes = result._attributes.union( 

283 dict(filtered=filtered, is_single_entity=single_entity) 

284 ) 

285 

286 # multi_row_eager_loaders OTOH is specific to joinedload. 

287 if context.compile_state.multi_row_eager_loaders: 

288 

289 def require_unique(obj): 

290 raise sa_exc.InvalidRequestError( 

291 "The unique() method must be invoked on this Result, " 

292 "as it contains results that include joined eager loads " 

293 "against collections" 

294 ) 

295 

296 result._unique_filter_state = (None, require_unique) 

297 

298 if context.yield_per: 

299 result.yield_per(context.yield_per) 

300 

301 return result 

302 

303 

304@util.preload_module("sqlalchemy.orm.context") 

305def merge_frozen_result(session, statement, frozen_result, load=True): 

306 """Merge a :class:`_engine.FrozenResult` back into a :class:`_orm.Session`, 

307 returning a new :class:`_engine.Result` object with :term:`persistent` 

308 objects. 

309 

310 See the section :ref:`do_orm_execute_re_executing` for an example. 

311 

312 .. seealso:: 

313 

314 :ref:`do_orm_execute_re_executing` 

315 

316 :meth:`_engine.Result.freeze` 

317 

318 :class:`_engine.FrozenResult` 

319 

320 """ 

321 querycontext = util.preloaded.orm_context 

322 

323 if load: 

324 # flush current contents if we expect to load data 

325 session._autoflush() 

326 

327 ctx = querycontext._ORMSelectCompileState._create_entities_collection( 

328 statement, legacy=False 

329 ) 

330 

331 with session.no_autoflush: 

332 mapped_entities = [ 

333 i 

334 for i, e in enumerate(ctx._entities) 

335 if isinstance(e, querycontext._MapperEntity) 

336 ] 

337 keys = [ent._label_name for ent in ctx._entities] 

338 

339 keyed_tuple = result_tuple( 

340 keys, [ent._extra_entities for ent in ctx._entities] 

341 ) 

342 

343 result = [] 

344 for newrow in frozen_result._rewrite_rows(): 

345 for i in mapped_entities: 

346 if newrow[i] is not None: 

347 newrow[i] = session._merge( 

348 attributes.instance_state(newrow[i]), 

349 attributes.instance_dict(newrow[i]), 

350 load=load, 

351 _recursive={}, 

352 _resolve_conflict_map={}, 

353 ) 

354 

355 result.append(keyed_tuple(newrow)) 

356 

357 return frozen_result.with_new_rows(result) 

358 

359 

360@util.became_legacy_20( 

361 ":func:`_orm.merge_result`", 

362 alternative="The function as well as the method on :class:`_orm.Query` " 

363 "is superseded by the :func:`_orm.merge_frozen_result` function.", 

364) 

365@util.preload_module("sqlalchemy.orm.context") 

366def merge_result( 

367 query: Query[Any], 

368 iterator: Union[FrozenResult, Iterable[Sequence[Any]], Iterable[object]], 

369 load: bool = True, 

370) -> Union[FrozenResult, Iterable[Any]]: 

371 """Merge a result into the given :class:`.Query` object's Session. 

372 

373 See :meth:`_orm.Query.merge_result` for top-level documentation on this 

374 function. 

375 

376 """ 

377 

378 querycontext = util.preloaded.orm_context 

379 

380 session = query.session 

381 if load: 

382 # flush current contents if we expect to load data 

383 session._autoflush() 

384 

385 # TODO: need test coverage and documentation for the FrozenResult 

386 # use case. 

387 if isinstance(iterator, FrozenResult): 

388 frozen_result = iterator 

389 iterator = iter(frozen_result.data) 

390 else: 

391 frozen_result = None 

392 

393 ctx = querycontext._ORMSelectCompileState._create_entities_collection( 

394 query, legacy=True 

395 ) 

396 

397 autoflush = session.autoflush 

398 try: 

399 session.autoflush = False 

400 single_entity = not frozen_result and len(ctx._entities) == 1 

401 

402 if single_entity: 

403 if isinstance(ctx._entities[0], querycontext._MapperEntity): 

404 result = [ 

405 session._merge( 

406 attributes.instance_state(instance), 

407 attributes.instance_dict(instance), 

408 load=load, 

409 _recursive={}, 

410 _resolve_conflict_map={}, 

411 ) 

412 for instance in iterator 

413 ] 

414 else: 

415 result = list(iterator) 

416 else: 

417 mapped_entities = [ 

418 i 

419 for i, e in enumerate(ctx._entities) 

420 if isinstance(e, querycontext._MapperEntity) 

421 ] 

422 result = [] 

423 keys = [ent._label_name for ent in ctx._entities] 

424 

425 keyed_tuple = result_tuple( 

426 keys, [ent._extra_entities for ent in ctx._entities] 

427 ) 

428 

429 for row in iterator: 

430 newrow = list(row) 

431 for i in mapped_entities: 

432 if newrow[i] is not None: 

433 newrow[i] = session._merge( 

434 attributes.instance_state(newrow[i]), 

435 attributes.instance_dict(newrow[i]), 

436 load=load, 

437 _recursive={}, 

438 _resolve_conflict_map={}, 

439 ) 

440 result.append(keyed_tuple(newrow)) 

441 

442 if frozen_result: 

443 return frozen_result.with_new_rows(result) 

444 else: 

445 return iter(result) 

446 finally: 

447 session.autoflush = autoflush 

448 

449 

450def get_from_identity( 

451 session: Session, 

452 mapper: Mapper[_O], 

453 key: _IdentityKeyType[_O], 

454 passive: PassiveFlag, 

455) -> Union[LoaderCallableStatus, Optional[_O]]: 

456 """Look up the given key in the given session's identity map, 

457 check the object for expired state if found. 

458 

459 """ 

460 instance = session.identity_map.get(key) 

461 if instance is not None: 

462 state = attributes.instance_state(instance) 

463 

464 if mapper.inherits and not state.mapper.isa(mapper): 

465 return attributes.PASSIVE_CLASS_MISMATCH 

466 

467 # expired - ensure it still exists 

468 if state.expired: 

469 if not passive & attributes.SQL_OK: 

470 # TODO: no coverage here 

471 return attributes.PASSIVE_NO_RESULT 

472 elif not passive & attributes.RELATED_OBJECT_OK: 

473 # this mode is used within a flush and the instance's 

474 # expired state will be checked soon enough, if necessary. 

475 # also used by immediateloader for a mutually-dependent 

476 # o2m->m2m load, :ticket:`6301` 

477 return instance 

478 try: 

479 state._load_expired(state, passive) 

480 except orm_exc.ObjectDeletedError: 

481 session._remove_newly_deleted([state]) 

482 return None 

483 return instance 

484 else: 

485 return None 

486 

487 

488def _load_on_ident( 

489 session: Session, 

490 statement: Union[Select, FromStatement], 

491 key: Optional[_IdentityKeyType], 

492 *, 

493 load_options: Optional[Sequence[ORMOption]] = None, 

494 refresh_state: Optional[InstanceState[Any]] = None, 

495 with_for_update: Optional[ForUpdateArg] = None, 

496 only_load_props: Optional[Iterable[str]] = None, 

497 no_autoflush: bool = False, 

498 bind_arguments: Mapping[str, Any] = util.EMPTY_DICT, 

499 execution_options: _ExecuteOptions = util.EMPTY_DICT, 

500 require_pk_cols: bool = False, 

501 is_user_refresh: bool = False, 

502): 

503 """Load the given identity key from the database.""" 

504 if key is not None: 

505 ident = key[1] 

506 identity_token = key[2] 

507 else: 

508 ident = identity_token = None 

509 

510 return _load_on_pk_identity( 

511 session, 

512 statement, 

513 ident, 

514 load_options=load_options, 

515 refresh_state=refresh_state, 

516 with_for_update=with_for_update, 

517 only_load_props=only_load_props, 

518 identity_token=identity_token, 

519 no_autoflush=no_autoflush, 

520 bind_arguments=bind_arguments, 

521 execution_options=execution_options, 

522 require_pk_cols=require_pk_cols, 

523 is_user_refresh=is_user_refresh, 

524 ) 

525 

526 

527def _load_on_pk_identity( 

528 session: Session, 

529 statement: Union[Select, FromStatement], 

530 primary_key_identity: Optional[Tuple[Any, ...]], 

531 *, 

532 load_options: Optional[Sequence[ORMOption]] = None, 

533 refresh_state: Optional[InstanceState[Any]] = None, 

534 with_for_update: Optional[ForUpdateArg] = None, 

535 only_load_props: Optional[Iterable[str]] = None, 

536 identity_token: Optional[Any] = None, 

537 no_autoflush: bool = False, 

538 bind_arguments: Mapping[str, Any] = util.EMPTY_DICT, 

539 execution_options: _ExecuteOptions = util.EMPTY_DICT, 

540 require_pk_cols: bool = False, 

541 is_user_refresh: bool = False, 

542): 

543 """Load the given primary key identity from the database.""" 

544 

545 query = statement 

546 q = query._clone() 

547 

548 assert not q._is_lambda_element 

549 

550 if load_options is None: 

551 load_options = QueryContext.default_load_options 

552 

553 if ( 

554 statement._compile_options 

555 is SelectState.default_select_compile_options 

556 ): 

557 compile_options = _ORMCompileState.default_compile_options 

558 else: 

559 compile_options = statement._compile_options 

560 

561 if primary_key_identity is not None: 

562 mapper = query._propagate_attrs["plugin_subject"] 

563 

564 (_get_clause, _get_params) = mapper._get_clause 

565 

566 # None present in ident - turn those comparisons 

567 # into "IS NULL" 

568 if None in primary_key_identity: 

569 nones = { 

570 _get_params[col].key 

571 for col, value in zip(mapper.primary_key, primary_key_identity) 

572 if value is None 

573 } 

574 

575 _get_clause = sql_util.adapt_criterion_to_null(_get_clause, nones) 

576 

577 if len(nones) == len(primary_key_identity): 

578 util.warn( 

579 "fully NULL primary key identity cannot load any " 

580 "object. This condition may raise an error in a future " 

581 "release." 

582 ) 

583 

584 q._where_criteria = ( 

585 sql_util._deep_annotate(_get_clause, {"_orm_adapt": True}), 

586 ) 

587 

588 params = { 

589 _get_params[primary_key].key: id_val 

590 for id_val, primary_key in zip( 

591 primary_key_identity, mapper.primary_key 

592 ) 

593 } 

594 else: 

595 params = None 

596 

597 if with_for_update is not None: 

598 version_check = True 

599 q._for_update_arg = with_for_update 

600 elif query._for_update_arg is not None: 

601 version_check = True 

602 q._for_update_arg = query._for_update_arg 

603 else: 

604 version_check = False 

605 

606 if require_pk_cols and only_load_props: 

607 if not refresh_state: 

608 raise sa_exc.ArgumentError( 

609 "refresh_state is required when require_pk_cols is present" 

610 ) 

611 

612 refresh_state_prokeys = refresh_state.mapper._primary_key_propkeys 

613 has_changes = { 

614 key 

615 for key in refresh_state_prokeys.difference(only_load_props) 

616 if refresh_state.attrs[key].history.has_changes() 

617 } 

618 if has_changes: 

619 # raise if pending pk changes are present. 

620 # technically, this could be limited to the case where we have 

621 # relationships in the only_load_props collection to be refreshed 

622 # also (and only ones that have a secondary eager loader, at that). 

623 # however, the error is in place across the board so that behavior 

624 # here is easier to predict. The use case it prevents is one 

625 # of mutating PK attrs, leaving them unflushed, 

626 # calling session.refresh(), and expecting those attrs to remain 

627 # still unflushed. It seems likely someone doing all those 

628 # things would be better off having the PK attributes flushed 

629 # to the database before tinkering like that (session.refresh() is 

630 # tinkering). 

631 raise sa_exc.InvalidRequestError( 

632 f"Please flush pending primary key changes on " 

633 "attributes " 

634 f"{has_changes} for mapper {refresh_state.mapper} before " 

635 "proceeding with a refresh" 

636 ) 

637 

638 # overall, the ORM has no internal flow right now for "dont load the 

639 # primary row of an object at all, but fire off 

640 # selectinload/subqueryload/immediateload for some relationships". 

641 # It would probably be a pretty big effort to add such a flow. So 

642 # here, the case for #8703 is introduced; user asks to refresh some 

643 # relationship attributes only which are 

644 # selectinload/subqueryload/immediateload/ etc. (not joinedload). 

645 # ORM complains there's no columns in the primary row to load. 

646 # So here, we just add the PK cols if that 

647 # case is detected, so that there is a SELECT emitted for the primary 

648 # row. 

649 # 

650 # Let's just state right up front, for this one little case, 

651 # the ORM here is adding a whole extra SELECT just to satisfy 

652 # limitations in the internal flow. This is really not a thing 

653 # SQLAlchemy finds itself doing like, ever, obviously, we are 

654 # constantly working to *remove* SELECTs we don't need. We 

655 # rationalize this for now based on 1. session.refresh() is not 

656 # commonly used 2. session.refresh() with only relationship attrs is 

657 # even less commonly used 3. the SELECT in question is very low 

658 # latency. 

659 # 

660 # to add the flow to not include the SELECT, the quickest way 

661 # might be to just manufacture a single-row result set to send off to 

662 # instances(), but we'd have to weave that into context.py and all 

663 # that. For 2.0.0, we have enough big changes to navigate for now. 

664 # 

665 mp = refresh_state.mapper._props 

666 for p in only_load_props: 

667 if mp[p]._is_relationship: 

668 only_load_props = refresh_state_prokeys.union(only_load_props) 

669 break 

670 

671 if refresh_state and refresh_state.load_options: 

672 compile_options += {"_current_path": refresh_state.load_path.parent} 

673 q = q.options(*refresh_state.load_options) 

674 

675 new_compile_options, load_options = _set_get_options( 

676 compile_options, 

677 load_options, 

678 version_check=version_check, 

679 only_load_props=only_load_props, 

680 refresh_state=refresh_state, 

681 identity_token=identity_token, 

682 is_user_refresh=is_user_refresh, 

683 ) 

684 

685 q._compile_options = new_compile_options 

686 q._order_by = None 

687 

688 if no_autoflush: 

689 load_options += {"_autoflush": False} 

690 

691 execution_options = util.EMPTY_DICT.merge_with( 

692 execution_options, {"_sa_orm_load_options": load_options} 

693 ) 

694 result = ( 

695 session.execute( 

696 q, 

697 params=params, 

698 execution_options=execution_options, 

699 bind_arguments=bind_arguments, 

700 ) 

701 .unique() 

702 .scalars() 

703 ) 

704 

705 try: 

706 return result.one() 

707 except orm_exc.NoResultFound: 

708 return None 

709 

710 

711def _set_get_options( 

712 compile_opt, 

713 load_opt, 

714 populate_existing=None, 

715 version_check=None, 

716 only_load_props=None, 

717 refresh_state=None, 

718 identity_token=None, 

719 is_user_refresh=None, 

720): 

721 compile_options = {} 

722 load_options = {} 

723 if version_check: 

724 load_options["_version_check"] = version_check 

725 if populate_existing: 

726 load_options["_populate_existing"] = populate_existing 

727 if refresh_state: 

728 load_options["_refresh_state"] = refresh_state 

729 compile_options["_for_refresh_state"] = True 

730 if only_load_props: 

731 compile_options["_only_load_props"] = frozenset(only_load_props) 

732 if identity_token: 

733 load_options["_identity_token"] = identity_token 

734 

735 if is_user_refresh: 

736 load_options["_is_user_refresh"] = is_user_refresh 

737 if load_options: 

738 load_opt += load_options 

739 if compile_options: 

740 compile_opt += compile_options 

741 

742 return compile_opt, load_opt 

743 

744 

745def _setup_entity_query( 

746 compile_state, 

747 mapper, 

748 query_entity, 

749 path, 

750 adapter, 

751 column_collection, 

752 with_polymorphic=None, 

753 only_load_props=None, 

754 polymorphic_discriminator=None, 

755 **kw, 

756): 

757 if with_polymorphic: 

758 poly_properties = mapper._iterate_polymorphic_properties( 

759 with_polymorphic 

760 ) 

761 else: 

762 poly_properties = mapper._polymorphic_properties 

763 

764 quick_populators = {} 

765 

766 path.set(compile_state.attributes, "memoized_setups", quick_populators) 

767 

768 # for the lead entities in the path, e.g. not eager loads, and 

769 # assuming a user-passed aliased class, e.g. not a from_self() or any 

770 # implicit aliasing, don't add columns to the SELECT that aren't 

771 # in the thing that's aliased. 

772 check_for_adapt = adapter and len(path) == 1 and path[-1].is_aliased_class 

773 

774 for value in poly_properties: 

775 if only_load_props and value.key not in only_load_props: 

776 continue 

777 value.setup( 

778 compile_state, 

779 query_entity, 

780 path, 

781 adapter, 

782 only_load_props=only_load_props, 

783 column_collection=column_collection, 

784 memoized_populators=quick_populators, 

785 check_for_adapt=check_for_adapt, 

786 **kw, 

787 ) 

788 

789 if ( 

790 polymorphic_discriminator is not None 

791 and polymorphic_discriminator is not mapper.polymorphic_on 

792 ): 

793 if adapter: 

794 pd = adapter.columns[polymorphic_discriminator] 

795 else: 

796 pd = polymorphic_discriminator 

797 column_collection.append(pd) 

798 

799 

800def _warn_for_runid_changed(state): 

801 util.warn( 

802 "Loading context for %s has changed within a load/refresh " 

803 "handler, suggesting a row refresh operation took place. If this " 

804 "event handler is expected to be " 

805 "emitting row refresh operations within an existing load or refresh " 

806 "operation, set restore_load_context=True when establishing the " 

807 "listener to ensure the context remains unchanged when the event " 

808 "handler completes." % (state_str(state),) 

809 ) 

810 

811 

812def _instance_processor( 

813 query_entity, 

814 mapper, 

815 context, 

816 result, 

817 path, 

818 adapter, 

819 only_load_props=None, 

820 refresh_state=None, 

821 polymorphic_discriminator=None, 

822 _polymorphic_from=None, 

823): 

824 """Produce a mapper level row processor callable 

825 which processes rows into mapped instances.""" 

826 

827 # note that this method, most of which exists in a closure 

828 # called _instance(), resists being broken out, as 

829 # attempts to do so tend to add significant function 

830 # call overhead. _instance() is the most 

831 # performance-critical section in the whole ORM. 

832 

833 identity_class = mapper._identity_class 

834 compile_state = context.compile_state 

835 

836 # look for "row getter" functions that have been assigned along 

837 # with the compile state that were cached from a previous load. 

838 # these are operator.itemgetter() objects that each will extract a 

839 # particular column from each row. 

840 

841 getter_key = ("getters", mapper) 

842 getters = path.get(compile_state.attributes, getter_key, None) 

843 

844 if getters is None: 

845 # no getters, so go through a list of attributes we are loading for, 

846 # and the ones that are column based will have already put information 

847 # for us in another collection "memoized_setups", which represents the 

848 # output of the LoaderStrategy.setup_query() method. We can just as 

849 # easily call LoaderStrategy.create_row_processor for each, but by 

850 # getting it all at once from setup_query we save another method call 

851 # per attribute. 

852 props = mapper._prop_set 

853 if only_load_props is not None: 

854 props = props.intersection( 

855 mapper._props[k] for k in only_load_props 

856 ) 

857 

858 quick_populators = path.get( 

859 context.attributes, "memoized_setups", EMPTY_DICT 

860 ) 

861 

862 todo = [] 

863 cached_populators = { 

864 "new": [], 

865 "quick": [], 

866 "deferred": [], 

867 "expire": [], 

868 "existing": [], 

869 "eager": [], 

870 } 

871 

872 if refresh_state is None: 

873 # we can also get the "primary key" tuple getter function 

874 pk_cols = mapper.primary_key 

875 

876 if adapter: 

877 pk_cols = [adapter.columns[c] for c in pk_cols] 

878 primary_key_getter = result._tuple_getter(pk_cols) 

879 else: 

880 primary_key_getter = None 

881 

882 getters = { 

883 "cached_populators": cached_populators, 

884 "todo": todo, 

885 "primary_key_getter": primary_key_getter, 

886 } 

887 for prop in props: 

888 if prop in quick_populators: 

889 # this is an inlined path just for column-based attributes. 

890 col = quick_populators[prop] 

891 if col is _DEFER_FOR_STATE: 

892 cached_populators["new"].append( 

893 (prop.key, prop._deferred_column_loader) 

894 ) 

895 elif col is _SET_DEFERRED_EXPIRED: 

896 # note that in this path, we are no longer 

897 # searching in the result to see if the column might 

898 # be present in some unexpected way. 

899 cached_populators["expire"].append((prop.key, False)) 

900 elif col is _RAISE_FOR_STATE: 

901 cached_populators["new"].append( 

902 (prop.key, prop._raise_column_loader) 

903 ) 

904 else: 

905 getter = None 

906 if adapter: 

907 # this logic had been removed for all 1.4 releases 

908 # up until 1.4.18; the adapter here is particularly 

909 # the compound eager adapter which isn't accommodated 

910 # in the quick_populators right now. The "fallback" 

911 # logic below instead took over in many more cases 

912 # until issue #6596 was identified. 

913 

914 # note there is still an issue where this codepath 

915 # produces no "getter" for cases where a joined-inh 

916 # mapping includes a labeled column property, meaning 

917 # KeyError is caught internally and we fall back to 

918 # _getter(col), which works anyway. The adapter 

919 # here for joined inh without any aliasing might not 

920 # be useful. Tests which see this include 

921 # test.orm.inheritance.test_basic -> 

922 # EagerTargetingTest.test_adapt_stringency 

923 # OptimizedLoadTest.test_column_expression_joined 

924 # PolymorphicOnNotLocalTest.test_polymorphic_on_column_prop # noqa: E501 

925 # 

926 

927 adapted_col = adapter.columns[col] 

928 if adapted_col is not None: 

929 getter = result._getter(adapted_col, False) 

930 if not getter: 

931 getter = result._getter(col, False) 

932 if getter: 

933 cached_populators["quick"].append((prop.key, getter)) 

934 else: 

935 # fall back to the ColumnProperty itself, which 

936 # will iterate through all of its columns 

937 # to see if one fits 

938 prop.create_row_processor( 

939 context, 

940 query_entity, 

941 path, 

942 mapper, 

943 result, 

944 adapter, 

945 cached_populators, 

946 ) 

947 else: 

948 # loader strategies like subqueryload, selectinload, 

949 # joinedload, basically relationships, these need to interact 

950 # with the context each time to work correctly. 

951 todo.append(prop) 

952 

953 path.set(compile_state.attributes, getter_key, getters) 

954 

955 cached_populators = getters["cached_populators"] 

956 

957 populators = {key: list(value) for key, value in cached_populators.items()} 

958 for prop in getters["todo"]: 

959 prop.create_row_processor( 

960 context, query_entity, path, mapper, result, adapter, populators 

961 ) 

962 

963 propagated_loader_options = context.propagated_loader_options 

964 load_path = ( 

965 context.compile_state.current_path + path 

966 if context.compile_state.current_path.path 

967 else path 

968 ) 

969 

970 session_identity_map = context.session.identity_map 

971 

972 populate_existing = context.populate_existing or mapper.always_refresh 

973 load_evt = bool(mapper.class_manager.dispatch.load) 

974 refresh_evt = bool(mapper.class_manager.dispatch.refresh) 

975 persistent_evt = bool(context.session.dispatch.loaded_as_persistent) 

976 if persistent_evt: 

977 loaded_as_persistent = context.session.dispatch.loaded_as_persistent 

978 instance_state = attributes.instance_state 

979 instance_dict = attributes.instance_dict 

980 session_id = context.session.hash_key 

981 runid = context.runid 

982 identity_token = context.identity_token 

983 

984 version_check = context.version_check 

985 if version_check: 

986 version_id_col = mapper.version_id_col 

987 if version_id_col is not None: 

988 if adapter: 

989 version_id_col = adapter.columns[version_id_col] 

990 version_id_getter = result._getter(version_id_col) 

991 else: 

992 version_id_getter = None 

993 

994 if not refresh_state and _polymorphic_from is not None: 

995 key = ("loader", path.path) 

996 

997 if key in context.attributes and context.attributes[key].strategy == ( 

998 ("selectinload_polymorphic", True), 

999 ): 

1000 option_entities = context.attributes[key].local_opts["entities"] 

1001 else: 

1002 option_entities = None 

1003 selectin_load_via = mapper._should_selectin_load( 

1004 option_entities, 

1005 _polymorphic_from, 

1006 ) 

1007 

1008 if selectin_load_via and selectin_load_via is not _polymorphic_from: 

1009 # only_load_props goes w/ refresh_state only, and in a refresh 

1010 # we are a single row query for the exact entity; polymorphic 

1011 # loading does not apply 

1012 assert only_load_props is None 

1013 

1014 if selectin_load_via.is_mapper: 

1015 _load_supers = [] 

1016 _endmost_mapper = selectin_load_via 

1017 while ( 

1018 _endmost_mapper 

1019 and _endmost_mapper is not _polymorphic_from 

1020 ): 

1021 _load_supers.append(_endmost_mapper) 

1022 _endmost_mapper = _endmost_mapper.inherits 

1023 else: 

1024 _load_supers = [selectin_load_via] 

1025 

1026 for _selectinload_entity in _load_supers: 

1027 if _PostLoad.path_exists( 

1028 context, load_path, _selectinload_entity 

1029 ): 

1030 continue 

1031 callable_ = _load_subclass_via_in( 

1032 context, 

1033 path, 

1034 _selectinload_entity, 

1035 _polymorphic_from, 

1036 option_entities, 

1037 ) 

1038 _PostLoad.callable_for_path( 

1039 context, 

1040 load_path, 

1041 _selectinload_entity.mapper, 

1042 _selectinload_entity, 

1043 callable_, 

1044 _selectinload_entity, 

1045 ) 

1046 

1047 post_load = _PostLoad.for_context(context, load_path, only_load_props) 

1048 

1049 if refresh_state: 

1050 refresh_identity_key = refresh_state.key 

1051 if refresh_identity_key is None: 

1052 # super-rare condition; a refresh is being called 

1053 # on a non-instance-key instance; this is meant to only 

1054 # occur within a flush() 

1055 refresh_identity_key = mapper._identity_key_from_state( 

1056 refresh_state 

1057 ) 

1058 else: 

1059 refresh_identity_key = None 

1060 

1061 primary_key_getter = getters["primary_key_getter"] 

1062 

1063 if mapper.allow_partial_pks: 

1064 is_not_primary_key = _none_set.issuperset 

1065 else: 

1066 is_not_primary_key = _none_set.intersection 

1067 

1068 def _instance(row): 

1069 # determine the state that we'll be populating 

1070 if refresh_identity_key: 

1071 # fixed state that we're refreshing 

1072 state = refresh_state 

1073 instance = state.obj() 

1074 dict_ = instance_dict(instance) 

1075 isnew = state.runid != runid 

1076 currentload = True 

1077 loaded_instance = False 

1078 else: 

1079 # look at the row, see if that identity is in the 

1080 # session, or we have to create a new one 

1081 identitykey = ( 

1082 identity_class, 

1083 primary_key_getter(row), 

1084 identity_token, 

1085 ) 

1086 

1087 instance = session_identity_map.get(identitykey) 

1088 

1089 if instance is not None: 

1090 # existing instance 

1091 state = instance_state(instance) 

1092 dict_ = instance_dict(instance) 

1093 

1094 isnew = state.runid != runid 

1095 currentload = not isnew 

1096 loaded_instance = False 

1097 

1098 if version_check and version_id_getter and not currentload: 

1099 _validate_version_id( 

1100 mapper, state, dict_, row, version_id_getter 

1101 ) 

1102 

1103 else: 

1104 # create a new instance 

1105 

1106 # check for non-NULL values in the primary key columns, 

1107 # else no entity is returned for the row 

1108 if is_not_primary_key(identitykey[1]): 

1109 return None 

1110 

1111 isnew = True 

1112 currentload = True 

1113 loaded_instance = True 

1114 

1115 instance = mapper.class_manager.new_instance() 

1116 

1117 dict_ = instance_dict(instance) 

1118 state = instance_state(instance) 

1119 state.key = identitykey 

1120 state.identity_token = identity_token 

1121 

1122 # attach instance to session. 

1123 state.session_id = session_id 

1124 session_identity_map._add_unpresent(state, identitykey) 

1125 

1126 effective_populate_existing = populate_existing 

1127 if refresh_state is state: 

1128 effective_populate_existing = True 

1129 

1130 # populate. this looks at whether this state is new 

1131 # for this load or was existing, and whether or not this 

1132 # row is the first row with this identity. 

1133 if currentload or effective_populate_existing: 

1134 # full population routines. Objects here are either 

1135 # just created, or we are doing a populate_existing 

1136 

1137 # be conservative about setting load_path when populate_existing 

1138 # is in effect; want to maintain options from the original 

1139 # load. see test_expire->test_refresh_maintains_deferred_options 

1140 if isnew and ( 

1141 propagated_loader_options or not effective_populate_existing 

1142 ): 

1143 state.load_options = propagated_loader_options 

1144 state.load_path = load_path 

1145 

1146 _populate_full( 

1147 context, 

1148 row, 

1149 state, 

1150 dict_, 

1151 isnew, 

1152 load_path, 

1153 loaded_instance, 

1154 effective_populate_existing, 

1155 populators, 

1156 ) 

1157 

1158 if isnew: 

1159 # state.runid should be equal to context.runid / runid 

1160 # here, however for event checks we are being more conservative 

1161 # and checking against existing run id 

1162 # assert state.runid == runid 

1163 

1164 existing_runid = state.runid 

1165 

1166 if loaded_instance: 

1167 if load_evt: 

1168 state.manager.dispatch.load(state, context) 

1169 if state.runid != existing_runid: 

1170 _warn_for_runid_changed(state) 

1171 if persistent_evt: 

1172 loaded_as_persistent(context.session, state) 

1173 if state.runid != existing_runid: 

1174 _warn_for_runid_changed(state) 

1175 elif refresh_evt: 

1176 state.manager.dispatch.refresh( 

1177 state, context, only_load_props 

1178 ) 

1179 if state.runid != runid: 

1180 _warn_for_runid_changed(state) 

1181 

1182 if effective_populate_existing or state.modified: 

1183 if refresh_state and only_load_props: 

1184 state._commit(dict_, only_load_props) 

1185 else: 

1186 state._commit_all(dict_, session_identity_map) 

1187 

1188 if post_load: 

1189 post_load.add_state(state, True) 

1190 

1191 else: 

1192 # partial population routines, for objects that were already 

1193 # in the Session, but a row matches them; apply eager loaders 

1194 # on existing objects, etc. 

1195 unloaded = state.unloaded 

1196 isnew = state not in context.partials 

1197 

1198 if not isnew or unloaded or populators["eager"]: 

1199 # state is having a partial set of its attributes 

1200 # refreshed. Populate those attributes, 

1201 # and add to the "context.partials" collection. 

1202 

1203 to_load = _populate_partial( 

1204 context, 

1205 row, 

1206 state, 

1207 dict_, 

1208 isnew, 

1209 load_path, 

1210 unloaded, 

1211 populators, 

1212 ) 

1213 

1214 if isnew: 

1215 if refresh_evt: 

1216 existing_runid = state.runid 

1217 state.manager.dispatch.refresh(state, context, to_load) 

1218 if state.runid != existing_runid: 

1219 _warn_for_runid_changed(state) 

1220 

1221 state._commit(dict_, to_load) 

1222 

1223 if post_load and context.invoke_all_eagers: 

1224 post_load.add_state(state, False) 

1225 

1226 return instance 

1227 

1228 if mapper.polymorphic_map and not _polymorphic_from and not refresh_state: 

1229 # if we are doing polymorphic, dispatch to a different _instance() 

1230 # method specific to the subclass mapper 

1231 def ensure_no_pk(row): 

1232 identitykey = ( 

1233 identity_class, 

1234 primary_key_getter(row), 

1235 identity_token, 

1236 ) 

1237 if not is_not_primary_key(identitykey[1]): 

1238 return identitykey 

1239 else: 

1240 return None 

1241 

1242 _instance = _decorate_polymorphic_switch( 

1243 _instance, 

1244 context, 

1245 query_entity, 

1246 mapper, 

1247 result, 

1248 path, 

1249 polymorphic_discriminator, 

1250 adapter, 

1251 ensure_no_pk, 

1252 ) 

1253 

1254 return _instance 

1255 

1256 

1257def _load_subclass_via_in( 

1258 context, path, entity, polymorphic_from, option_entities 

1259): 

1260 mapper = entity.mapper 

1261 

1262 # TODO: polymorphic_from seems to be a Mapper in all cases. 

1263 # this is likely not needed, but as we dont have typing in loading.py 

1264 # yet, err on the safe side 

1265 polymorphic_from_mapper = polymorphic_from.mapper 

1266 not_against_basemost = polymorphic_from_mapper.inherits is not None 

1267 

1268 zero_idx = len(mapper.base_mapper.primary_key) == 1 

1269 

1270 if entity.is_aliased_class or not_against_basemost: 

1271 q, enable_opt, disable_opt = mapper._subclass_load_via_in( 

1272 entity, polymorphic_from 

1273 ) 

1274 else: 

1275 q, enable_opt, disable_opt = mapper._subclass_load_via_in_mapper 

1276 

1277 def do_load(context, path, states, load_only, effective_entity): 

1278 if not option_entities: 

1279 # filter out states for those that would have selectinloaded 

1280 # from another loader 

1281 # TODO: we are currently ignoring the case where the 

1282 # "selectin_polymorphic" option is used, as this is much more 

1283 # complex / specific / very uncommon API use 

1284 states = [ 

1285 (s, v) 

1286 for s, v in states 

1287 if s.mapper._would_selectin_load_only_from_given_mapper(mapper) 

1288 ] 

1289 

1290 if not states: 

1291 return 

1292 

1293 orig_query = context.query 

1294 

1295 if path.parent: 

1296 enable_opt_lcl = enable_opt._prepend_path(path) 

1297 disable_opt_lcl = disable_opt._prepend_path(path) 

1298 else: 

1299 enable_opt_lcl = enable_opt 

1300 disable_opt_lcl = disable_opt 

1301 options = ( 

1302 (enable_opt_lcl,) + orig_query._with_options + (disable_opt_lcl,) 

1303 ) 

1304 

1305 q2 = q.options(*options) 

1306 

1307 q2._compile_options = context.compile_state.default_compile_options 

1308 q2._compile_options += {"_current_path": path.parent} 

1309 

1310 if context.populate_existing: 

1311 q2 = q2.execution_options(populate_existing=True) 

1312 

1313 while states: 

1314 chunk = states[0 : _SelectInLoader._chunksize] 

1315 states = states[_SelectInLoader._chunksize :] 

1316 context.session.execute( 

1317 q2, 

1318 dict( 

1319 primary_keys=[ 

1320 state.key[1][0] if zero_idx else state.key[1] 

1321 for state, load_attrs in chunk 

1322 ] 

1323 ), 

1324 ).unique().scalars().all() 

1325 

1326 return do_load 

1327 

1328 

1329def _populate_full( 

1330 context, 

1331 row, 

1332 state, 

1333 dict_, 

1334 isnew, 

1335 load_path, 

1336 loaded_instance, 

1337 populate_existing, 

1338 populators, 

1339): 

1340 if isnew: 

1341 # first time we are seeing a row with this identity. 

1342 state.runid = context.runid 

1343 

1344 for key, getter in populators["quick"]: 

1345 dict_[key] = getter(row) 

1346 if populate_existing: 

1347 for key, set_callable in populators["expire"]: 

1348 dict_.pop(key, None) 

1349 if set_callable: 

1350 state.expired_attributes.add(key) 

1351 else: 

1352 for key, set_callable in populators["expire"]: 

1353 if set_callable: 

1354 state.expired_attributes.add(key) 

1355 

1356 for key, populator in populators["new"]: 

1357 populator(state, dict_, row) 

1358 

1359 elif load_path != state.load_path: 

1360 # new load path, e.g. object is present in more than one 

1361 # column position in a series of rows 

1362 state.load_path = load_path 

1363 

1364 # if we have data, and the data isn't in the dict, OK, let's put 

1365 # it in. 

1366 for key, getter in populators["quick"]: 

1367 if key not in dict_: 

1368 dict_[key] = getter(row) 

1369 

1370 # otherwise treat like an "already seen" row 

1371 for key, populator in populators["existing"]: 

1372 populator(state, dict_, row) 

1373 # TODO: allow "existing" populator to know this is 

1374 # a new path for the state: 

1375 # populator(state, dict_, row, new_path=True) 

1376 

1377 else: 

1378 # have already seen rows with this identity in this same path. 

1379 for key, populator in populators["existing"]: 

1380 populator(state, dict_, row) 

1381 

1382 # TODO: same path 

1383 # populator(state, dict_, row, new_path=False) 

1384 

1385 

1386def _populate_partial( 

1387 context, row, state, dict_, isnew, load_path, unloaded, populators 

1388): 

1389 if not isnew: 

1390 if unloaded: 

1391 # extra pass, see #8166 

1392 for key, getter in populators["quick"]: 

1393 if key in unloaded: 

1394 dict_[key] = getter(row) 

1395 

1396 to_load = context.partials[state] 

1397 for key, populator in populators["existing"]: 

1398 if key in to_load: 

1399 populator(state, dict_, row) 

1400 else: 

1401 to_load = unloaded 

1402 context.partials[state] = to_load 

1403 

1404 for key, getter in populators["quick"]: 

1405 if key in to_load: 

1406 dict_[key] = getter(row) 

1407 for key, set_callable in populators["expire"]: 

1408 if key in to_load: 

1409 dict_.pop(key, None) 

1410 if set_callable: 

1411 state.expired_attributes.add(key) 

1412 for key, populator in populators["new"]: 

1413 if key in to_load: 

1414 populator(state, dict_, row) 

1415 

1416 for key, populator in populators["eager"]: 

1417 if key not in unloaded: 

1418 populator(state, dict_, row) 

1419 

1420 return to_load 

1421 

1422 

1423def _validate_version_id(mapper, state, dict_, row, getter): 

1424 if mapper._get_state_attr_by_column( 

1425 state, dict_, mapper.version_id_col 

1426 ) != getter(row): 

1427 raise orm_exc.StaleDataError( 

1428 "Instance '%s' has version id '%s' which " 

1429 "does not match database-loaded version id '%s'." 

1430 % ( 

1431 state_str(state), 

1432 mapper._get_state_attr_by_column( 

1433 state, dict_, mapper.version_id_col 

1434 ), 

1435 getter(row), 

1436 ) 

1437 ) 

1438 

1439 

1440def _decorate_polymorphic_switch( 

1441 instance_fn, 

1442 context, 

1443 query_entity, 

1444 mapper, 

1445 result, 

1446 path, 

1447 polymorphic_discriminator, 

1448 adapter, 

1449 ensure_no_pk, 

1450): 

1451 if polymorphic_discriminator is not None: 

1452 polymorphic_on = polymorphic_discriminator 

1453 else: 

1454 polymorphic_on = mapper.polymorphic_on 

1455 if polymorphic_on is None: 

1456 return instance_fn 

1457 

1458 if adapter: 

1459 polymorphic_on = adapter.columns[polymorphic_on] 

1460 

1461 def configure_subclass_mapper(discriminator): 

1462 try: 

1463 sub_mapper = mapper.polymorphic_map[discriminator] 

1464 except KeyError: 

1465 raise AssertionError( 

1466 "No such polymorphic_identity %r is defined" % discriminator 

1467 ) 

1468 else: 

1469 if sub_mapper is mapper: 

1470 return None 

1471 elif not sub_mapper.isa(mapper): 

1472 return False 

1473 

1474 return _instance_processor( 

1475 query_entity, 

1476 sub_mapper, 

1477 context, 

1478 result, 

1479 path, 

1480 adapter, 

1481 _polymorphic_from=mapper, 

1482 ) 

1483 

1484 polymorphic_instances = util.PopulateDict(configure_subclass_mapper) 

1485 

1486 getter = result._getter(polymorphic_on) 

1487 

1488 def polymorphic_instance(row): 

1489 discriminator = getter(row) 

1490 if discriminator is not None: 

1491 _instance = polymorphic_instances[discriminator] 

1492 if _instance: 

1493 return _instance(row) 

1494 elif _instance is False: 

1495 identitykey = ensure_no_pk(row) 

1496 

1497 if identitykey: 

1498 raise sa_exc.InvalidRequestError( 

1499 "Row with identity key %s can't be loaded into an " 

1500 "object; the polymorphic discriminator column '%s' " 

1501 "refers to %s, which is not a sub-mapper of " 

1502 "the requested %s" 

1503 % ( 

1504 identitykey, 

1505 polymorphic_on, 

1506 mapper.polymorphic_map[discriminator], 

1507 mapper, 

1508 ) 

1509 ) 

1510 else: 

1511 return None 

1512 else: 

1513 return instance_fn(row) 

1514 else: 

1515 identitykey = ensure_no_pk(row) 

1516 

1517 if identitykey: 

1518 raise sa_exc.InvalidRequestError( 

1519 "Row with identity key %s can't be loaded into an " 

1520 "object; the polymorphic discriminator column '%s' is " 

1521 "NULL" % (identitykey, polymorphic_on) 

1522 ) 

1523 else: 

1524 return None 

1525 

1526 return polymorphic_instance 

1527 

1528 

1529class _PostLoad: 

1530 """Track loaders and states for "post load" operations.""" 

1531 

1532 __slots__ = "loaders", "states", "load_keys" 

1533 

1534 def __init__(self): 

1535 self.loaders = {} 

1536 self.states = util.OrderedDict() 

1537 self.load_keys = None 

1538 

1539 def add_state(self, state, overwrite): 

1540 # the states for a polymorphic load here are all shared 

1541 # within a single PostLoad object among multiple subtypes. 

1542 # Filtering of callables on a per-subclass basis needs to be done at 

1543 # the invocation level 

1544 self.states[state] = overwrite 

1545 

1546 def invoke(self, context, path): 

1547 if not self.states: 

1548 return 

1549 path = path_registry.PathRegistry.coerce(path) 

1550 for ( 

1551 effective_context, 

1552 token, 

1553 limit_to_mapper, 

1554 loader, 

1555 arg, 

1556 kw, 

1557 ) in self.loaders.values(): 

1558 states = [ 

1559 (state, overwrite) 

1560 for state, overwrite in self.states.items() 

1561 if state.manager.mapper.isa(limit_to_mapper) 

1562 ] 

1563 if states: 

1564 loader( 

1565 effective_context, path, states, self.load_keys, *arg, **kw 

1566 ) 

1567 self.states.clear() 

1568 

1569 @classmethod 

1570 def for_context(cls, context, path, only_load_props): 

1571 pl = context.post_load_paths.get(path.path) 

1572 if pl is not None and only_load_props: 

1573 pl.load_keys = only_load_props 

1574 return pl 

1575 

1576 @classmethod 

1577 def path_exists(self, context, path, key): 

1578 return ( 

1579 path.path in context.post_load_paths 

1580 and key in context.post_load_paths[path.path].loaders 

1581 ) 

1582 

1583 @classmethod 

1584 def callable_for_path( 

1585 cls, context, path, limit_to_mapper, token, loader_callable, *arg, **kw 

1586 ): 

1587 if path.path in context.post_load_paths: 

1588 pl = context.post_load_paths[path.path] 

1589 else: 

1590 pl = context.post_load_paths[path.path] = _PostLoad() 

1591 pl.loaders[token] = ( 

1592 context, 

1593 token, 

1594 limit_to_mapper, 

1595 loader_callable, 

1596 arg, 

1597 kw, 

1598 ) 

1599 

1600 

1601def _load_scalar_attributes(mapper, state, attribute_names, passive): 

1602 """initiate a column-based attribute refresh operation.""" 

1603 

1604 # assert mapper is _state_mapper(state) 

1605 session = state.session 

1606 if not session: 

1607 raise orm_exc.DetachedInstanceError( 

1608 "Instance %s is not bound to a Session; " 

1609 "attribute refresh operation cannot proceed" % (state_str(state)) 

1610 ) 

1611 

1612 no_autoflush = bool(passive & attributes.NO_AUTOFLUSH) 

1613 

1614 # in the case of inheritance, particularly concrete and abstract 

1615 # concrete inheritance, the class manager might have some keys 

1616 # of attributes on the superclass that we didn't actually map. 

1617 # These could be mapped as "concrete, don't load" or could be completely 

1618 # excluded from the mapping and we know nothing about them. Filter them 

1619 # here to prevent them from coming through. 

1620 if attribute_names: 

1621 attribute_names = attribute_names.intersection(mapper.attrs.keys()) 

1622 

1623 if mapper.inherits and not mapper.concrete: 

1624 # load based on committed attributes in the object, formed into 

1625 # a truncated SELECT that only includes relevant tables. does not 

1626 # currently use state.key 

1627 statement = mapper._optimized_get_statement(state, attribute_names) 

1628 if statement is not None: 

1629 # undefer() isn't needed here because statement has the 

1630 # columns needed already, this implicitly undefers that column 

1631 stmt = FromStatement(mapper, statement) 

1632 

1633 return _load_on_ident( 

1634 session, 

1635 stmt, 

1636 None, 

1637 only_load_props=attribute_names, 

1638 refresh_state=state, 

1639 no_autoflush=no_autoflush, 

1640 ) 

1641 

1642 # normal load, use state.key as the identity to SELECT 

1643 has_key = bool(state.key) 

1644 

1645 if has_key: 

1646 identity_key = state.key 

1647 else: 

1648 # this codepath is rare - only valid when inside a flush, and the 

1649 # object is becoming persistent but hasn't yet been assigned 

1650 # an identity_key. 

1651 # check here to ensure we have the attrs we need. 

1652 pk_attrs = [ 

1653 mapper._columntoproperty[col].key for col in mapper.primary_key 

1654 ] 

1655 if state.expired_attributes.intersection(pk_attrs): 

1656 raise sa_exc.InvalidRequestError( 

1657 "Instance %s cannot be refreshed - it's not " 

1658 " persistent and does not " 

1659 "contain a full primary key." % state_str(state) 

1660 ) 

1661 identity_key = mapper._identity_key_from_state(state) 

1662 

1663 if ( 

1664 _none_set.issubset(identity_key) and not mapper.allow_partial_pks 

1665 ) or _none_set.issuperset(identity_key): 

1666 util.warn_limited( 

1667 "Instance %s to be refreshed doesn't " 

1668 "contain a full primary key - can't be refreshed " 

1669 "(and shouldn't be expired, either).", 

1670 state_str(state), 

1671 ) 

1672 return 

1673 

1674 result = _load_on_ident( 

1675 session, 

1676 select(mapper).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL), 

1677 identity_key, 

1678 refresh_state=state, 

1679 only_load_props=attribute_names, 

1680 no_autoflush=no_autoflush, 

1681 ) 

1682 

1683 # if instance is pending, a refresh operation 

1684 # may not complete (even if PK attributes are assigned) 

1685 if has_key and result is None: 

1686 raise orm_exc.ObjectDeletedError(state)