Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/loading.py: 11%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

666 statements  

1# orm/loading.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: ignore-errors 

8 

9 

10"""private module containing functions used to convert database 

11rows into object instances and associated state. 

12 

13the functions here are called primarily by Query, Mapper, 

14as well as some of the attribute loading strategies. 

15 

16""" 

17 

18from __future__ import annotations 

19 

20from typing import Any 

21from typing import Dict 

22from typing import Iterable 

23from typing import List 

24from typing import Mapping 

25from typing import Optional 

26from typing import Sequence 

27from typing import Tuple 

28from typing import TYPE_CHECKING 

29from typing import TypeVar 

30from typing import Union 

31 

32from . import attributes 

33from . import exc as orm_exc 

34from . import path_registry 

35from .base import _DEFER_FOR_STATE 

36from .base import _RAISE_FOR_STATE 

37from .base import _SET_DEFERRED_EXPIRED 

38from .base import PassiveFlag 

39from .context import _ORMCompileState 

40from .context import FromStatement 

41from .context import QueryContext 

42from .strategies import _SelectInLoader 

43from .util import _none_set 

44from .util import state_str 

45from .. import exc as sa_exc 

46from .. import util 

47from ..engine import result_tuple 

48from ..engine.result import ChunkedIteratorResult 

49from ..engine.result import FrozenResult 

50from ..engine.result import SimpleResultMetaData 

51from ..sql import select 

52from ..sql import util as sql_util 

53from ..sql.selectable import ForUpdateArg 

54from ..sql.selectable import SelectState 

55from ..util import EMPTY_DICT 

56from ..util.typing import TupleAny 

57from ..util.typing import Unpack 

58 

59if TYPE_CHECKING: 

60 from ._typing import _IdentityKeyType 

61 from .base import LoaderCallableStatus 

62 from .interfaces import ORMOption 

63 from .mapper import Mapper 

64 from .query import Query 

65 from .session import Session 

66 from .state import InstanceState 

67 from ..engine.cursor import CursorResult 

68 from ..engine.interfaces import _ExecuteOptions 

69 from ..engine.result import Result 

70 from ..sql import Select 

71 

72_T = TypeVar("_T", bound=Any) 

73_O = TypeVar("_O", bound=object) 

74_new_runid = util.counter() 

75 

76 

77_PopulatorDict = Dict[str, List[Tuple[str, Any]]] 

78 

79 

80def instances( 

81 cursor: CursorResult[Unpack[TupleAny]], context: QueryContext 

82) -> Result[Unpack[TupleAny]]: 

83 """Return a :class:`.Result` given an ORM query context. 

84 

85 :param cursor: a :class:`.CursorResult`, generated by a statement 

86 which came from :class:`.ORMCompileState` 

87 

88 :param context: a :class:`.QueryContext` object 

89 

90 :return: a :class:`.Result` object representing ORM results 

91 

92 .. versionchanged:: 1.4 The instances() function now uses 

93 :class:`.Result` objects and has an all new interface. 

94 

95 """ 

96 

97 context.runid = _new_runid() 

98 

99 if context.top_level_context: 

100 is_top_level = False 

101 context.post_load_paths = context.top_level_context.post_load_paths 

102 else: 

103 is_top_level = True 

104 context.post_load_paths = {} 

105 

106 compile_state = context.compile_state 

107 filtered = compile_state._has_mapper_entities 

108 single_entity = ( 

109 not context.load_options._only_return_tuples 

110 and len(compile_state._entities) == 1 

111 and compile_state._entities[0].supports_single_entity 

112 ) 

113 

114 try: 

115 process, labels, extra = list( 

116 zip( 

117 *[ 

118 query_entity.row_processor(context, cursor) 

119 for query_entity in context.compile_state._entities 

120 ] 

121 ) 

122 ) 

123 

124 if context.yield_per and ( 

125 context.loaders_require_buffering 

126 or context.loaders_require_uniquing 

127 ): 

128 raise sa_exc.InvalidRequestError( 

129 "Can't use yield_per with eager loaders that require uniquing " 

130 "or row buffering, e.g. joinedload() against collections " 

131 "or subqueryload(). Consider the selectinload() strategy " 

132 "for better flexibility in loading objects." 

133 ) 

134 

135 except Exception: 

136 with util.safe_reraise(): 

137 cursor.close() 

138 

139 def _not_hashable(datatype, *, legacy=False, uncertain=False): 

140 if not legacy: 

141 

142 def go(obj): 

143 if uncertain: 

144 try: 

145 return hash(obj) 

146 except: 

147 pass 

148 

149 raise sa_exc.InvalidRequestError( 

150 "Can't apply uniqueness to row tuple containing value of " 

151 f"""type {datatype!r}; { 

152 'the values returned appear to be' 

153 if uncertain 

154 else 'this datatype produces' 

155 } non-hashable values""" 

156 ) 

157 

158 return go 

159 elif not uncertain: 

160 return id 

161 else: 

162 _use_id = False 

163 

164 def go(obj): 

165 nonlocal _use_id 

166 

167 if not _use_id: 

168 try: 

169 return hash(obj) 

170 except: 

171 pass 

172 

173 # in #10459, we considered using a warning here, however 

174 # as legacy query uses result.unique() in all cases, this 

175 # would lead to too many warning cases. 

176 _use_id = True 

177 

178 return id(obj) 

179 

180 return go 

181 

182 _uniquing_is_active = False 

183 

184 def _create_unique_filters(result): 

185 nonlocal _uniquing_is_active 

186 

187 if result._yield_per: 

188 raise sa_exc.InvalidRequestError( 

189 "Can't use the ORM yield_per feature " 

190 "in conjunction with unique()" 

191 ) 

192 

193 _uniquing_is_active = True 

194 return [ 

195 ( 

196 _not_hashable( 

197 ent.column.type, # type: ignore 

198 legacy=context.load_options._legacy_uniquing, 

199 uncertain=ent._null_column_type, 

200 ) 

201 if ( 

202 not ent.use_id_for_hash 

203 and (ent._non_hashable_value or ent._null_column_type) 

204 ) 

205 else id if ent.use_id_for_hash else None 

206 ) 

207 for ent in context.compile_state._entities 

208 ] 

209 

210 row_metadata = SimpleResultMetaData( 

211 labels, extra, _create_unique_filters=_create_unique_filters 

212 ) 

213 

214 def chunks(size): # type: ignore 

215 while True: 

216 yield_per = size 

217 

218 context.partials = {} 

219 

220 if yield_per: 

221 if _uniquing_is_active: 

222 raise sa_exc.InvalidRequestError( 

223 "Can't use the ORM yield_per feature " 

224 "in conjunction with unique()" 

225 ) 

226 fetch = cursor.fetchmany(yield_per) 

227 

228 if not fetch: 

229 break 

230 else: 

231 fetch = cursor._raw_all_tuples() 

232 

233 if single_entity: 

234 proc = process[0] 

235 rows = [proc(row) for row in fetch] 

236 else: 

237 rows = [ 

238 tuple([proc(row) for proc in process]) for row in fetch 

239 ] 

240 

241 # if we are the originating load from a query, meaning we 

