Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/loading.py: 11%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

659 statements  

1# orm/loading.py 

2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: ignore-errors 

8 

9 

10"""private module containing functions used to convert database 

11rows into object instances and associated state. 

12 

13the functions here are called primarily by Query, Mapper, 

14as well as some of the attribute loading strategies. 

15 

16""" 

17 

18from __future__ import annotations 

19 

20from typing import Any 

21from typing import Dict 

22from typing import Iterable 

23from typing import List 

24from typing import Mapping 

25from typing import Optional 

26from typing import Sequence 

27from typing import Tuple 

28from typing import TYPE_CHECKING 

29from typing import TypeVar 

30from typing import Union 

31 

32from . import attributes 

33from . import exc as orm_exc 

34from . import path_registry 

35from .base import _DEFER_FOR_STATE 

36from .base import _RAISE_FOR_STATE 

37from .base import _SET_DEFERRED_EXPIRED 

38from .base import PassiveFlag 

39from .context import FromStatement 

40from .context import ORMCompileState 

41from .context import QueryContext 

42from .util import _none_set 

43from .util import state_str 

44from .. import exc as sa_exc 

45from .. import util 

46from ..engine import result_tuple 

47from ..engine.result import ChunkedIteratorResult 

48from ..engine.result import FrozenResult 

49from ..engine.result import SimpleResultMetaData 

50from ..sql import select 

51from ..sql import util as sql_util 

52from ..sql.selectable import ForUpdateArg 

53from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL 

54from ..sql.selectable import SelectState 

55from ..util import EMPTY_DICT 

56from ..util.typing import TupleAny 

57from ..util.typing import Unpack 

58 

59if TYPE_CHECKING: 

60 from ._typing import _IdentityKeyType 

61 from .base import LoaderCallableStatus 

62 from .interfaces import ORMOption 

63 from .mapper import Mapper 

64 from .query import Query 

65 from .session import Session 

66 from .state import InstanceState 

67 from ..engine.cursor import CursorResult 

68 from ..engine.interfaces import _ExecuteOptions 

69 from ..engine.result import Result 

70 from ..sql import Select 

71 

# Generic type variable with an ``Any`` bound.
_T = TypeVar("_T", bound=Any)
# Generic type variable bound to ``object``; used in this module's
# annotations such as ``Mapper[_O]`` and ``_IdentityKeyType[_O]``.
_O = TypeVar("_O", bound=object)
# Callable invoked by ``instances()`` to stamp each load with a new
# ``context.runid``.  NOTE(review): presumably a monotonically increasing
# counter -- confirm against ``util.counter``.
_new_runid = util.counter()


# Type alias: string key -> list of ``(str, Any)`` pairs.
_PopulatorDict = Dict[str, List[Tuple[str, Any]]]

78 

79 

def instances(
    cursor: CursorResult[Unpack[TupleAny]], context: QueryContext
) -> Result[Unpack[TupleAny]]:
    """Return a :class:`.Result` given an ORM query context.

    :param cursor: a :class:`.CursorResult`, generated by a statement
     which came from :class:`.ORMCompileState`

    :param context: a :class:`.QueryContext` object

    :return: a :class:`.Result` object representing ORM results

    :raises sqlalchemy.exc.InvalidRequestError: if ``yield_per`` is in use
     together with loaders that require buffering or uniquing.  Any
     exception raised while the row processors are being set up causes
     ``cursor.close()`` to be invoked from the error handler.

    .. versionchanged:: 1.4 The instances() function now uses
       :class:`.Result` objects and has an all new interface.

    """

    context.runid = _new_runid()

    # a nested ("post load") invocation shares the post_load_paths
    # collection of the originating context; only the originating load
    # fires the collected post loaders (see chunks() below).
    if context.top_level_context:
        is_top_level = False
        context.post_load_paths = context.top_level_context.post_load_paths
    else:
        is_top_level = True
        context.post_load_paths = {}

    compile_state = context.compile_state
    filtered = compile_state._has_mapper_entities
    single_entity = (
        not context.load_options._only_return_tuples
        and len(compile_state._entities) == 1
        and compile_state._entities[0].supports_single_entity
    )

    try:
        # each entity's row_processor() returns a 3-tuple; transpose the
        # list of 3-tuples into three parallel sequences.
        (process, labels, extra) = list(
            zip(
                *[
                    query_entity.row_processor(context, cursor)
                    for query_entity in context.compile_state._entities
                ]
            )
        )

        if context.yield_per and (
            context.loaders_require_buffering
            or context.loaders_require_uniquing
        ):
            raise sa_exc.InvalidRequestError(
                "Can't use yield_per with eager loaders that require uniquing "
                "or row buffering, e.g. joinedload() against collections "
                "or subqueryload().  Consider the selectinload() strategy "
                "for better flexibility in loading objects."
            )

    except Exception:
        with util.safe_reraise():
            cursor.close()

    def _no_unique(entry):
        # uniquing filter installed for every entity when yield_per is set
        raise sa_exc.InvalidRequestError(
            "Can't use the ORM yield_per feature in conjunction with unique()"
        )

    def _not_hashable(datatype, *, legacy=False, uncertain=False):
        """Return a uniquing function for a possibly non-hashable column.

        * not legacy: the returned function raises InvalidRequestError,
          after first attempting ``hash()`` when ``uncertain`` is set.
        * legacy and not uncertain: ``id`` is returned.
        * legacy and uncertain: ``hash()`` is attempted until it fails
          once, after which ``id()`` is used for all subsequent values.
        """
        if not legacy:

            def go(obj):
                if uncertain:
                    try:
                        return hash(obj)
                    except Exception:
                        # hashing is best-effort only; fall through to the
                        # error below.  KeyboardInterrupt / SystemExit are
                        # intentionally not intercepted.
                        pass

                reason = (
                    "the values returned appear to be"
                    if uncertain
                    else "this datatype produces"
                )
                raise sa_exc.InvalidRequestError(
                    "Can't apply uniqueness to row tuple containing value of "
                    f"type {datatype!r}; {reason} non-hashable values"
                )

            return go
        elif not uncertain:
            return id
        else:
            _use_id = False

            def go(obj):
                nonlocal _use_id

                if not _use_id:
                    try:
                        return hash(obj)
                    except Exception:
                        # best-effort; switch to id() below.
                        # KeyboardInterrupt / SystemExit are intentionally
                        # not intercepted.
                        pass

                    # in #10459, we considered using a warning here, however
                    # as legacy query uses result.unique() in all cases, this
                    # would lead to too many warning cases.
                    _use_id = True

                return id(obj)

            return go

    # one uniquing filter per entity, in entity order
    unique_filters = [
        (
            _no_unique
            if context.yield_per
            else (
                _not_hashable(
                    ent.column.type,  # type: ignore
                    legacy=context.load_options._legacy_uniquing,
                    uncertain=ent._null_column_type,
                )
                if (
                    not ent.use_id_for_hash
                    and (ent._non_hashable_value or ent._null_column_type)
                )
                else id if ent.use_id_for_hash else None
            )
        )
        for ent in context.compile_state._entities
    ]

    row_metadata = SimpleResultMetaData(
        labels, extra, _unique_filters=unique_filters
    )

    def chunks(size):  # type: ignore
        while True:
            yield_per = size

            context.partials = {}

            if yield_per:
                fetch = cursor.fetchmany(yield_per)

                if not fetch:
                    break
            else:
                fetch = cursor._raw_all_rows()

            if single_entity:
                proc = process[0]
                rows = [proc(row) for row in fetch]
            else:
                rows = [
                    tuple([proc(row) for proc in process]) for row in fetch
                ]

            # if we are the originating load from a query, meaning we
            # aren't being called as a result of a nested "post load",
            # iterate through all the collected post loaders and fire them
            # off.  Previously this used to work recursively, however that
            # prevented deeply nested structures from being loadable
            if is_top_level:
                if yield_per:
                    # if using yield per, memoize the state of the
                    # collection so that it can be restored
                    top_level_post_loads = list(
                        context.post_load_paths.items()
                    )

                while context.post_load_paths:
                    post_loads = list(context.post_load_paths.items())
                    context.post_load_paths.clear()
                    for path, post_load in post_loads:
                        post_load.invoke(context, path)

                if yield_per:
                    context.post_load_paths.clear()
                    context.post_load_paths.update(top_level_post_loads)

            yield rows

            # without yield_per all rows were fetched in a single pass
            if not yield_per:
                break

    if context.execution_options.get("prebuffer_rows", False):
        # this is a bit of a hack at the moment.
        # I would rather have some option in the result to pre-buffer
        # internally.
        _prebuffered = list(chunks(None))

        def chunks(size):
            return iter(_prebuffered)

    result = ChunkedIteratorResult(
        row_metadata,
        chunks,
        source_supports_scalars=single_entity,
        raw=cursor,
        dynamic_yield_per=cursor.context._is_server_side,
    )

    # filtered and single_entity are used to indicate to legacy Query that the
    # query has ORM entities, so legacy deduping and scalars should be called
    # on the result.
    result._attributes = result._attributes.union(
        dict(filtered=filtered, is_single_entity=single_entity)
    )

    # multi_row_eager_loaders OTOH is specific to joinedload.
    if context.compile_state.multi_row_eager_loaders:

        def require_unique(obj):
            raise sa_exc.InvalidRequestError(
                "The unique() method must be invoked on this Result, "
                "as it contains results that include joined eager loads "
                "against collections"
            )

        result._unique_filter_state = (None, require_unique)

    if context.yield_per:
        result.yield_per(context.yield_per)

    return result