242 # aren't being called as a result of a nested "post load", 

243 # iterate through all the collected post loaders and fire them 

244 # off. Previously this used to work recursively, however that 

245 # prevented deeply nested structures from being loadable 

246 if is_top_level: 

247 if yield_per: 

248 # if using yield per, memoize the state of the 

249 # collection so that it can be restored 

250 top_level_post_loads = list( 

251 context.post_load_paths.items() 

252 ) 

253 

254 while context.post_load_paths: 

255 post_loads = list(context.post_load_paths.items()) 

256 context.post_load_paths.clear() 

257 for path, post_load in post_loads: 

258 post_load.invoke(context, path) 

259 

260 if yield_per: 

261 context.post_load_paths.clear() 

262 context.post_load_paths.update(top_level_post_loads) 

263 

264 yield rows 

265 

266 if not yield_per: 

267 break 

268 

269 if context.execution_options.get("prebuffer_rows", False): 

270 # this is a bit of a hack at the moment. 

271 # I would rather have some option in the result to pre-buffer 

272 # internally. 

273 _prebuffered = list(chunks(None)) 

274 

275 def chunks(size): 

276 return iter(_prebuffered) 

277 

278 result = ChunkedIteratorResult( 

279 row_metadata, 

280 chunks, 

281 source_supports_scalars=single_entity, 

282 raw=cursor, 

283 dynamic_yield_per=cursor.context._is_server_side, 

284 context=context, 

285 ) 

286 

287 # filtered and single_entity are used to indicate to legacy Query that the 

288 # query has ORM entities, so legacy deduping and scalars should be called 

289 # on the result. 

290 result._attributes = result._attributes.union( 

291 dict(filtered=filtered, is_single_entity=single_entity) 

292 ) 

293 

294 # multi_row_eager_loaders OTOH is specific to joinedload. 

295 if context.requires_uniquing: 

296 

297 def require_unique(obj): 

298 raise sa_exc.InvalidRequestError( 

299 "The unique() method must be invoked on this Result, " 

300 "as it contains results that include joined eager loads " 

301 "against collections" 

302 ) 

303 

304 result._unique_filter_state = (set(), require_unique) 

305 

306 if context.yield_per: 

307 result.yield_per(context.yield_per) 

308 

309 return result 

310 

311 

312@util.preload_module("sqlalchemy.orm.context") 

313def merge_frozen_result(session, statement, frozen_result, load=True): 

314 """Merge a :class:`_engine.FrozenResult` back into a :class:`_orm.Session`, 

315 returning a new :class:`_engine.Result` object with :term:`persistent` 

316 objects. 

317 

318 See the section :ref:`do_orm_execute_re_executing` for an example. 

319 

320 .. seealso:: 

321 

322 :ref:`do_orm_execute_re_executing` 

323 

324 :meth:`_engine.Result.freeze` 

325 

326 :class:`_engine.FrozenResult` 

327 

328 """ 

329 querycontext = util.preloaded.orm_context 

330 

331 if load: 

332 # flush current contents if we expect to load data 

333 session._autoflush() 

334 

335 ctx = querycontext._ORMSelectCompileState._create_entities_collection( 

336 statement, legacy=False 

337 ) 

338 

339 with session.no_autoflush: 

340 mapped_entities = [ 

341 i 

342 for i, e in enumerate(ctx._entities) 

343 if isinstance(e, querycontext._MapperEntity) 

344 ] 

345 keys = [ent._label_name for ent in ctx._entities] 

346 

347 keyed_tuple = result_tuple( 

348 keys, [ent._extra_entities for ent in ctx._entities] 

349 ) 

350 

351 result = [] 

352 for newrow in frozen_result._rewrite_rows(): 

353 for i in mapped_entities: 

354 if newrow[i] is not None: 

355 newrow[i] = session._merge( 

356 attributes.instance_state(newrow[i]), 

357 attributes.instance_dict(newrow[i]), 

358 load=load, 

359 _recursive={}, 

360 _resolve_conflict_map={}, 

361 ) 

362 

363 result.append(keyed_tuple(newrow)) 

364 

365 return frozen_result.with_new_rows(result) 

366 

367 

368@util.became_legacy_20( 

369 ":func:`_orm.merge_result`", 

370 alternative="The function as well as the method on :class:`_orm.Query` " 

371 "is superseded by the :func:`_orm.merge_frozen_result` function.", 

372) 

373@util.preload_module("sqlalchemy.orm.context") 

374def merge_result( 

375 query: Query[Any], 

376 iterator: Union[FrozenResult, Iterable[Sequence[Any]], Iterable[object]], 

377 load: bool = True, 

378) -> Union[FrozenResult, Iterable[Any]]: 

379 """Merge a result into the given :class:`.Query` object's Session. 

380 

381 See :meth:`_orm.Query.merge_result` for top-level documentation on this 

382 function. 

383 

384 """ 

385 

386 querycontext = util.preloaded.orm_context 

387 

388 session = query.session 

389 if load: 

390 # flush current contents if we expect to load data 

391 session._autoflush() 

392 

393 # TODO: need test coverage and documentation for the FrozenResult 

394 # use case. 

395 if isinstance(iterator, FrozenResult): 

396 frozen_result = iterator 

397 iterator = iter(frozen_result.data) 

398 else: 

399 frozen_result = None 

400 

401 ctx = querycontext._ORMSelectCompileState._create_entities_collection( 

402 query, legacy=True 

403 ) 

404 

405 autoflush = session.autoflush 

406 try: 

407 session.autoflush = False 

408 single_entity = not frozen_result and len(ctx._entities) == 1 

409 

410 if single_entity: 

411 if isinstance(ctx._entities[0], querycontext._MapperEntity): 

412 result = [ 

413 session._merge( 

414 attributes.instance_state(instance), 

415 attributes.instance_dict(instance), 

416 load=load, 

417 _recursive={}, 

418 _resolve_conflict_map={}, 

419 ) 

420 for instance in iterator 

421 ] 

422 else: 

423 result = list(iterator) 

424 else: 

425 mapped_entities = [ 

426 i 

427 for i, e in enumerate(ctx._entities) 

428 if isinstance(e, querycontext._MapperEntity) 

429 ] 

430 result = [] 

431 keys = [ent._label_name for ent in ctx._entities] 

432 

433 keyed_tuple = result_tuple( 

434 keys, [ent._extra_entities for ent in ctx._entities] 

435 ) 

436 

437 for row in iterator: 

438 newrow = list(row) 

439 for i in mapped_entities: 

440 if newrow[i] is not None: 

441 newrow[i] = session._merge( 

442 attributes.instance_state(newrow[i]), 

443 attributes.instance_dict(newrow[i]), 

444 load=load, 

445 _recursive={}, 

446 _resolve_conflict_map={}, 

447 ) 

448 result.append(keyed_tuple(newrow)) 

449 

450 if frozen_result: 

451 return frozen_result.with_new_rows(result) 

452 else: 

453 return iter(result) 

454 finally: 

455 session.autoflush = autoflush 

456 

457 

458def get_from_identity( 

459 session: Session, 

460 mapper: Mapper[_O], 

461 key: _IdentityKeyType[_O], 

462 passive: PassiveFlag, 

463) -> Union[LoaderCallableStatus, Optional[_O]]: 