301 

302 

@util.preload_module("sqlalchemy.orm.context")
def merge_frozen_result(session, statement, frozen_result, load=True):
    """Merge a :class:`_engine.FrozenResult` back into a :class:`_orm.Session`,
    returning a new :class:`_engine.Result` object with :term:`persistent`
    objects.

    Every non-``None`` value found at a mapped-entity position of each row
    is replaced by the outcome of merging it into ``session``.  Session
    autoflush is switched off while rows are merged and restored afterwards.

    See the section :ref:`do_orm_execute_re_executing` for an example.

    .. seealso::

        :ref:`do_orm_execute_re_executing`

        :meth:`_engine.Result.freeze`

        :class:`_engine.FrozenResult`

    """
    orm_context = util.preloaded.orm_context

    if load:
        # flush current contents if we expect to load data
        session._autoflush()

    entity_collection = (
        orm_context.ORMSelectCompileState._create_entities_collection(
            statement, legacy=False
        )
    )

    def _merge_into_session(obj):
        # each object is merged with fresh recursion / conflict maps
        return session._merge(
            attributes.instance_state(obj),
            attributes.instance_dict(obj),
            load=load,
            _recursive={},
            _resolve_conflict_map={},
        )

    saved_autoflush = session.autoflush
    try:
        session.autoflush = False

        entities = entity_collection._entities
        mapper_positions = [
            position
            for position, entity in enumerate(entities)
            if isinstance(entity, orm_context._MapperEntity)
        ]
        make_row = result_tuple(
            [entity._label_name for entity in entities],
            [entity._extra_entities for entity in entities],
        )

        merged_rows = []
        for row_values in frozen_result.rewrite_rows():
            for position in mapper_positions:
                current = row_values[position]
                if current is None:
                    continue
                row_values[position] = _merge_into_session(current)

            merged_rows.append(make_row(row_values))

        return frozen_result.with_new_rows(merged_rows)
    finally:
        session.autoflush = saved_autoflush

361 

362 

@util.became_legacy_20(
    ":func:`_orm.merge_result`",
    alternative="The function as well as the method on :class:`_orm.Query` "
    "is superseded by the :func:`_orm.merge_frozen_result` function.",
)
@util.preload_module("sqlalchemy.orm.context")
def merge_result(
    query: Query[Any],
    iterator: Union[FrozenResult, Iterable[Sequence[Any]], Iterable[object]],
    load: bool = True,
) -> Union[FrozenResult, Iterable[Any]]:
    """Merge a result into the given :class:`.Query` object's Session.

    When ``iterator`` is a :class:`.FrozenResult` a new frozen result with
    the merged rows is returned; otherwise an iterator over the merged
    objects / rows is returned.

    See :meth:`_orm.Query.merge_result` for top-level documentation on this
    function.

    """

    orm_context = util.preloaded.orm_context

    session = query.session
    if load:
        # flush current contents if we expect to load data
        session._autoflush()

    # TODO: need test coverage and documentation for the FrozenResult
    # use case.
    frozen_result = iterator if isinstance(iterator, FrozenResult) else None
    if frozen_result is not None:
        iterator = iter(frozen_result.data)

    entity_collection = (
        orm_context.ORMSelectCompileState._create_entities_collection(
            query, legacy=True
        )
    )

    def _merge_into_session(obj):
        # each object is merged with fresh recursion / conflict maps
        return session._merge(
            attributes.instance_state(obj),
            attributes.instance_dict(obj),
            load=load,
            _recursive={},
            _resolve_conflict_map={},
        )

    saved_autoflush = session.autoflush
    try:
        session.autoflush = False
        entities = entity_collection._entities
        single_entity = not frozen_result and len(entities) == 1

        if not single_entity:
            # row-oriented input: rebuild each row, merging the values at
            # mapped-entity positions
            mapper_positions = [
                position
                for position, entity in enumerate(entities)
                if isinstance(entity, orm_context._MapperEntity)
            ]
            merged = []
            make_row = result_tuple(
                [entity._label_name for entity in entities],
                [entity._extra_entities for entity in entities],
            )

            for row in iterator:
                row_values = list(row)
                for position in mapper_positions:
                    current = row_values[position]
                    if current is None:
                        continue
                    row_values[position] = _merge_into_session(current)
                merged.append(make_row(row_values))
        elif isinstance(entities[0], orm_context._MapperEntity):
            # scalar input of mapped instances
            merged = []
            for instance in iterator:
                merged.append(_merge_into_session(instance))
        else:
            # scalar input of plain values; nothing to merge
            merged = list(iterator)

        if frozen_result:
            return frozen_result.with_new_rows(merged)
        else:
            return iter(merged)
    finally:
        session.autoflush = saved_autoflush

451 

452 