464 """Look up the given key in the given session's identity map, 

465 check the object for expired state if found. 

466 

467 """ 

468 instance = session.identity_map.get(key) 

469 if instance is not None: 

470 state = attributes.instance_state(instance) 

471 

472 if mapper.inherits and not state.mapper.isa(mapper): 

473 return attributes.PASSIVE_CLASS_MISMATCH 

474 

475 # expired - ensure it still exists 

476 if state.expired: 

477 if not passive & attributes.SQL_OK: 

478 # TODO: no coverage here 

479 return attributes.PASSIVE_NO_RESULT 

480 elif not passive & attributes.RELATED_OBJECT_OK: 

481 # this mode is used within a flush and the instance's 

482 # expired state will be checked soon enough, if necessary. 

483 # also used by immediateloader for a mutually-dependent 

484 # o2m->m2m load, :ticket:`6301` 

485 return instance 

486 try: 

487 state._load_expired(state, passive) 

488 except orm_exc.ObjectDeletedError: 

489 session._remove_newly_deleted([state]) 

490 return None 

491 return instance 

492 else: 

493 return None 

494 

495 

496def _load_on_ident( 

497 session: Session, 

498 statement: Union[Select, FromStatement], 

499 key: Optional[_IdentityKeyType], 

500 *, 

501 load_options: Optional[Sequence[ORMOption]] = None, 

502 refresh_state: Optional[InstanceState[Any]] = None, 

503 with_for_update: Optional[ForUpdateArg] = None, 

504 only_load_props: Optional[Iterable[str]] = None, 

505 no_autoflush: bool = False, 

506 bind_arguments: Mapping[str, Any] = util.EMPTY_DICT, 

507 execution_options: _ExecuteOptions = util.EMPTY_DICT, 

508 require_pk_cols: bool = False, 

509 is_user_refresh: bool = False, 

510): 

511 """Load the given identity key from the database.""" 

512 if key is not None: 

513 ident = key[1] 

514 identity_token = key[2] 

515 else: 

516 ident = identity_token = None 

517 

518 return _load_on_pk_identity( 

519 session, 

520 statement, 

521 ident, 

522 load_options=load_options, 

523 refresh_state=refresh_state, 

524 with_for_update=with_for_update, 

525 only_load_props=only_load_props, 

526 identity_token=identity_token, 

527 no_autoflush=no_autoflush, 

528 bind_arguments=bind_arguments, 

529 execution_options=execution_options, 

530 require_pk_cols=require_pk_cols, 

531 is_user_refresh=is_user_refresh, 

532 ) 

533 

534 

535def _load_on_pk_identity( 

536 session: Session, 

537 statement: Union[Select, FromStatement], 

538 primary_key_identity: Optional[Tuple[Any, ...]], 

539 *, 

540 load_options: Optional[Sequence[ORMOption]] = None, 

541 refresh_state: Optional[InstanceState[Any]] = None, 

542 with_for_update: Optional[ForUpdateArg] = None, 

543 only_load_props: Optional[Iterable[str]] = None, 

544 identity_token: Optional[Any] = None, 

545 no_autoflush: bool = False, 

546 bind_arguments: Mapping[str, Any] = util.EMPTY_DICT, 

547 execution_options: _ExecuteOptions = util.EMPTY_DICT, 

548 require_pk_cols: bool = False, 

549 is_user_refresh: bool = False, 

550): 

551 """Load the given primary key identity from the database.""" 

552 

553 query = statement 

554 q = query._clone() 

555 

556 assert not q._is_lambda_element 

557 

558 if load_options is None: 

559 load_options = QueryContext.default_load_options 

560 

561 if ( 

562 statement._compile_options 

563 is SelectState.default_select_compile_options 

564 ): 

565 compile_options = _ORMCompileState.default_compile_options 

566 else: 

567 compile_options = statement._compile_options 

568 

569 if primary_key_identity is not None: 

570 mapper = query._propagate_attrs["plugin_subject"] 

571 

572 _get_clause, _get_params = mapper._get_clause 

573 

574 # None present in ident - turn those comparisons 

575 # into "IS NULL" 

576 if None in primary_key_identity: 

577 nones = { 

578 _get_params[col].key 

579 for col, value in zip(mapper.primary_key, primary_key_identity) 

580 if value is None 

581 } 

582 

583 _get_clause = sql_util.adapt_criterion_to_null(_get_clause, nones) 

584 

585 if len(nones) == len(primary_key_identity): 

586 util.warn( 

587 "fully NULL primary key identity cannot load any " 

588 "object. This condition may raise an error in a future " 

589 "release." 

590 ) 

591 

592 q._where_criteria = (_get_clause,) 

593 

594 params = { 

595 _get_params[primary_key].key: id_val 

596 for id_val, primary_key in zip( 

597 primary_key_identity, mapper.primary_key 

598 ) 

599 } 

600 else: 

601 params = None 

602 

603 if with_for_update is not None: 

604 version_check = True 

605 q._for_update_arg = with_for_update 

606 elif query._for_update_arg is not None: 

607 version_check = True 

608 q._for_update_arg = query._for_update_arg 

609 else: 

610 version_check = False 

611 

612 if require_pk_cols and only_load_props: 

613 if not refresh_state: 

614 raise sa_exc.ArgumentError( 

615 "refresh_state is required when require_pk_cols is present" 

616 ) 

617 

618 refresh_state_prokeys = refresh_state.mapper._primary_key_propkeys 

619 has_changes = { 

620 key 

621 for key in refresh_state_prokeys.difference(only_load_props) 

622 if refresh_state.attrs[key].history.has_changes() 

623 } 

624 if has_changes: 

625 # raise if pending pk changes are present. 

626 # technically, this could be limited to the case where we have 

627 # relationships in the only_load_props collection to be refreshed 

628 # also (and only ones that have a secondary eager loader, at that). 

629 # however, the error is in place across the board so that behavior 

630 # here is easier to predict. The use case it prevents is one 

631 # of mutating PK attrs, leaving them unflushed, 

632 # calling session.refresh(), and expecting those attrs to remain 

633 # still unflushed. It seems likely someone doing all those 

634 # things would be better off having the PK attributes flushed 

635 # to the database before tinkering like that (session.refresh() is 

636 # tinkering). 

637 raise sa_exc.InvalidRequestError( 

638 f"Please flush pending primary key changes on " 

639 "attributes " 

640 f"{has_changes} for mapper {refresh_state.mapper} before " 

641 "proceeding with a refresh" 

642 ) 

643 

644 # overall, the ORM has no internal flow right now for "dont load the 

645 # primary row of an object at all, but fire off 

646 # selectinload/subqueryload/immediateload for some relationships". 

647 # It would probably be a pretty big effort to add such a flow. So 

648 # here, the case for #8703 is introduced; user asks to refresh some 

649 # relationship attributes only which are 

650 # selectinload/subqueryload/immediateload/ etc. (not joinedload). 

651 # ORM complains there's no columns in the primary row to load. 

652 # So here, we just add the PK cols if that 

653 # case is detected, so that there is a SELECT emitted for the primary 

654 # row. 

655 # 

656 # Let's just state right up front, for this one little case, 

657 # the ORM here is adding a whole extra SELECT just to satisfy 

658 # limitations in the internal flow. This is really not a thing 

659 # SQLAlchemy finds itself doing like, ever, obviously, we are 

660 # constantly working to *remove* SELECTs we don't need. We 

661 # rationalize this for now based on 1. session.refresh() is not 

662 # commonly used 2. session.refresh() with only relationship attrs is 

663 # even less commonly used 3. the SELECT in question is very low 

664 # latency. 

665 # 

666 # to add the flow to not include the SELECT, the quickest way 

667 # might be to just manufacture a single-row result set to send off to 

668 # instances(), but we'd have to weave that into context.py and all 

669 # that. For 2.0.0, we have enough big changes to navigate for now. 

670 # 

671 mp = refresh_state.mapper._props 

672 for p in only_load_props: 

673 if mp[p]._is_relationship: 

674 only_load_props = refresh_state_prokeys.union(only_load_props) 

675 break 

676 

677 if refresh_state and refresh_state.load_options: 

678 compile_options += {"_current_path": refresh_state.load_path.parent} 

679 q = q.options(*refresh_state.load_options) 

680 

681 new_compile_options, load_options = _set_get_options( 

682 compile_options, 

683 load_options, 

684 version_check=version_check, 

685 only_load_props=only_load_props, 

686 refresh_state=refresh_state, 

687 identity_token=identity_token, 

688 is_user_refresh=is_user_refresh, 

689 ) 

690 

691 q._compile_options = new_compile_options 

692 q._order_by = None 

693 

694 if no_autoflush: 

695 load_options += {"_autoflush": False} 

696 

697 execution_options = util.EMPTY_DICT.merge_with( 

698 execution_options, 

699 util.immutabledict(_sa_orm_load_options=load_options), 

700 ) 

701 result = ( 

702 session.execute( 

703 q, 

704 params=params, 

705 execution_options=execution_options, 

706 bind_arguments=bind_arguments, 

707 ) 

708 .unique() 

709 .scalars() 

710 ) 

711 

712 try: 

713 return result.one() 

714 except orm_exc.NoResultFound: 

715 return None 

716 

717 

718def _set_get_options( 

719 compile_opt, 

720 load_opt, 

721 populate_existing=None, 

722 version_check=None, 

723 only_load_props=None, 

724 refresh_state=None, 

725 identity_token=None, 

726 is_user_refresh=None, 

727): 

728 compile_options = {} 

729 load_options = {} 

730 if version_check: 

731 load_options["_version_check"] = version_check 

732 if populate_existing: 

733 load_options["_populate_existing"] = populate_existing 

734 if refresh_state: 

735 load_options["_refresh_state"] = refresh_state 

736 compile_options["_for_refresh_state"] = True 

737 if only_load_props: 

738 compile_options["_only_load_props"] = frozenset(only_load_props) 

739 if identity_token: 

740 load_options["_identity_token"] = identity_token 

741 

742 if is_user_refresh: 

743 load_options["_is_user_refresh"] = is_user_refresh 

744 if load_options: 

745 load_opt += load_options 

746 if compile_options: 

747 compile_opt += compile_options 

748 

749 return compile_opt, load_opt 

750 

751 

752def _setup_entity_query( 

753 compile_state, 

754 mapper, 

755 query_entity, 

756 path, 

757 adapter, 

758 column_collection, 

759 with_polymorphic=None, 

760 only_load_props=None, 

761 polymorphic_discriminator=None, 

762 **kw, 

763): 

764 if with_polymorphic: 

765 poly_properties = mapper._iterate_polymorphic_properties( 

766 with_polymorphic 

767 ) 

768 else: 

769 poly_properties = mapper._polymorphic_properties 

770 

771 quick_populators = {} 

772 

773 path.set(compile_state.attributes, "memoized_setups", quick_populators) 

774 

775 # for the lead entities in the path, e.g. not eager loads, and 

776 # assuming a user-passed aliased class, e.g. not a from_self() or any 

777 # implicit aliasing, don't add columns to the SELECT that aren't 

778 # in the thing that's aliased. 

779 check_for_adapt = adapter and len(path) == 1 and path[-1].is_aliased_class 

780 

781 for value in poly_properties: 

782 if only_load_props and value.key not in only_load_props: 

783 continue 

784 value.setup( 

785 compile_state, 

786 query_entity, 

787 path, 

788 adapter, 

789 only_load_props=only_load_props, 

790 column_collection=column_collection, 

791 memoized_populators=quick_populators, 

792 check_for_adapt=check_for_adapt, 

793 **kw, 

794 ) 

795 

796 if ( 

797 polymorphic_discriminator is not None 

798 and polymorphic_discriminator is not mapper.polymorphic_on 

799 ): 

800 if adapter: 

801 pd = adapter.columns[polymorphic_discriminator] 

802 else: 

803 pd = polymorphic_discriminator 

804 column_collection.append(pd) 

805 

806 

807def _warn_for_runid_changed(state): 

808 util.warn( 

809 "Loading context for %s has changed within a load/refresh " 

810 "handler, suggesting a row refresh operation took place. If this " 

811 "event handler is expected to be " 

812 "emitting row refresh operations within an existing load or refresh " 

813 "operation, set restore_load_context=True when establishing the " 

814 "listener to ensure the context remains unchanged when the event " 

815 "handler completes." % (state_str(state),) 

816 ) 

817 

818 

819def _instance_processor( 

820 query_entity, 

821 mapper, 

822 context, 

823 result, 

824 path, 

825 adapter, 

826 only_load_props=None, 

827 refresh_state=None, 

828 polymorphic_discriminator=None, 

829 _polymorphic_from=None, 

830): 

831 """Produce a mapper level row processor callable 

832 which processes rows into mapped instances.""" 

833 

834 # note that this method, most of which exists in a closure 

835 # called _instance(), resists being broken out, as 

836 # attempts to do so tend to add significant function 

837 # call overhead. _instance() is the most 

838 # performance-critical section in the whole ORM. 

839 

840 identity_class = mapper._identity_class 

841 compile_state = context.compile_state 

842 

843 # look for "row getter" functions that have been assigned along 

844 # with the compile state that were cached from a previous load. 

845 # these are operator.itemgetter() objects that each will extract a 

846 # particular column from each row. 

847 

848 getter_key = ("getters", mapper) 

849 getters = path.get(compile_state.attributes, getter_key, None) 

850 

851 if getters is None: 

852 # no getters, so go through a list of attributes we are loading for, 

853 # and the ones that are column based will have already put information 

854 # for us in another collection "memoized_setups", which represents the 

855 # output of the LoaderStrategy.setup_query() method. We can just as 

856 # easily call LoaderStrategy.create_row_processor for each, but by 

857 # getting it all at once from setup_query we save another method call 

858 # per attribute. 

859 props = mapper._prop_set 

860 if only_load_props is not None: 

861 props = props.intersection( 

862 mapper._props[k] for k in only_load_props 

863 ) 

864 

865 quick_populators = path.get( 

866 context.attributes, "memoized_setups", EMPTY_DICT 

867 ) 