def get_from_identity(
    session: Session,
    mapper: Mapper[_O],
    key: _IdentityKeyType[_O],
    passive: PassiveFlag,
) -> Union[LoaderCallableStatus, Optional[_O]]:
    """Fetch ``key`` from the identity map of ``session``, verifying an
    expired object before handing it back.

    Returns ``None`` when the key is absent or when reloading the expired
    object raises ``ObjectDeletedError``; returns
    ``attributes.PASSIVE_CLASS_MISMATCH`` or ``attributes.PASSIVE_NO_RESULT``
    for the class-mismatch / no-SQL-allowed cases; otherwise returns the
    instance.

    """
    found = session.identity_map.get(key)
    if found is None:
        return None

    found_state = attributes.instance_state(found)

    if mapper.inherits and not found_state.mapper.isa(mapper):
        return attributes.PASSIVE_CLASS_MISMATCH

    if not found_state.expired:
        return found

    # expired - ensure it still exists
    if not passive & attributes.SQL_OK:
        # TODO: no coverage here
        return attributes.PASSIVE_NO_RESULT

    if not passive & attributes.RELATED_OBJECT_OK:
        # this mode is used within a flush and the instance's
        # expired state will be checked soon enough, if necessary.
        # also used by immediateloader for a mutually-dependent
        # o2m->m2m load, :ticket:`6301`
        return found

    try:
        found_state._load_expired(found_state, passive)
    except orm_exc.ObjectDeletedError:
        session._remove_newly_deleted([found_state])
        return None

    return found

489 

490 

def load_on_ident(
    session: Session,
    statement: Union[Select, FromStatement],
    key: Optional[_IdentityKeyType],
    *,
    load_options: Optional[Sequence[ORMOption]] = None,
    refresh_state: Optional[InstanceState[Any]] = None,
    with_for_update: Optional[ForUpdateArg] = None,
    only_load_props: Optional[Iterable[str]] = None,
    no_autoflush: bool = False,
    bind_arguments: Mapping[str, Any] = util.EMPTY_DICT,
    execution_options: _ExecuteOptions = util.EMPTY_DICT,
    require_pk_cols: bool = False,
    is_user_refresh: bool = False,
):
    """Load the given identity key from the database.

    Elements ``[1]`` and ``[2]`` of ``key`` are passed on as the primary key
    identity and identity token; both are ``None`` when ``key`` is ``None``.
    All remaining arguments are forwarded unchanged to
    :func:`load_on_pk_identity`, whose result is returned.
    """
    if key is None:
        ident = None
        identity_token = None
    else:
        ident, identity_token = key[1], key[2]

    forwarded = dict(
        load_options=load_options,
        refresh_state=refresh_state,
        with_for_update=with_for_update,
        only_load_props=only_load_props,
        identity_token=identity_token,
        no_autoflush=no_autoflush,
        bind_arguments=bind_arguments,
        execution_options=execution_options,
        require_pk_cols=require_pk_cols,
        is_user_refresh=is_user_refresh,
    )
    return load_on_pk_identity(session, statement, ident, **forwarded)

528 

529 

def load_on_pk_identity(
    session: Session,
    statement: Union[Select, FromStatement],
    primary_key_identity: Optional[Tuple[Any, ...]],
    *,
    load_options: Optional[Sequence[ORMOption]] = None,
    refresh_state: Optional[InstanceState[Any]] = None,
    with_for_update: Optional[ForUpdateArg] = None,
    only_load_props: Optional[Iterable[str]] = None,
    identity_token: Optional[Any] = None,
    no_autoflush: bool = False,
    bind_arguments: Mapping[str, Any] = util.EMPTY_DICT,
    execution_options: _ExecuteOptions = util.EMPTY_DICT,
    require_pk_cols: bool = False,
    is_user_refresh: bool = False,
):
    """Load the given primary key identity from the database.

    A clone of ``statement`` is given the mapper's "get" criteria (when
    ``primary_key_identity`` is not ``None``), executed through
    ``session.execute()``, and the single resulting object is returned;
    ``None`` is returned when no row is found.

    :raises sqlalchemy.exc.ArgumentError: ``require_pk_cols`` and
     ``only_load_props`` are given without ``refresh_state``.
    :raises sqlalchemy.exc.InvalidRequestError: primary key attributes that
     are not being refreshed have pending changes.
    """

    q = statement._clone()

    assert not q._is_lambda_element

    if load_options is None:
        load_options = QueryContext.default_load_options

    compile_options = statement._compile_options
    if compile_options is SelectState.default_select_compile_options:
        compile_options = ORMCompileState.default_compile_options

    params = None
    if primary_key_identity is not None:
        mapper = statement._propagate_attrs["plugin_subject"]

        get_clause, get_params = mapper._get_clause

        # None present in ident - turn those comparisons
        # into "IS NULL"
        if None in primary_key_identity:
            null_param_keys = {
                get_params[pk_col].key
                for pk_col, pk_value in zip(
                    mapper.primary_key, primary_key_identity
                )
                if pk_value is None
            }

            get_clause = sql_util.adapt_criterion_to_null(
                get_clause, null_param_keys
            )

            if len(null_param_keys) == len(primary_key_identity):
                util.warn(
                    "fully NULL primary key identity cannot load any "
                    "object.  This condition may raise an error in a future "
                    "release."
                )

        annotated_clause = sql_util._deep_annotate(
            get_clause, {"_orm_adapt": True}
        )
        q._where_criteria = (annotated_clause,)

        params = {}
        for pk_value, pk_col in zip(primary_key_identity, mapper.primary_key):
            params[get_params[pk_col].key] = pk_value

    # an explicit with_for_update wins over one already on the statement;
    # either one switches on version checking
    for_update_arg = (
        with_for_update
        if with_for_update is not None
        else statement._for_update_arg
    )
    version_check = for_update_arg is not None
    if version_check:
        q._for_update_arg = for_update_arg

    if require_pk_cols and only_load_props:
        if not refresh_state:
            raise sa_exc.ArgumentError(
                "refresh_state is required when require_pk_cols is present"
            )

        pk_prop_keys = refresh_state.mapper._primary_key_propkeys
        has_changes = set()
        for prop_key in pk_prop_keys.difference(only_load_props):
            if refresh_state.attrs[prop_key].history.has_changes():
                has_changes.add(prop_key)

        if has_changes:
            # raise if pending pk changes are present.
            # technically, this could be limited to the case where we have
            # relationships in the only_load_props collection to be refreshed
            # also (and only ones that have a secondary eager loader, at that).
            # however, the error is in place across the board so that behavior
            # here is easier to predict.   The use case it prevents is one
            # of mutating PK attrs, leaving them unflushed,
            # calling session.refresh(), and expecting those attrs to remain
            # still unflushed.   It seems likely someone doing all those
            # things would be better off having the PK attributes flushed
            # to the database before tinkering like that (session.refresh() is
            # tinkering).
            raise sa_exc.InvalidRequestError(
                f"Please flush pending primary key changes on "
                "attributes "
                f"{has_changes} for mapper {refresh_state.mapper} before "
                "proceeding with a refresh"
            )

        # overall, the ORM has no internal flow right now for "dont load the
        # primary row of an object at all, but fire off
        # selectinload/subqueryload/immediateload for some relationships".
        # It would probably be a pretty big effort to add such a flow.  So
        # here, the case for #8703 is introduced; user asks to refresh some
        # relationship attributes only which are
        # selectinload/subqueryload/immediateload/ etc. (not joinedload).
        # ORM complains there's no columns in the primary row to load.
        # So here, we just add the PK cols if that
        # case is detected, so that there is a SELECT emitted for the primary
        # row.
        #
        # Let's just state right up front, for this one little case,
        # the ORM here is adding a whole extra SELECT just to satisfy
        # limitations in the internal flow.  This is really not a thing
        # SQLAlchemy finds itself doing like, ever, obviously, we are
        # constantly working to *remove* SELECTs we don't need.   We
        # rationalize this for now based on 1. session.refresh() is not
        # commonly used 2. session.refresh() with only relationship attrs is
        # even less commonly used 3. the SELECT in question is very low
        # latency.
        #
        # to add the flow to not include the SELECT, the quickest way
        # might be to just manufacture a single-row result set to send off to
        # instances(), but we'd have to weave that into context.py and all
        # that.   For 2.0.0, we have enough big changes to navigate for now.
        #
        mapper_props = refresh_state.mapper._props
        if any(
            mapper_props[prop_key]._is_relationship
            for prop_key in only_load_props
        ):
            only_load_props = pk_prop_keys.union(only_load_props)

    if refresh_state and refresh_state.load_options:
        compile_options += {"_current_path": refresh_state.load_path.parent}
        q = q.options(*refresh_state.load_options)

    new_compile_options, load_options = _set_get_options(
        compile_options,
        load_options,
        version_check=version_check,
        only_load_props=only_load_props,
        refresh_state=refresh_state,
        identity_token=identity_token,
        is_user_refresh=is_user_refresh,
    )

    q._compile_options = new_compile_options
    q._order_by = None

    if no_autoflush:
        load_options += {"_autoflush": False}

    execution_options = util.EMPTY_DICT.merge_with(
        execution_options, {"_sa_orm_load_options": load_options}
    )
    executed = session.execute(
        q,
        params=params,
        execution_options=execution_options,
        bind_arguments=bind_arguments,
    )
    scalar_result = executed.unique().scalars()

    try:
        return scalar_result.one()
    except orm_exc.NoResultFound:
        return None

712 

713 

714def _set_get_options( 

715 compile_opt, 

716 load_opt, 

717 populate_existing=None, 

718 version_check=None, 

719 only_load_props=None, 

720 refresh_state=None, 

721 identity_token=None, 

722 is_user_refresh=None, 

723): 

724 compile_options = {} 

725 load_options = {} 

726 if version_check: 

727 load_options["_version_check"] = version_check 

728 if populate_existing: 

729 load_options["_populate_existing"] = populate_existing 

730 if refresh_state: 

731 load_options["_refresh_state"] = refresh_state 

732 compile_options["_for_refresh_state"] = True 

733 if only_load_props: 

734 compile_options["_only_load_props"] = frozenset(only_load_props) 

735 if identity_token: 

736 load_options["_identity_token"] = identity_token 

737 

738 if is_user_refresh: 

739 load_options["_is_user_refresh"] = is_user_refresh 

740 if load_options: 

741 load_opt += load_options 

742 if compile_options: 

743 compile_opt += compile_options 

744 

745 return compile_opt, load_opt 

746 

747 

748def _setup_entity_query( 

749 compile_state, 

750 mapper, 

751 query_entity, 

752 path, 

753 adapter, 

754 column_collection, 

755 with_polymorphic=None, 

756 only_load_props=None, 

757 polymorphic_discriminator=None, 

758 **kw, 

759): 

760 if with_polymorphic: 

761 poly_properties = mapper._iterate_polymorphic_properties( 

762 with_polymorphic 

763 ) 

764 else: 

765 poly_properties = mapper._polymorphic_properties 

766 

767 quick_populators = {} 

768 

769 path.set(compile_state.attributes, "memoized_setups", quick_populators) 

770 

771 # for the lead entities in the path, e.g. not eager loads, and 

772 # assuming a user-passed aliased class, e.g. not a from_self() or any 

773 # implicit aliasing, don't add columns to the SELECT that aren't 

774 # in the thing that's aliased. 

775 check_for_adapt = adapter and len(path) == 1 and path[-1].is_aliased_class 

776 

777 for value in poly_properties: 

778 if only_load_props and value.key not in only_load_props: 

779 continue 

780 value.setup( 

781 compile_state, 

782 query_entity, 

783 path, 

784 adapter, 

785 only_load_props=only_load_props, 

786 column_collection=column_collection, 

787 memoized_populators=quick_populators, 

788 check_for_adapt=check_for_adapt, 

789 **kw, 

790 ) 

791 

792 if ( 

793 polymorphic_discriminator is not None 

794 and polymorphic_discriminator is not mapper.polymorphic_on 

795 ): 

796 if adapter: 

797 pd = adapter.columns[polymorphic_discriminator] 

798 else: 

799 pd = polymorphic_discriminator 

800 column_collection.append(pd) 

801 

802 

def _warn_for_runid_changed(state):
    """Emit a warning, naming ``state``, that its loading context changed
    inside a load/refresh event handler."""
    message_template = (
        "Loading context for %s has changed within a load/refresh "
        "handler, suggesting a row refresh operation took place. If this "
        "event handler is expected to be "
        "emitting row refresh operations within an existing load or refresh "
        "operation, set restore_load_context=True when establishing the "
        "listener to ensure the context remains unchanged when the event "
        "handler completes."
    )
    util.warn(message_template % (state_str(state),))

813 

814 

815def _instance_processor( 

816 query_entity, 

817 mapper, 

818 context, 

819 result, 

820 path, 

821 adapter, 

822 only_load_props=None, 

823 refresh_state=None, 

824 polymorphic_discriminator=None, 

825 _polymorphic_from=None, 

826): 

827 """Produce a mapper level row processor callable 

828 which processes rows into mapped instances.""" 

829 

830 # note that this method, most of which exists in a closure 

831 # called _instance(), resists being broken out, as 

832 # attempts to do so tend to add significant function 

833 # call overhead. _instance() is the most 

834 # performance-critical section in the whole ORM. 

835 

836 identity_class = mapper._identity_class 

837 compile_state = context.compile_state 

838 

839 # look for "row getter" functions that have been assigned along 

840 # with the compile state that were cached from a previous load. 

841 # these are operator.itemgetter() objects that each will extract a 

842 # particular column from each row. 

843 

844 getter_key = ("getters", mapper) 

845 getters = path.get(compile_state.attributes, getter_key, None) 

846 

847 if getters is None: 

848 # no getters, so go through a list of attributes we are loading for, 

849 # and the ones that are column based will have already put information 

850 # for us in another collection "memoized_setups", which represents the 

851 # output of the LoaderStrategy.setup_query() method. We can just as 

852 # easily call LoaderStrategy.create_row_processor for each, but by 

853 # getting it all at once from setup_query we save another method call 

854 # per attribute. 

855 props = mapper._prop_set 

856 if only_load_props is not None: 

857 props = props.intersection( 

858 mapper._props[k] for k in only_load_props 

859 ) 

860 