868 

869 todo = [] 

870 cached_populators = { 

871 "new": [], 

872 "quick": [], 

873 "deferred": [], 

874 "expire": [], 

875 "existing": [], 

876 "eager": [], 

877 } 

878 

879 if refresh_state is None: 

880 # we can also get the "primary key" tuple getter function 

881 pk_cols = mapper.primary_key 

882 

883 if adapter: 

884 pk_cols = [adapter.columns[c] for c in pk_cols] 

885 primary_key_getter = result._tuple_getter(pk_cols) 

886 else: 

887 primary_key_getter = None 

888 

889 getters = { 

890 "cached_populators": cached_populators, 

891 "todo": todo, 

892 "primary_key_getter": primary_key_getter, 

893 } 

894 for prop in props: 

895 if prop in quick_populators: 

896 # this is an inlined path just for column-based attributes. 

897 col = quick_populators[prop] 

898 if col is _DEFER_FOR_STATE: 

899 cached_populators["new"].append( 

900 (prop.key, prop._deferred_column_loader) 

901 ) 

902 elif col is _SET_DEFERRED_EXPIRED: 

903 # note that in this path, we are no longer 

904 # searching in the result to see if the column might 

905 # be present in some unexpected way. 

906 cached_populators["expire"].append((prop.key, False)) 

907 elif col is _RAISE_FOR_STATE: 

908 cached_populators["new"].append( 

909 (prop.key, prop._raise_column_loader) 

910 ) 

911 else: 

912 getter = None 

913 if adapter: 

914 # this logic had been removed for all 1.4 releases 

915 # up until 1.4.18; the adapter here is particularly 

916 # the compound eager adapter which isn't accommodated 

917 # in the quick_populators right now. The "fallback" 

918 # logic below instead took over in many more cases 

919 # until issue #6596 was identified. 

920 

921 # note there is still an issue where this codepath 

922 # produces no "getter" for cases where a joined-inh 

923 # mapping includes a labeled column property, meaning 

924 # KeyError is caught internally and we fall back to 

925 # _getter(col), which works anyway. The adapter 

926 # here for joined inh without any aliasing might not 

927 # be useful. Tests which see this include 

928 # test.orm.inheritance.test_basic -> 

929 # EagerTargetingTest.test_adapt_stringency 

930 # OptimizedLoadTest.test_column_expression_joined 

931 # PolymorphicOnNotLocalTest.test_polymorphic_on_column_prop # noqa: E501 

932 # 

933 

934 adapted_col = adapter.columns[col] 

935 if adapted_col is not None: 

936 getter = result._getter(adapted_col, False) 

937 if not getter: 

938 getter = result._getter(col, False) 

939 if getter: 

940 cached_populators["quick"].append((prop.key, getter)) 

941 else: 

942 # fall back to the ColumnProperty itself, which 

943 # will iterate through all of its columns 

944 # to see if one fits 

945 prop.create_row_processor( 

946 context, 

947 query_entity, 

948 path, 

949 mapper, 

950 result, 

951 adapter, 

952 cached_populators, 

953 ) 

954 else: 

955 # loader strategies like subqueryload, selectinload, 

956 # joinedload, basically relationships, these need to interact 

957 # with the context each time to work correctly. 

958 todo.append(prop) 

959 

960 path.set(compile_state.attributes, getter_key, getters) 

961 

962 cached_populators = getters["cached_populators"] 

963 

964 populators = {key: list(value) for key, value in cached_populators.items()} 

965 for prop in getters["todo"]: 

966 prop.create_row_processor( 

967 context, query_entity, path, mapper, result, adapter, populators 

968 ) 

969 

970 propagated_loader_options = context.propagated_loader_options 

971 load_path = ( 

972 context.compile_state.current_path + path 

973 if context.compile_state.current_path.path 

974 else path 

975 ) 

976 

977 session_identity_map = context.session.identity_map 

978 

979 populate_existing = context.populate_existing or mapper.always_refresh 

980 load_evt = bool(mapper.class_manager.dispatch.load) 

981 refresh_evt = bool(mapper.class_manager.dispatch.refresh) 

982 persistent_evt = bool(context.session.dispatch.loaded_as_persistent) 

983 if persistent_evt: 

984 loaded_as_persistent = context.session.dispatch.loaded_as_persistent 

985 instance_state = attributes.instance_state 

986 instance_dict = attributes.instance_dict 

987 session_id = context.session.hash_key 

988 runid = context.runid 

989 identity_token = context.identity_token 

990 

991 version_check = context.version_check 

992 if version_check: 

993 version_id_col = mapper.version_id_col 

994 if version_id_col is not None: 

995 if adapter: 

996 version_id_col = adapter.columns[version_id_col] 

997 version_id_getter = result._getter(version_id_col) 

998 else: 

999 version_id_getter = None 

1000 

1001 if not refresh_state and _polymorphic_from is not None: 

1002 key = ("loader", path.path) 

1003 

1004 if key in context.attributes and context.attributes[key].strategy == ( 

1005 ("selectinload_polymorphic", True), 

1006 ): 

1007 option_entities = context.attributes[key].local_opts["entities"] 

1008 else: 

1009 option_entities = None 

1010 selectin_load_via = mapper._should_selectin_load( 

1011 option_entities, 

1012 _polymorphic_from, 

1013 ) 

1014 

1015 if selectin_load_via and selectin_load_via is not _polymorphic_from: 

1016 # only_load_props goes w/ refresh_state only, and in a refresh 

1017 # we are a single row query for the exact entity; polymorphic 

1018 # loading does not apply 

1019 assert only_load_props is None 

1020 

1021 if selectin_load_via.is_mapper: 

1022 _load_supers = [] 

1023 _endmost_mapper = selectin_load_via 

1024 while ( 

1025 _endmost_mapper 

1026 and _endmost_mapper is not _polymorphic_from 

1027 ): 

1028 _load_supers.append(_endmost_mapper) 

1029 _endmost_mapper = _endmost_mapper.inherits 

1030 else: 

1031 _load_supers = [selectin_load_via] 

1032 

1033 for _selectinload_entity in _load_supers: 

1034 if _PostLoad.path_exists( 

1035 context, load_path, _selectinload_entity 

1036 ): 

1037 continue 

1038 callable_ = _load_subclass_via_in( 

1039 context, 

1040 path, 

1041 _selectinload_entity, 

1042 _polymorphic_from, 

1043 option_entities, 

1044 ) 

1045 _PostLoad.callable_for_path( 

1046 context, 

1047 load_path, 

1048 _selectinload_entity.mapper, 

1049 _selectinload_entity, 

1050 callable_, 

1051 _selectinload_entity, 

1052 ) 

1053 

1054 post_load = _PostLoad.for_context(context, load_path, only_load_props) 

1055 

1056 if refresh_state: 

1057 refresh_identity_key = refresh_state.key 

1058 if refresh_identity_key is None: 

1059 # super-rare condition; a refresh is being called 

1060 # on a non-instance-key instance; this is meant to only 

1061 # occur within a flush() 

1062 refresh_identity_key = mapper._identity_key_from_state( 

1063 refresh_state 

1064 ) 