861 quick_populators = path.get( 

862 context.attributes, "memoized_setups", EMPTY_DICT 

863 ) 

864 

865 todo = [] 

866 cached_populators = { 

867 "new": [], 

868 "quick": [], 

869 "deferred": [], 

870 "expire": [], 

871 "existing": [], 

872 "eager": [], 

873 } 

874 

875 if refresh_state is None: 

876 # we can also get the "primary key" tuple getter function 

877 pk_cols = mapper.primary_key 

878 

879 if adapter: 

880 pk_cols = [adapter.columns[c] for c in pk_cols] 

881 primary_key_getter = result._tuple_getter(pk_cols) 

882 else: 

883 primary_key_getter = None 

884 

885 getters = { 

886 "cached_populators": cached_populators, 

887 "todo": todo, 

888 "primary_key_getter": primary_key_getter, 

889 } 

890 for prop in props: 

891 if prop in quick_populators: 

892 # this is an inlined path just for column-based attributes. 

893 col = quick_populators[prop] 

894 if col is _DEFER_FOR_STATE: 

895 cached_populators["new"].append( 

896 (prop.key, prop._deferred_column_loader) 

897 ) 

898 elif col is _SET_DEFERRED_EXPIRED: 

899 # note that in this path, we are no longer 

900 # searching in the result to see if the column might 

901 # be present in some unexpected way. 

902 cached_populators["expire"].append((prop.key, False)) 

903 elif col is _RAISE_FOR_STATE: 

904 cached_populators["new"].append( 

905 (prop.key, prop._raise_column_loader) 

906 ) 

907 else: 

908 getter = None 

909 if adapter: 

910 # this logic had been removed for all 1.4 releases 

911 # up until 1.4.18; the adapter here is particularly 

912 # the compound eager adapter which isn't accommodated 

913 # in the quick_populators right now. The "fallback" 

914 # logic below instead took over in many more cases 

915 # until issue #6596 was identified. 

916 

917 # note there is still an issue where this codepath 

918 # produces no "getter" for cases where a joined-inh 

919 # mapping includes a labeled column property, meaning 

920 # KeyError is caught internally and we fall back to 

921 # _getter(col), which works anyway. The adapter 

922 # here for joined inh without any aliasing might not 

923 # be useful. Tests which see this include 

924 # test.orm.inheritance.test_basic -> 

925 # EagerTargetingTest.test_adapt_stringency 

926 # OptimizedLoadTest.test_column_expression_joined 

927 # PolymorphicOnNotLocalTest.test_polymorphic_on_column_prop # noqa: E501 

928 # 

929 

930 adapted_col = adapter.columns[col] 

931 if adapted_col is not None: 

932 getter = result._getter(adapted_col, False) 

933 if not getter: 

934 getter = result._getter(col, False) 

935 if getter: 

936 cached_populators["quick"].append((prop.key, getter)) 

937 else: 

938 # fall back to the ColumnProperty itself, which 

939 # will iterate through all of its columns 

940 # to see if one fits 

941 prop.create_row_processor( 

942 context, 

943 query_entity, 

944 path, 

945 mapper, 

946 result, 

947 adapter, 

948 cached_populators, 

949 ) 

950 else: 

951 # loader strategies like subqueryload, selectinload, 

952 # joinedload, basically relationships, these need to interact 

953 # with the context each time to work correctly. 

954 todo.append(prop) 

955 

956 path.set(compile_state.attributes, getter_key, getters) 

957 

958 cached_populators = getters["cached_populators"] 

959 

960 populators = {key: list(value) for key, value in cached_populators.items()} 

961 for prop in getters["todo"]: 

962 prop.create_row_processor( 

963 context, query_entity, path, mapper, result, adapter, populators 

964 ) 

965 

966 propagated_loader_options = context.propagated_loader_options 

967 load_path = ( 

968 context.compile_state.current_path + path 

969 if context.compile_state.current_path.path 

970 else path 

971 ) 

972 

973 session_identity_map = context.session.identity_map 

974 

975 populate_existing = context.populate_existing or mapper.always_refresh 

976 load_evt = bool(mapper.class_manager.dispatch.load) 

977 refresh_evt = bool(mapper.class_manager.dispatch.refresh) 

978 persistent_evt = bool(context.session.dispatch.loaded_as_persistent) 

979 if persistent_evt: 

980 loaded_as_persistent = context.session.dispatch.loaded_as_persistent 

981 instance_state = attributes.instance_state 

982 instance_dict = attributes.instance_dict 

983 session_id = context.session.hash_key 

984 runid = context.runid 

985 identity_token = context.identity_token 

986 

987 version_check = context.version_check 

988 if version_check: 

989 version_id_col = mapper.version_id_col 

990 if version_id_col is not None: 

991 if adapter: 

992 version_id_col = adapter.columns[version_id_col] 

993 version_id_getter = result._getter(version_id_col) 

994 else: 

995 version_id_getter = None 

996 

997 if not refresh_state and _polymorphic_from is not None: 

998 key = ("loader", path.path) 

999 

1000 if key in context.attributes and context.attributes[key].strategy == ( 

1001 ("selectinload_polymorphic", True), 

1002 ): 

1003 option_entities = context.attributes[key].local_opts["entities"] 

1004 else: 

1005 option_entities = None 

1006 selectin_load_via = mapper._should_selectin_load( 

1007 option_entities, 

1008 _polymorphic_from, 

1009 ) 

1010 

1011 if selectin_load_via and selectin_load_via is not _polymorphic_from: 

1012 # only_load_props goes w/ refresh_state only, and in a refresh 

1013 # we are a single row query for the exact entity; polymorphic 

1014 # loading does not apply 

1015 assert only_load_props is None 

1016 

1017 if selectin_load_via.is_mapper: 

1018 _load_supers = [] 

1019 _endmost_mapper = selectin_load_via 

1020 while ( 

1021 _endmost_mapper 

1022 and _endmost_mapper is not _polymorphic_from 

1023 ): 

1024 _load_supers.append(_endmost_mapper) 

1025 _endmost_mapper = _endmost_mapper.inherits 

1026 else: 

1027 _load_supers = [selectin_load_via] 

1028 

1029 for _selectinload_entity in _load_supers: 

1030 if PostLoad.path_exists( 

1031 context, load_path, _selectinload_entity 

1032 ): 

1033 continue 

1034 callable_ = _load_subclass_via_in( 

1035 context, 

1036 path, 

1037 _selectinload_entity, 

1038 _polymorphic_from, 

1039 option_entities, 

1040 ) 

1041 PostLoad.callable_for_path( 

1042 context, 

1043 load_path, 

1044 _selectinload_entity.mapper, 

1045 _selectinload_entity, 

1046 callable_, 

1047 _selectinload_entity, 

1048 ) 

1049 

1050 post_load = PostLoad.for_context(context, load_path, only_load_props) 

1051 

1052 if refresh_state: 

1053 refresh_identity_key = refresh_state.key 

1054 if refresh_identity_key is None: 

1055 # super-rare condition; a refresh is being called 

1056 # on a non-instance-key instance; this is meant to only 

1057 # occur within a flush() 

1058 refresh_identity_key = mapper._identity_key_from_state( 

1059 refresh_state 

1060 ) 

1061 else: 