1065 else: 

1066 refresh_identity_key = None 

1067 

1068 primary_key_getter = getters["primary_key_getter"] 

1069 

1070 if mapper.allow_partial_pks: 

1071 is_not_primary_key = _none_set.issuperset 

1072 else: 

1073 is_not_primary_key = _none_set.intersection 

1074 

1075 def _instance(row): 

1076 # determine the state that we'll be populating 

1077 if refresh_identity_key: 

1078 # fixed state that we're refreshing 

1079 state = refresh_state 

1080 instance = state.obj() 

1081 dict_ = instance_dict(instance) 

1082 isnew = state.runid != runid 

1083 currentload = True 

1084 loaded_instance = False 

1085 else: 

1086 # look at the row, see if that identity is in the 

1087 # session, or we have to create a new one 

1088 identitykey = ( 

1089 identity_class, 

1090 primary_key_getter(row), 

1091 identity_token, 

1092 ) 

1093 

1094 instance = session_identity_map.get(identitykey) 

1095 

1096 if instance is not None: 

1097 # existing instance 

1098 state = instance_state(instance) 

1099 dict_ = instance_dict(instance) 

1100 

1101 isnew = state.runid != runid 

1102 currentload = not isnew 

1103 loaded_instance = False 

1104 

1105 if version_check and version_id_getter and not currentload: 

1106 _validate_version_id( 

1107 mapper, state, dict_, row, version_id_getter 

1108 ) 

1109 

1110 else: 

1111 # create a new instance 

1112 

1113 # check for non-NULL values in the primary key columns, 

1114 # else no entity is returned for the row 

1115 if is_not_primary_key(identitykey[1]): 

1116 return None 

1117 

1118 isnew = True 

1119 currentload = True 

1120 loaded_instance = True 

1121 

1122 instance = mapper.class_manager.new_instance() 

1123 

1124 dict_ = instance_dict(instance) 

1125 state = instance_state(instance) 

1126 state.key = identitykey 

1127 state.identity_token = identity_token 

1128 

1129 # attach instance to session. 

1130 state.session_id = session_id 

1131 session_identity_map._add_unpresent(state, identitykey) 

1132 

1133 effective_populate_existing = populate_existing 

1134 if refresh_state is state: 

1135 effective_populate_existing = True 

1136 

1137 # populate. this looks at whether this state is new 

1138 # for this load or was existing, and whether or not this 

1139 # row is the first row with this identity. 

1140 if currentload or effective_populate_existing: 

1141 # full population routines. Objects here are either 

1142 # just created, or we are doing a populate_existing 

1143 

1144 # be conservative about setting load_path when populate_existing 

1145 # is in effect; want to maintain options from the original 

1146 # load. see test_expire->test_refresh_maintains_deferred_options 

1147 if isnew and ( 

1148 propagated_loader_options or not effective_populate_existing 

1149 ): 

1150 state.load_options = propagated_loader_options 

1151 state.load_path = load_path 

1152 

1153 _populate_full( 

1154 context, 

1155 row, 

1156 state, 

1157 dict_, 

1158 isnew, 

1159 load_path, 

1160 loaded_instance, 

1161 effective_populate_existing, 

1162 populators, 

1163 ) 

1164 

1165 if isnew: 

1166 # state.runid should be equal to context.runid / runid 

1167 # here, however for event checks we are being more conservative 

1168 # and checking against existing run id 

1169 # assert state.runid == runid 

1170 

1171 existing_runid = state.runid 

1172 

1173 if loaded_instance: 

1174 if load_evt: 

1175 state.manager.dispatch.load(state, context) 

1176 if state.runid != existing_runid: 

1177 _warn_for_runid_changed(state) 

1178 if persistent_evt: 

1179 loaded_as_persistent(context.session, state) 

1180 if state.runid != existing_runid: 

1181 _warn_for_runid_changed(state) 

1182 elif refresh_evt: 

1183 state.manager.dispatch.refresh( 

1184 state, context, only_load_props 

1185 ) 

1186 if state.runid != runid: 

1187 _warn_for_runid_changed(state) 

1188 

1189 if effective_populate_existing or state.modified: 

1190 if refresh_state and only_load_props: 

1191 state._commit(dict_, only_load_props) 

1192 else: 

1193 state._commit_all(dict_, session_identity_map) 

1194 

1195 if post_load: 

1196 post_load.add_state(state, True) 

1197 

1198 else: 

1199 # partial population routines, for objects that were already 

1200 # in the Session, but a row matches them; apply eager loaders 

1201 # on existing objects, etc. 

1202 unloaded = state.unloaded 

1203 isnew = state not in context.partials 

1204 

1205 if not isnew or unloaded or populators["eager"]: 

1206 # state is having a partial set of its attributes 

1207 # refreshed. Populate those attributes, 

1208 # and add to the "context.partials" collection. 

1209 

1210 to_load = _populate_partial( 

1211 context, 

1212 row, 

1213 state, 

1214 dict_, 

1215 isnew, 

1216 load_path, 

1217 unloaded, 

1218 populators, 

1219 ) 

1220 

1221 if isnew: 

1222 if refresh_evt: 

1223 existing_runid = state.runid 

1224 state.manager.dispatch.refresh(state, context, to_load) 

1225 if state.runid != existing_runid: 

1226 _warn_for_runid_changed(state) 

1227 

1228 state._commit(dict_, to_load) 

1229 

1230 if post_load and context.invoke_all_eagers: 

1231 post_load.add_state(state, False) 

1232 

1233 return instance 

1234 

1235 if mapper.polymorphic_map and not _polymorphic_from and not refresh_state: 

1236 # if we are doing polymorphic, dispatch to a different _instance() 

1237 # method specific to the subclass mapper 

1238 def ensure_no_pk(row): 

1239 identitykey = ( 

1240 identity_class, 

1241 primary_key_getter(row), 

1242 identity_token, 

1243 ) 

1244 if not is_not_primary_key(identitykey[1]): 

1245 return identitykey 

1246 else: 

1247 return None 

1248 

1249 _instance = _decorate_polymorphic_switch( 

1250 _instance, 

1251 context, 

1252 query_entity, 

1253 mapper, 

1254 result, 

1255 path, 

1256 polymorphic_discriminator, 

1257 adapter, 

1258 ensure_no_pk, 

1259 ) 

1260 

1261 return _instance 

1262 

1263 

1264def _load_subclass_via_in( 

1265 context, path, entity, polymorphic_from, option_entities 

1266): 

1267 mapper = entity.mapper 

1268 

1269 # TODO: polymorphic_from seems to be a Mapper in all cases. 

1270 # this is likely not needed, but as we dont have typing in loading.py 

1271 # yet, err on the safe side 

1272 polymorphic_from_mapper = polymorphic_from.mapper 

1273 not_against_basemost = polymorphic_from_mapper.inherits is not None 

1274 

1275 zero_idx = len(mapper.base_mapper.primary_key) == 1 

1276 

1277 if entity.is_aliased_class or not_against_basemost: 

1278 q, enable_opt, disable_opt = mapper._subclass_load_via_in( 

1279 entity, polymorphic_from 

1280 ) 

1281 else: 