1062 refresh_identity_key = None 

1063 

1064 primary_key_getter = getters["primary_key_getter"] 

1065 

1066 if mapper.allow_partial_pks: 

1067 is_not_primary_key = _none_set.issuperset 

1068 else: 

1069 is_not_primary_key = _none_set.intersection 

1070 

1071 def _instance(row): 

1072 # determine the state that we'll be populating 

1073 if refresh_identity_key: 

1074 # fixed state that we're refreshing 

1075 state = refresh_state 

1076 instance = state.obj() 

1077 dict_ = instance_dict(instance) 

1078 isnew = state.runid != runid 

1079 currentload = True 

1080 loaded_instance = False 

1081 else: 

1082 # look at the row, see if that identity is in the 

1083 # session, or we have to create a new one 

1084 identitykey = ( 

1085 identity_class, 

1086 primary_key_getter(row), 

1087 identity_token, 

1088 ) 

1089 

1090 instance = session_identity_map.get(identitykey) 

1091 

1092 if instance is not None: 

1093 # existing instance 

1094 state = instance_state(instance) 

1095 dict_ = instance_dict(instance) 

1096 

1097 isnew = state.runid != runid 

1098 currentload = not isnew 

1099 loaded_instance = False 

1100 

1101 if version_check and version_id_getter and not currentload: 

1102 _validate_version_id( 

1103 mapper, state, dict_, row, version_id_getter 

1104 ) 

1105 

1106 else: 

1107 # create a new instance 

1108 

1109 # check for non-NULL values in the primary key columns, 

1110 # else no entity is returned for the row 

1111 if is_not_primary_key(identitykey[1]): 

1112 return None 

1113 

1114 isnew = True 

1115 currentload = True 

1116 loaded_instance = True 

1117 

1118 instance = mapper.class_manager.new_instance() 

1119 

1120 dict_ = instance_dict(instance) 

1121 state = instance_state(instance) 

1122 state.key = identitykey 

1123 state.identity_token = identity_token 

1124 

1125 # attach instance to session. 

1126 state.session_id = session_id 

1127 session_identity_map._add_unpresent(state, identitykey) 

1128 

1129 effective_populate_existing = populate_existing 

1130 if refresh_state is state: 

1131 effective_populate_existing = True 

1132 

1133 # populate. this looks at whether this state is new 

1134 # for this load or was existing, and whether or not this 

1135 # row is the first row with this identity. 

1136 if currentload or effective_populate_existing: 

1137 # full population routines. Objects here are either 

1138 # just created, or we are doing a populate_existing 

1139 

1140 # be conservative about setting load_path when populate_existing 

1141 # is in effect; want to maintain options from the original 

1142 # load. see test_expire->test_refresh_maintains_deferred_options 

1143 if isnew and ( 

1144 propagated_loader_options or not effective_populate_existing 

1145 ): 

1146 state.load_options = propagated_loader_options 

1147 state.load_path = load_path 

1148 

1149 _populate_full( 

1150 context, 

1151 row, 

1152 state, 

1153 dict_, 

1154 isnew, 

1155 load_path, 

1156 loaded_instance, 

1157 effective_populate_existing, 

1158 populators, 

1159 ) 

1160 

1161 if isnew: 

1162 # state.runid should be equal to context.runid / runid 

1163 # here, however for event checks we are being more conservative 

1164 # and checking against existing run id 

1165 # assert state.runid == runid 

1166 

1167 existing_runid = state.runid 

1168 

1169 if loaded_instance: 

1170 if load_evt: 

1171 state.manager.dispatch.load(state, context) 

1172 if state.runid != existing_runid: 

1173 _warn_for_runid_changed(state) 

1174 if persistent_evt: 

1175 loaded_as_persistent(context.session, state) 

1176 if state.runid != existing_runid: 

1177 _warn_for_runid_changed(state) 

1178 elif refresh_evt: 

1179 state.manager.dispatch.refresh( 

1180 state, context, only_load_props 

1181 ) 

1182 if state.runid != runid: 

1183 _warn_for_runid_changed(state) 

1184 

1185 if effective_populate_existing or state.modified: 

1186 if refresh_state and only_load_props: 

1187 state._commit(dict_, only_load_props) 

1188 else: 

1189 state._commit_all(dict_, session_identity_map) 

1190 

1191 if post_load: 

1192 post_load.add_state(state, True) 

1193 

1194 else: 

1195 # partial population routines, for objects that were already 

1196 # in the Session, but a row matches them; apply eager loaders 

1197 # on existing objects, etc. 

1198 unloaded = state.unloaded 

1199 isnew = state not in context.partials 

1200 

1201 if not isnew or unloaded or populators["eager"]: 

1202 # state is having a partial set of its attributes 

1203 # refreshed. Populate those attributes, 

1204 # and add to the "context.partials" collection. 

1205 

1206 to_load = _populate_partial( 

1207 context, 

1208 row, 

1209 state, 

1210 dict_, 

1211 isnew, 

1212 load_path, 

1213 unloaded, 

1214 populators, 

1215 ) 

1216 

1217 if isnew: 

1218 if refresh_evt: 

1219 existing_runid = state.runid 

1220 state.manager.dispatch.refresh(state, context, to_load) 

1221 if state.runid != existing_runid: 

1222 _warn_for_runid_changed(state) 

1223 

1224 state._commit(dict_, to_load) 

1225 

1226 if post_load and context.invoke_all_eagers: 

1227 post_load.add_state(state, False) 

1228 

1229 return instance 

1230 

1231 if mapper.polymorphic_map and not _polymorphic_from and not refresh_state: 

1232 # if we are doing polymorphic, dispatch to a different _instance() 

1233 # method specific to the subclass mapper 

1234 def ensure_no_pk(row): 

1235 identitykey = ( 

1236 identity_class, 

1237 primary_key_getter(row), 

1238 identity_token, 

1239 ) 

1240 if not is_not_primary_key(identitykey[1]): 

1241 return identitykey 

1242 else: 

1243 return None 

1244 

1245 _instance = _decorate_polymorphic_switch( 

1246 _instance, 

1247 context, 

1248 query_entity, 

1249 mapper, 

1250 result, 

1251 path, 

1252 polymorphic_discriminator, 

1253 adapter, 

1254 ensure_no_pk, 

1255 ) 

1256 

1257 return _instance 

1258 

1259 

1260def _load_subclass_via_in( 

1261 context, path, entity, polymorphic_from, option_entities 

1262): 

1263 mapper = entity.mapper 

1264 

1265 # TODO: polymorphic_from seems to be a Mapper in all cases. 

1266 # this is likely not needed, but as we dont have typing in loading.py 

1267 # yet, err on the safe side 

1268 polymorphic_from_mapper = polymorphic_from.mapper 

1269 not_against_basemost = polymorphic_from_mapper.inherits is not None 

1270 

1271 zero_idx = len(mapper.base_mapper.primary_key) == 1 

1272 

1273 if entity.is_aliased_class or not_against_basemost: 

1274 q, enable_opt, disable_opt = mapper._subclass_load_via_in( 

1275 entity, polymorphic_from 

1276 ) 

1277 else: 

1278 q, enable_opt, disable_opt = mapper._subclass_load_via_in_mapper 