1282 q, enable_opt, disable_opt = mapper._subclass_load_via_in_mapper 

1283 

1284 def do_load(context, path, states, load_only, effective_entity): 

1285 if not option_entities: 

1286 # filter out states for those that would have selectinloaded 

1287 # from another loader 

1288 # TODO: we are currently ignoring the case where the 

1289 # "selectin_polymorphic" option is used, as this is much more 

1290 # complex / specific / very uncommon API use 

1291 states = [ 

1292 (s, v) 

1293 for s, v in states 

1294 if s.mapper._would_selectin_load_only_from_given_mapper(mapper) 

1295 ] 

1296 

1297 if not states: 

1298 return 

1299 

1300 orig_query = context.query 

1301 

1302 if path.parent: 

1303 enable_opt_lcl = enable_opt._prepend_path(path) 

1304 disable_opt_lcl = disable_opt._prepend_path(path) 

1305 else: 

1306 enable_opt_lcl = enable_opt 

1307 disable_opt_lcl = disable_opt 

1308 options = ( 

1309 (enable_opt_lcl,) + orig_query._with_options + (disable_opt_lcl,) 

1310 ) 

1311 

1312 q2 = q.options(*options) 

1313 

1314 q2._compile_options = context.compile_state.default_compile_options 

1315 q2._compile_options += {"_current_path": path.parent} 

1316 

1317 if context.populate_existing: 

1318 q2 = q2.execution_options(populate_existing=True) 

1319 

1320 while states: 

1321 chunk = states[0 : _SelectInLoader._chunksize] 

1322 states = states[_SelectInLoader._chunksize :] 

1323 context.session.execute( 

1324 q2, 

1325 dict( 

1326 primary_keys=[ 

1327 state.key[1][0] if zero_idx else state.key[1] 

1328 for state, load_attrs in chunk 

1329 ] 

1330 ), 

1331 ).unique().scalars().all() 

1332 

1333 return do_load 

1334 

1335 

1336def _populate_full( 

1337 context, 

1338 row, 

1339 state, 

1340 dict_, 

1341 isnew, 

1342 load_path, 

1343 loaded_instance, 

1344 populate_existing, 

1345 populators, 

1346): 

1347 if isnew: 

1348 # first time we are seeing a row with this identity. 

1349 state.runid = context.runid 

1350 

1351 for key, getter in populators["quick"]: 

1352 dict_[key] = getter(row) 

1353 if populate_existing: 

1354 for key, set_callable in populators["expire"]: 

1355 dict_.pop(key, None) 

1356 if set_callable: 

1357 state.expired_attributes.add(key) 

1358 else: 

1359 for key, set_callable in populators["expire"]: 

1360 if set_callable: 

1361 state.expired_attributes.add(key) 

1362 

1363 for key, populator in populators["new"]: 

1364 populator(state, dict_, row) 

1365 

1366 elif load_path != state.load_path: 

1367 # new load path, e.g. object is present in more than one 

1368 # column position in a series of rows 

1369 state.load_path = load_path 

1370 

1371 # if we have data, and the data isn't in the dict, OK, let's put 

1372 # it in. 

1373 for key, getter in populators["quick"]: 

1374 if key not in dict_: 

1375 dict_[key] = getter(row) 

1376 

1377 # otherwise treat like an "already seen" row 

1378 for key, populator in populators["existing"]: 

1379 populator(state, dict_, row) 

1380 # TODO: allow "existing" populator to know this is 

1381 # a new path for the state: 

1382 # populator(state, dict_, row, new_path=True) 

1383 

1384 else: 

1385 # have already seen rows with this identity in this same path. 

1386 for key, populator in populators["existing"]: 

1387 populator(state, dict_, row) 

1388 

1389 # TODO: same path 

1390 # populator(state, dict_, row, new_path=False) 

1391 

1392 

1393def _populate_partial( 

1394 context, row, state, dict_, isnew, load_path, unloaded, populators 

1395): 

1396 if not isnew: 

1397 if unloaded: 

1398 # extra pass, see #8166 

1399 for key, getter in populators["quick"]: 

1400 if key in unloaded: 

1401 dict_[key] = getter(row) 

1402 

1403 to_load = context.partials[state] 

1404 for key, populator in populators["existing"]: 

1405 if key in to_load: 

1406 populator(state, dict_, row) 

1407 else: 

1408 to_load = unloaded 

1409 context.partials[state] = to_load 

1410 

1411 for key, getter in populators["quick"]: 

1412 if key in to_load: 

1413 dict_[key] = getter(row) 

1414 for key, set_callable in populators["expire"]: 

1415 if key in to_load: 

1416 dict_.pop(key, None) 

1417 if set_callable: 

1418 state.expired_attributes.add(key) 

1419 for key, populator in populators["new"]: 

1420 if key in to_load: 

1421 populator(state, dict_, row) 

1422 

1423 for key, populator in populators["eager"]: 

1424 if key not in unloaded: 

1425 populator(state, dict_, row) 

1426 

1427 return to_load 

1428 

1429 

1430def _validate_version_id(mapper, state, dict_, row, getter): 

1431 if mapper._get_state_attr_by_column( 

1432 state, dict_, mapper.version_id_col 

1433 ) != getter(row): 

1434 raise orm_exc.StaleDataError( 

1435 "Instance '%s' has version id '%s' which " 

1436 "does not match database-loaded version id '%s'." 

1437 % ( 

1438 state_str(state), 

1439 mapper._get_state_attr_by_column( 

1440 state, dict_, mapper.version_id_col 

1441 ), 

1442 getter(row), 

1443 ) 

1444 ) 

1445 

1446 

1447def _decorate_polymorphic_switch( 

1448 instance_fn, 

1449 context, 

1450 query_entity, 

1451 mapper, 

1452 result, 

1453 path, 

1454 polymorphic_discriminator, 

1455 adapter, 

1456 ensure_no_pk, 

1457): 

1458 if polymorphic_discriminator is not None: 

1459 polymorphic_on = polymorphic_discriminator 

1460 else: 

1461 polymorphic_on = mapper.polymorphic_on 

1462 if polymorphic_on is None: 

1463 return instance_fn 

1464 

1465 if adapter: 

1466 polymorphic_on = adapter.columns[polymorphic_on] 

1467 

1468 def configure_subclass_mapper(discriminator): 

1469 try: 

1470 sub_mapper = mapper.polymorphic_map[discriminator] 

1471 except KeyError: 

1472 raise AssertionError( 

1473 "No such polymorphic_identity %r is defined" % discriminator 

1474 ) 

1475 else: 

1476 if sub_mapper is mapper: 

1477 return None 

1478 elif not sub_mapper.isa(mapper): 

1479 return False 

1480 

1481 return _instance_processor( 

1482 query_entity, 

1483 sub_mapper, 

1484 context, 

1485 result, 

1486 path, 

1487 adapter, 

1488 _polymorphic_from=mapper, 

1489 ) 

1490 

1491 polymorphic_instances = util.PopulateDict(configure_subclass_mapper) 

1492 

1493 getter = result._getter(polymorphic_on) 

1494 

1495 def polymorphic_instance(row): 

1496 discriminator = getter(row) 