1279 

1280 def do_load(context, path, states, load_only, effective_entity): 

1281 if not option_entities: 

1282 # filter out states for those that would have selectinloaded 

1283 # from another loader 

1284 # TODO: we are currently ignoring the case where the 

1285 # "selectin_polymorphic" option is used, as this is much more 

1286 # complex / specific / very uncommon API use 

1287 states = [ 

1288 (s, v) 

1289 for s, v in states 

1290 if s.mapper._would_selectin_load_only_from_given_mapper(mapper) 

1291 ] 

1292 

1293 if not states: 

1294 return 

1295 

1296 orig_query = context.query 

1297 

1298 if path.parent: 

1299 enable_opt_lcl = enable_opt._prepend_path(path) 

1300 disable_opt_lcl = disable_opt._prepend_path(path) 

1301 else: 

1302 enable_opt_lcl = enable_opt 

1303 disable_opt_lcl = disable_opt 

1304 options = ( 

1305 (enable_opt_lcl,) + orig_query._with_options + (disable_opt_lcl,) 

1306 ) 

1307 

1308 q2 = q.options(*options) 

1309 

1310 q2._compile_options = context.compile_state.default_compile_options 

1311 q2._compile_options += {"_current_path": path.parent} 

1312 

1313 if context.populate_existing: 

1314 q2 = q2.execution_options(populate_existing=True) 

1315 

1316 context.session.execute( 

1317 q2, 

1318 dict( 

1319 primary_keys=[ 

1320 state.key[1][0] if zero_idx else state.key[1] 

1321 for state, load_attrs in states 

1322 ] 

1323 ), 

1324 ).unique().scalars().all() 

1325 

1326 return do_load 

1327 

1328 

1329def _populate_full( 

1330 context, 

1331 row, 

1332 state, 

1333 dict_, 

1334 isnew, 

1335 load_path, 

1336 loaded_instance, 

1337 populate_existing, 

1338 populators, 

1339): 

1340 if isnew: 

1341 # first time we are seeing a row with this identity. 

1342 state.runid = context.runid 

1343 

1344 for key, getter in populators["quick"]: 

1345 dict_[key] = getter(row) 

1346 if populate_existing: 

1347 for key, set_callable in populators["expire"]: 

1348 dict_.pop(key, None) 

1349 if set_callable: 

1350 state.expired_attributes.add(key) 

1351 else: 

1352 for key, set_callable in populators["expire"]: 

1353 if set_callable: 

1354 state.expired_attributes.add(key) 

1355 

1356 for key, populator in populators["new"]: 

1357 populator(state, dict_, row) 

1358 

1359 elif load_path != state.load_path: 

1360 # new load path, e.g. object is present in more than one 

1361 # column position in a series of rows 

1362 state.load_path = load_path 

1363 

1364 # if we have data, and the data isn't in the dict, OK, let's put 

1365 # it in. 

1366 for key, getter in populators["quick"]: 

1367 if key not in dict_: 

1368 dict_[key] = getter(row) 

1369 

1370 # otherwise treat like an "already seen" row 

1371 for key, populator in populators["existing"]: 

1372 populator(state, dict_, row) 

1373 # TODO: allow "existing" populator to know this is 

1374 # a new path for the state: 

1375 # populator(state, dict_, row, new_path=True) 

1376 

1377 else: 

1378 # have already seen rows with this identity in this same path. 

1379 for key, populator in populators["existing"]: 

1380 populator(state, dict_, row) 

1381 

1382 # TODO: same path 

1383 # populator(state, dict_, row, new_path=False) 

1384 

1385 

1386def _populate_partial( 

1387 context, row, state, dict_, isnew, load_path, unloaded, populators 

1388): 

1389 if not isnew: 

1390 if unloaded: 

1391 # extra pass, see #8166 

1392 for key, getter in populators["quick"]: 

1393 if key in unloaded: 

1394 dict_[key] = getter(row) 

1395 

1396 to_load = context.partials[state] 

1397 for key, populator in populators["existing"]: 

1398 if key in to_load: 

1399 populator(state, dict_, row) 

1400 else: 

1401 to_load = unloaded 

1402 context.partials[state] = to_load 

1403 

1404 for key, getter in populators["quick"]: 

1405 if key in to_load: 

1406 dict_[key] = getter(row) 

1407 for key, set_callable in populators["expire"]: 

1408 if key in to_load: 

1409 dict_.pop(key, None) 

1410 if set_callable: 

1411 state.expired_attributes.add(key) 

1412 for key, populator in populators["new"]: 

1413 if key in to_load: 

1414 populator(state, dict_, row) 

1415 

1416 for key, populator in populators["eager"]: 

1417 if key not in unloaded: 

1418 populator(state, dict_, row) 

1419 

1420 return to_load 

1421 

1422 

1423def _validate_version_id(mapper, state, dict_, row, getter): 

1424 if mapper._get_state_attr_by_column( 

1425 state, dict_, mapper.version_id_col 

1426 ) != getter(row): 

1427 raise orm_exc.StaleDataError( 

1428 "Instance '%s' has version id '%s' which " 

1429 "does not match database-loaded version id '%s'." 

1430 % ( 

1431 state_str(state), 

1432 mapper._get_state_attr_by_column( 

1433 state, dict_, mapper.version_id_col 

1434 ), 

1435 getter(row), 

1436 ) 

1437 ) 

1438 

1439 

1440def _decorate_polymorphic_switch( 

1441 instance_fn, 

1442 context, 

1443 query_entity, 

1444 mapper, 

1445 result, 

1446 path, 

1447 polymorphic_discriminator, 

1448 adapter, 

1449 ensure_no_pk, 

1450): 

1451 if polymorphic_discriminator is not None: 

1452 polymorphic_on = polymorphic_discriminator 

1453 else: 

1454 polymorphic_on = mapper.polymorphic_on 

1455 if polymorphic_on is None: 

1456 return instance_fn 

1457 

1458 if adapter: 

1459 polymorphic_on = adapter.columns[polymorphic_on] 

1460 

1461 def configure_subclass_mapper(discriminator): 

1462 try: 

1463 sub_mapper = mapper.polymorphic_map[discriminator] 

1464 except KeyError: 

1465 raise AssertionError( 

1466 "No such polymorphic_identity %r is defined" % discriminator 

1467 ) 

1468 else: 

1469 if sub_mapper is mapper: 

1470 return None 

1471 elif not sub_mapper.isa(mapper): 

1472 return False 

1473 

1474 return _instance_processor( 

1475 query_entity, 

1476 sub_mapper, 

1477 context, 

1478 result, 

1479 path, 

1480 adapter, 

1481 _polymorphic_from=mapper, 

1482 ) 

1483 

1484 polymorphic_instances = util.PopulateDict(configure_subclass_mapper) 

1485 

1486 getter = result._getter(polymorphic_on) 

1487 

1488 def polymorphic_instance(row): 

1489 discriminator = getter(row) 

1490 if discriminator is not None: 

1491 _instance = polymorphic_instances[discriminator] 

1492 if _instance: 

1493 return _instance(row) 

1494 elif _instance is False: 

1495 identitykey = ensure_no_pk(row) 

1496 

1497 if identitykey: 

1498 raise sa_exc.InvalidRequestError( 

1499 "Row with identity key %s can't be loaded into an " 

1500 "object; the polymorphic discriminator column '%s' " 

1501 "refers to %s, which is not a sub-mapper of " 

1502 "the requested %s" 

1503 % ( 

1504 identitykey, 

1505 polymorphic_on, 

1506 mapper.polymorphic_map[discriminator], 

1507 mapper, 

1508 ) 

1509 ) 

1510 else: 

1511 return None 

1512 else: 

1513 return instance_fn(row) 

1514 else: 

1515 identitykey = ensure_no_pk(row) 

1516 

1517 if identitykey: 

1518 raise sa_exc.InvalidRequestError( 

1519 "Row with identity key %s can't be loaded into an " 

1520 "object; the polymorphic discriminator column '%s' is " 

1521 "NULL" % (identitykey, polymorphic_on) 

1522 ) 

1523 else: 

1524 return None 

1525 

1526 return polymorphic_instance 

1527 

1528 

class PostLoad:
    """Track loaders and states for "post load" operations."""

    __slots__ = "loaders", "states", "load_keys"

    def __init__(self):
        # token -> (context, token, limit_to_mapper, loader_callable,
        # arg, kw), as stored by callable_for_path()
        self.loaders = {}
        # state -> "overwrite" flag, as stored by add_state()
        self.states = util.OrderedDict()
        # set from ``only_load_props`` by for_context(); handed to every
        # loader as its fourth positional argument
        self.load_keys = None

    def add_state(self, state, overwrite):
        """record ``state`` together with its ``overwrite`` flag."""
        # the states for a polymorphic load here are all shared
        # within a single PostLoad object among multiple subtypes.
        # Filtering of callables on a per-subclass basis needs to be done at
        # the invocation level
        self.states[state] = overwrite

    def invoke(self, context, path):
        """run every registered loader against the collected states,
        then clear the states.

        Each loader only receives the states whose mapper ``isa()`` the
        loader's ``limit_to_mapper``; a loader with no matching states is
        not called.  Does nothing when no states were collected.
        """
        if not self.states:
            return
        path = path_registry.PathRegistry.coerce(path)
        for (
            effective_context,
            token,
            limit_to_mapper,
            loader,
            arg,
            kw,
        ) in self.loaders.values():
            states = [
                (state, overwrite)
                for state, overwrite in self.states.items()
                if state.manager.mapper.isa(limit_to_mapper)
            ]
            if states:
                loader(
                    effective_context, path, states, self.load_keys, *arg, **kw
                )
        self.states.clear()

    @classmethod
    def for_context(cls, context, path, only_load_props):
        """return the PostLoad registered in ``context`` for ``path``,
        or None if there is none.

        When one exists and ``only_load_props`` is truthy, it is stored
        as the PostLoad's ``load_keys``.
        """
        pl = context.post_load_paths.get(path.path)
        if pl is not None and only_load_props:
            pl.load_keys = only_load_props
        return pl

    @classmethod
    def path_exists(cls, context, path, key):
        """return True if a loader is registered under ``key`` for
        ``path`` in ``context``."""
        return (
            path.path in context.post_load_paths
            and key in context.post_load_paths[path.path].loaders
        )

    @classmethod
    def callable_for_path(
        cls, context, path, limit_to_mapper, token, loader_callable, *arg, **kw
    ):
        """register ``loader_callable`` under ``token`` for ``path``,
        creating the PostLoad for that path if it does not exist yet.

        An existing registration under the same ``token`` is replaced.
        """
        if path.path in context.post_load_paths:
            pl = context.post_load_paths[path.path]
        else:
            pl = context.post_load_paths[path.path] = PostLoad()
        pl.loaders[token] = (
            context,
            token,
            limit_to_mapper,
            loader_callable,
            arg,
            kw,
        )

1599 

1600 

def load_scalar_attributes(mapper, state, attribute_names, passive):
    """initiate a column-based attribute refresh operation.

    :param mapper: mapper used to build the statement and identity key.
    :param state: the state to refresh; must be attached to a session.
    :param attribute_names: optional set of attribute keys to load; when
     given it is narrowed to the keys present in ``mapper.attrs``.
    :param passive: flags; only the ``NO_AUTOFLUSH`` bit is consulted.
    :raises DetachedInstanceError: if ``state`` has no session.
    :raises InvalidRequestError: if the state has no key and one of its
     primary key attributes is expired.
    :raises ObjectDeletedError: if the state has a key but the load
     produced no result.
    """

    # assert mapper is _state_mapper(state)
    session = state.session
    if not session:
        raise orm_exc.DetachedInstanceError(
            "Instance %s is not bound to a Session; "
            "attribute refresh operation cannot proceed" % (state_str(state))
        )

    # in the case of inheritance, particularly concrete and abstract
    # concrete inheritance, the class manager might have some keys
    # of attributes on the superclass that we didn't actually map.
    # These could be mapped as "concrete, don't load" or could be completely
    # excluded from the mapping and we know nothing about them.  Filter them
    # here to prevent them from coming through.
    if attribute_names:
        attribute_names = attribute_names.intersection(mapper.attrs.keys())

    # keyword arguments shared by both load_on_ident() invocations below
    load_kw = {
        "only_load_props": attribute_names,
        "refresh_state": state,
        "no_autoflush": bool(passive & attributes.NO_AUTOFLUSH),
    }

    if mapper.inherits and not mapper.concrete:
        # load based on committed attributes in the object, formed into
        # a truncated SELECT that only includes relevant tables.  does not
        # currently use state.key
        optimized = mapper._optimized_get_statement(state, attribute_names)
        if optimized is not None:
            # undefer() isn't needed here because statement has the
            # columns needed already, this implicitly undefers that column
            return load_on_ident(
                session, FromStatement(mapper, optimized), None, **load_kw
            )

    # normal load, use state.key as the identity to SELECT
    has_key = bool(state.key)

    if not has_key:
        # this codepath is rare - only valid when inside a flush, and the
        # object is becoming persistent but hasn't yet been assigned
        # an identity_key.
        # check here to ensure we have the attrs we need.
        pk_attr_keys = [
            mapper._columntoproperty[pk_col].key
            for pk_col in mapper.primary_key
        ]
        if state.expired_attributes.intersection(pk_attr_keys):
            raise sa_exc.InvalidRequestError(
                "Instance %s cannot be refreshed - it's not "
                " persistent and does not "
                "contain a full primary key." % state_str(state)
            )
        identity_key = mapper._identity_key_from_state(state)
    else:
        identity_key = state.key

    lacks_full_pk = (
        _none_set.issubset(identity_key) and not mapper.allow_partial_pks
    ) or _none_set.issuperset(identity_key)
    if lacks_full_pk:
        util.warn_limited(
            "Instance %s to be refreshed doesn't "
            "contain a full primary key - can't be refreshed "
            "(and shouldn't be expired, either).",
            state_str(state),
        )
        return

    result = load_on_ident(
        session,
        select(mapper).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL),
        identity_key,
        **load_kw,
    )

    # if instance is pending, a refresh operation
    # may not complete (even if PK attributes are assigned)
    if has_key and result is None:
        raise orm_exc.ObjectDeletedError(state)