1497 if discriminator is not None: 

1498 _instance = polymorphic_instances[discriminator] 

1499 if _instance: 

1500 return _instance(row) 

1501 elif _instance is False: 

1502 identitykey = ensure_no_pk(row) 

1503 

1504 if identitykey: 

1505 raise sa_exc.InvalidRequestError( 

1506 "Row with identity key %s can't be loaded into an " 

1507 "object; the polymorphic discriminator column '%s' " 

1508 "refers to %s, which is not a sub-mapper of " 

1509 "the requested %s" 

1510 % ( 

1511 identitykey, 

1512 polymorphic_on, 

1513 mapper.polymorphic_map[discriminator], 

1514 mapper, 

1515 ) 

1516 ) 

1517 else: 

1518 return None 

1519 else: 

1520 return instance_fn(row) 

1521 else: 

1522 identitykey = ensure_no_pk(row) 

1523 

1524 if identitykey: 

1525 raise sa_exc.InvalidRequestError( 

1526 "Row with identity key %s can't be loaded into an " 

1527 "object; the polymorphic discriminator column '%s' is " 

1528 "NULL" % (identitykey, polymorphic_on) 

1529 ) 

1530 else: 

1531 return None 

1532 

1533 return polymorphic_instance 

1534 

1535 

1536class _PostLoad: 

1537 """Track loaders and states for "post load" operations.""" 

1538 

1539 __slots__ = "loaders", "states", "load_keys" 

1540 

1541 def __init__(self): 

1542 self.loaders = {} 

1543 self.states = util.OrderedDict() 

1544 self.load_keys = None 

1545 

1546 def add_state(self, state, overwrite): 

1547 # the states for a polymorphic load here are all shared 

1548 # within a single PostLoad object among multiple subtypes. 

1549 # Filtering of callables on a per-subclass basis needs to be done at 

1550 # the invocation level 

1551 self.states[state] = overwrite 

1552 

1553 def invoke(self, context, path): 

1554 if not self.states: 

1555 return 

1556 path = path_registry.PathRegistry.coerce(path) 

1557 for ( 

1558 effective_context, 

1559 token, 

1560 limit_to_mapper, 

1561 loader, 

1562 arg, 

1563 kw, 

1564 ) in self.loaders.values(): 

1565 states = [ 

1566 (state, overwrite) 

1567 for state, overwrite in self.states.items() 

1568 if state.manager.mapper.isa(limit_to_mapper) 

1569 ] 

1570 if states: 

1571 loader( 

1572 effective_context, path, states, self.load_keys, *arg, **kw 

1573 ) 

1574 self.states.clear() 

1575 

1576 @classmethod 

1577 def for_context(cls, context, path, only_load_props): 

1578 pl = context.post_load_paths.get(path.path) 

1579 if pl is not None and only_load_props: 

1580 pl.load_keys = only_load_props 

1581 return pl 

1582 

1583 @classmethod 

1584 def path_exists(self, context, path, key): 

1585 return ( 

1586 path.path in context.post_load_paths 

1587 and key in context.post_load_paths[path.path].loaders 

1588 ) 

1589 

1590 @classmethod 

1591 def callable_for_path( 

1592 cls, context, path, limit_to_mapper, token, loader_callable, *arg, **kw 

1593 ): 

1594 if path.path in context.post_load_paths: 

1595 pl = context.post_load_paths[path.path] 

1596 else: 

1597 pl = context.post_load_paths[path.path] = _PostLoad() 

1598 pl.loaders[token] = ( 

1599 context, 

1600 token, 

1601 limit_to_mapper, 

1602 loader_callable, 

1603 arg, 

1604 kw, 

1605 ) 

1606 

1607 

1608def _load_scalar_attributes(mapper, state, attribute_names, passive): 

1609 """initiate a column-based attribute refresh operation.""" 

1610 

1611 # assert mapper is _state_mapper(state) 

1612 session = state.session 

1613 if not session: 

1614 raise orm_exc.DetachedInstanceError( 

1615 "Instance %s is not bound to a Session; " 

1616 "attribute refresh operation cannot proceed" % (state_str(state)) 

1617 ) 

1618 

1619 no_autoflush = bool(passive & attributes.NO_AUTOFLUSH) 

1620 

1621 # in the case of inheritance, particularly concrete and abstract 

1622 # concrete inheritance, the class manager might have some keys 

1623 # of attributes on the superclass that we didn't actually map. 

1624 # These could be mapped as "concrete, don't load" or could be completely 

1625 # excluded from the mapping and we know nothing about them. Filter them 

1626 # here to prevent them from coming through. 

1627 if attribute_names: 

1628 attribute_names = attribute_names.intersection(mapper.attrs.keys()) 

1629 

1630 if mapper.inherits and not mapper.concrete: 

1631 # load based on committed attributes in the object, formed into 

1632 # a truncated SELECT that only includes relevant tables. does not 

1633 # currently use state.key 

1634 statement = mapper._optimized_get_statement(state, attribute_names) 

1635 if statement is not None: 

1636 # undefer() isn't needed here because statement has the 

1637 # columns needed already, this implicitly undefers that column 

1638 stmt = FromStatement(mapper, statement) 

1639 

1640 return _load_on_ident( 

1641 session, 

1642 stmt, 

1643 None, 

1644 only_load_props=attribute_names, 

1645 refresh_state=state, 

1646 no_autoflush=no_autoflush, 

1647 ) 

1648 

1649 # normal load, use state.key as the identity to SELECT 

1650 has_key = bool(state.key) 

1651 

1652 if has_key: 

1653 identity_key = state.key 

1654 else: 

1655 # this codepath is rare - only valid when inside a flush, and the 

1656 # object is becoming persistent but hasn't yet been assigned 

1657 # an identity_key. 

1658 # check here to ensure we have the attrs we need. 

1659 pk_attrs = [ 

1660 mapper._columntoproperty[col].key for col in mapper.primary_key 

1661 ] 

1662 if state.expired_attributes.intersection(pk_attrs): 

1663 raise sa_exc.InvalidRequestError( 

1664 "Instance %s cannot be refreshed - it's not " 

1665 " persistent and does not " 

1666 "contain a full primary key." % state_str(state) 

1667 ) 

1668 identity_key = mapper._identity_key_from_state(state) 

1669 

1670 if ( 

1671 _none_set.issubset(identity_key) and not mapper.allow_partial_pks 

1672 ) or _none_set.issuperset(identity_key): 

1673 util.warn_limited( 

1674 "Instance %s to be refreshed doesn't " 

1675 "contain a full primary key - can't be refreshed " 

1676 "(and shouldn't be expired, either).", 

1677 state_str(state), 

1678 ) 

1679 return 

1680 

1681 result = _load_on_ident( 

1682 session, 

1683 select(mapper), 

1684 identity_key, 

1685 refresh_state=state, 

1686 only_load_props=attribute_names, 

1687 no_autoflush=no_autoflush, 

1688 ) 

1689 

1690 # if instance is pending, a refresh operation 

1691 # may not complete (even if PK attributes are assigned) 

1692 if has_key and result is None: 

1693 raise orm_exc.ObjectDeletedError(state)