Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/loading.py: 11%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

662 statements  

1# orm/loading.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: ignore-errors 

8 

9 

10"""private module containing functions used to convert database 

11rows into object instances and associated state. 

12 

13the functions here are called primarily by Query, Mapper, 

14as well as some of the attribute loading strategies. 

15 

16""" 

17 

18from __future__ import annotations 

19 

20from typing import Any 

21from typing import Dict 

22from typing import Iterable 

23from typing import List 

24from typing import Mapping 

25from typing import Optional 

26from typing import Sequence 

27from typing import Tuple 

28from typing import TYPE_CHECKING 

29from typing import TypeVar 

30from typing import Union 

31 

32from . import attributes 

33from . import exc as orm_exc 

34from . import path_registry 

35from .base import _DEFER_FOR_STATE 

36from .base import _RAISE_FOR_STATE 

37from .base import _SET_DEFERRED_EXPIRED 

38from .base import PassiveFlag 

39from .context import _ORMCompileState 

40from .context import FromStatement 

41from .context import QueryContext 

42from .strategies import _SelectInLoader 

43from .util import _none_set 

44from .util import state_str 

45from .. import exc as sa_exc 

46from .. import util 

47from ..engine import result_tuple 

48from ..engine.result import ChunkedIteratorResult 

49from ..engine.result import FrozenResult 

50from ..engine.result import SimpleResultMetaData 

51from ..sql import select 

52from ..sql import util as sql_util 

53from ..sql.selectable import ForUpdateArg 

54from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL 

55from ..sql.selectable import SelectState 

56from ..util import EMPTY_DICT 

57from ..util.typing import TupleAny 

58from ..util.typing import Unpack 

59 

60if TYPE_CHECKING: 

61 from ._typing import _IdentityKeyType 

62 from .base import LoaderCallableStatus 

63 from .interfaces import ORMOption 

64 from .mapper import Mapper 

65 from .query import Query 

66 from .session import Session 

67 from .state import InstanceState 

68 from ..engine.cursor import CursorResult 

69 from ..engine.interfaces import _ExecuteOptions 

70 from ..engine.result import Result 

71 from ..sql import Select 

72 

73_T = TypeVar("_T", bound=Any) 

74_O = TypeVar("_O", bound=object) 

75_new_runid = util.counter() 

76 

77 

78_PopulatorDict = Dict[str, List[Tuple[str, Any]]] 

79 

80 

81def instances( 

82 cursor: CursorResult[Unpack[TupleAny]], context: QueryContext 

83) -> Result[Unpack[TupleAny]]: 

84 """Return a :class:`.Result` given an ORM query context. 

85 

86 :param cursor: a :class:`.CursorResult`, generated by a statement 

87 which came from :class:`.ORMCompileState` 

88 

89 :param context: a :class:`.QueryContext` object 

90 

91 :return: a :class:`.Result` object representing ORM results 

92 

93 .. versionchanged:: 1.4 The instances() function now uses 

94 :class:`.Result` objects and has an all new interface. 

95 

96 """ 

97 

98 context.runid = _new_runid() 

99 

100 if context.top_level_context: 

101 is_top_level = False 

102 context.post_load_paths = context.top_level_context.post_load_paths 

103 else: 

104 is_top_level = True 

105 context.post_load_paths = {} 

106 

107 compile_state = context.compile_state 

108 filtered = compile_state._has_mapper_entities 

109 single_entity = ( 

110 not context.load_options._only_return_tuples 

111 and len(compile_state._entities) == 1 

112 and compile_state._entities[0].supports_single_entity 

113 ) 

114 

115 try: 

116 (process, labels, extra) = list( 

117 zip( 

118 *[ 

119 query_entity.row_processor(context, cursor) 

120 for query_entity in context.compile_state._entities 

121 ] 

122 ) 

123 ) 

124 

125 if context.yield_per and ( 

126 context.loaders_require_buffering 

127 or context.loaders_require_uniquing 

128 ): 

129 raise sa_exc.InvalidRequestError( 

130 "Can't use yield_per with eager loaders that require uniquing " 

131 "or row buffering, e.g. joinedload() against collections " 

132 "or subqueryload(). Consider the selectinload() strategy " 

133 "for better flexibility in loading objects." 

134 ) 

135 

136 except Exception: 

137 with util.safe_reraise(): 

138 cursor.close() 

139 

140 def _no_unique(entry): 

141 raise sa_exc.InvalidRequestError( 

142 "Can't use the ORM yield_per feature in conjunction with unique()" 

143 ) 

144 

145 def _not_hashable(datatype, *, legacy=False, uncertain=False): 

146 if not legacy: 

147 

148 def go(obj): 

149 if uncertain: 

150 try: 

151 return hash(obj) 

152 except: 

153 pass 

154 

155 raise sa_exc.InvalidRequestError( 

156 "Can't apply uniqueness to row tuple containing value of " 

157 f"""type {datatype!r}; { 

158 'the values returned appear to be' 

159 if uncertain 

160 else 'this datatype produces' 

161 } non-hashable values""" 

162 ) 

163 

164 return go 

165 elif not uncertain: 

166 return id 

167 else: 

168 _use_id = False 

169 

170 def go(obj): 

171 nonlocal _use_id 

172 

173 if not _use_id: 

174 try: 

175 return hash(obj) 

176 except: 

177 pass 

178 

179 # in #10459, we considered using a warning here, however 

180 # as legacy query uses result.unique() in all cases, this 

181 # would lead to too many warning cases. 

182 _use_id = True 

183 

184 return id(obj) 

185 

186 return go 

187 

188 unique_filters = [ 

189 ( 

190 _no_unique 

191 if context.yield_per 

192 else ( 

193 _not_hashable( 

194 ent.column.type, # type: ignore 

195 legacy=context.load_options._legacy_uniquing, 

196 uncertain=ent._null_column_type, 

197 ) 

198 if ( 

199 not ent.use_id_for_hash 

200 and (ent._non_hashable_value or ent._null_column_type) 

201 ) 

202 else id if ent.use_id_for_hash else None 

203 ) 

204 ) 

205 for ent in context.compile_state._entities 

206 ] 

207 

208 row_metadata = SimpleResultMetaData( 

209 labels, extra, _unique_filters=unique_filters 

210 ) 

211 

212 def chunks(size): # type: ignore 

213 while True: 

214 yield_per = size 

215 

216 context.partials = {} 

217 

218 if yield_per: 

219 fetch = cursor.fetchmany(yield_per) 

220 

221 if not fetch: 

222 break 

223 else: 

224 fetch = cursor._raw_all_rows() 

225 

226 if single_entity: 

227 proc = process[0] 

228 rows = [proc(row) for row in fetch] 

229 else: 

230 rows = [ 

231 tuple([proc(row) for proc in process]) for row in fetch 

232 ] 

233 

234 # if we are the originating load from a query, meaning we 

235 # aren't being called as a result of a nested "post load", 

236 # iterate through all the collected post loaders and fire them 

237 # off. Previously this used to work recursively, however that 

238 # prevented deeply nested structures from being loadable 

239 if is_top_level: 

240 if yield_per: 

241 # if using yield per, memoize the state of the 

242 # collection so that it can be restored 

243 top_level_post_loads = list( 

244 context.post_load_paths.items() 

245 ) 

246 

247 while context.post_load_paths: 

248 post_loads = list(context.post_load_paths.items()) 

249 context.post_load_paths.clear() 

250 for path, post_load in post_loads: 

251 post_load.invoke(context, path) 

252 

253 if yield_per: 

254 context.post_load_paths.clear() 

255 context.post_load_paths.update(top_level_post_loads) 

256 

257 yield rows 

258 

259 if not yield_per: 

260 break 

261 

262 if context.execution_options.get("prebuffer_rows", False): 

263 # this is a bit of a hack at the moment. 

264 # I would rather have some option in the result to pre-buffer 

265 # internally. 

266 _prebuffered = list(chunks(None)) 

267 

268 def chunks(size): 

269 return iter(_prebuffered) 

270 

271 result = ChunkedIteratorResult( 

272 row_metadata, 

273 chunks, 

274 source_supports_scalars=single_entity, 

275 raw=cursor, 

276 dynamic_yield_per=cursor.context._is_server_side, 

277 ) 

278 

279 # filtered and single_entity are used to indicate to legacy Query that the 

280 # query has ORM entities, so legacy deduping and scalars should be called 

281 # on the result. 

282 result._attributes = result._attributes.union( 

283 dict(filtered=filtered, is_single_entity=single_entity) 

284 ) 

285 

286 # multi_row_eager_loaders OTOH is specific to joinedload. 

287 if context.compile_state.multi_row_eager_loaders: 

288 

289 def require_unique(obj): 

290 raise sa_exc.InvalidRequestError( 

291 "The unique() method must be invoked on this Result, " 

292 "as it contains results that include joined eager loads " 

293 "against collections" 

294 ) 

295 

296 result._unique_filter_state = (None, require_unique) 

297 

298 if context.yield_per: 

299 result.yield_per(context.yield_per) 

300 

301 return result 

302 

303 

304@util.preload_module("sqlalchemy.orm.context") 

305def merge_frozen_result(session, statement, frozen_result, load=True): 

306 """Merge a :class:`_engine.FrozenResult` back into a :class:`_orm.Session`, 

307 returning a new :class:`_engine.Result` object with :term:`persistent` 

308 objects. 

309 

310 See the section :ref:`do_orm_execute_re_executing` for an example. 

311 

312 .. seealso:: 

313 

314 :ref:`do_orm_execute_re_executing` 

315 

316 :meth:`_engine.Result.freeze` 

317 

318 :class:`_engine.FrozenResult` 

319 

320 """ 

321 querycontext = util.preloaded.orm_context 

322 

323 if load: 

324 # flush current contents if we expect to load data 

325 session._autoflush() 

326 

327 ctx = querycontext._ORMSelectCompileState._create_entities_collection( 

328 statement, legacy=False 

329 ) 

330 

331 with session.no_autoflush: 

332 mapped_entities = [ 

333 i 

334 for i, e in enumerate(ctx._entities) 

335 if isinstance(e, querycontext._MapperEntity) 

336 ] 

337 keys = [ent._label_name for ent in ctx._entities] 

338 

339 keyed_tuple = result_tuple( 

340 keys, [ent._extra_entities for ent in ctx._entities] 

341 ) 

342 

343 result = [] 

344 for newrow in frozen_result._rewrite_rows(): 

345 for i in mapped_entities: 

346 if newrow[i] is not None: 

347 newrow[i] = session._merge( 

348 attributes.instance_state(newrow[i]), 

349 attributes.instance_dict(newrow[i]), 

350 load=load, 

351 _recursive={}, 

352 _resolve_conflict_map={}, 

353 ) 

354 

355 result.append(keyed_tuple(newrow)) 

356 

357 return frozen_result.with_new_rows(result) 

358 

359 

360@util.became_legacy_20( 

361 ":func:`_orm.merge_result`", 

362 alternative="The function as well as the method on :class:`_orm.Query` " 

363 "is superseded by the :func:`_orm.merge_frozen_result` function.", 

364) 

365@util.preload_module("sqlalchemy.orm.context") 

366def merge_result( 

367 query: Query[Any], 

368 iterator: Union[FrozenResult, Iterable[Sequence[Any]], Iterable[object]], 

369 load: bool = True, 

370) -> Union[FrozenResult, Iterable[Any]]: 

371 """Merge a result into the given :class:`.Query` object's Session. 

372 

373 See :meth:`_orm.Query.merge_result` for top-level documentation on this 

374 function. 

375 

376 """ 

377 

378 querycontext = util.preloaded.orm_context 

379 

380 session = query.session 

381 if load: 

382 # flush current contents if we expect to load data 

383 session._autoflush() 

384 

385 # TODO: need test coverage and documentation for the FrozenResult 

386 # use case. 

387 if isinstance(iterator, FrozenResult): 

388 frozen_result = iterator 

389 iterator = iter(frozen_result.data) 

390 else: 

391 frozen_result = None 

392 

393 ctx = querycontext._ORMSelectCompileState._create_entities_collection( 

394 query, legacy=True 

395 ) 

396 

397 autoflush = session.autoflush 

398 try: 

399 session.autoflush = False 

400 single_entity = not frozen_result and len(ctx._entities) == 1 

401 

402 if single_entity: 

403 if isinstance(ctx._entities[0], querycontext._MapperEntity): 

404 result = [ 

405 session._merge( 

406 attributes.instance_state(instance), 

407 attributes.instance_dict(instance), 

408 load=load, 

409 _recursive={}, 

410 _resolve_conflict_map={}, 

411 ) 

412 for instance in iterator 

413 ] 

414 else: 

415 result = list(iterator) 

416 else: 

417 mapped_entities = [ 

418 i 

419 for i, e in enumerate(ctx._entities) 

420 if isinstance(e, querycontext._MapperEntity) 

421 ] 

422 result = [] 

423 keys = [ent._label_name for ent in ctx._entities] 

424 

425 keyed_tuple = result_tuple( 

426 keys, [ent._extra_entities for ent in ctx._entities] 

427 ) 

428 

429 for row in iterator: 

430 newrow = list(row) 

431 for i in mapped_entities: 

432 if newrow[i] is not None: 

433 newrow[i] = session._merge( 

434 attributes.instance_state(newrow[i]), 

435 attributes.instance_dict(newrow[i]), 

436 load=load, 

437 _recursive={}, 

438 _resolve_conflict_map={}, 

439 ) 

440 result.append(keyed_tuple(newrow)) 

441 

442 if frozen_result: 

443 return frozen_result.with_new_rows(result) 

444 else: 

445 return iter(result) 

446 finally: 

447 session.autoflush = autoflush 

448 

449 

450def get_from_identity( 

451 session: Session, 

452 mapper: Mapper[_O], 

453 key: _IdentityKeyType[_O], 

454 passive: PassiveFlag, 

455) -> Union[LoaderCallableStatus, Optional[_O]]: 

456 """Look up the given key in the given session's identity map, 

457 check the object for expired state if found. 

458 

459 """ 

460 instance = session.identity_map.get(key) 

461 if instance is not None: 

462 state = attributes.instance_state(instance) 

463 

464 if mapper.inherits and not state.mapper.isa(mapper): 

465 return attributes.PASSIVE_CLASS_MISMATCH 

466 

467 # expired - ensure it still exists 

468 if state.expired: 

469 if not passive & attributes.SQL_OK: 

470 # TODO: no coverage here 

471 return attributes.PASSIVE_NO_RESULT 

472 elif not passive & attributes.RELATED_OBJECT_OK: 

473 # this mode is used within a flush and the instance's 

474 # expired state will be checked soon enough, if necessary. 

475 # also used by immediateloader for a mutually-dependent 

476 # o2m->m2m load, :ticket:`6301` 

477 return instance 

478 try: 

479 state._load_expired(state, passive) 

480 except orm_exc.ObjectDeletedError: 

481 session._remove_newly_deleted([state]) 

482 return None 

483 return instance 

484 else: 

485 return None 

486 

487 

488def _load_on_ident( 

489 session: Session, 

490 statement: Union[Select, FromStatement], 

491 key: Optional[_IdentityKeyType], 

492 *, 

493 load_options: Optional[Sequence[ORMOption]] = None, 

494 refresh_state: Optional[InstanceState[Any]] = None, 

495 with_for_update: Optional[ForUpdateArg] = None, 

496 only_load_props: Optional[Iterable[str]] = None, 

497 no_autoflush: bool = False, 

498 bind_arguments: Mapping[str, Any] = util.EMPTY_DICT, 

499 execution_options: _ExecuteOptions = util.EMPTY_DICT, 

500 require_pk_cols: bool = False, 

501 is_user_refresh: bool = False, 

502): 

503 """Load the given identity key from the database.""" 

504 if key is not None: 

505 ident = key[1] 

506 identity_token = key[2] 

507 else: 

508 ident = identity_token = None 

509 

510 return _load_on_pk_identity( 

511 session, 

512 statement, 

513 ident, 

514 load_options=load_options, 

515 refresh_state=refresh_state, 

516 with_for_update=with_for_update, 

517 only_load_props=only_load_props, 

518 identity_token=identity_token, 

519 no_autoflush=no_autoflush, 

520 bind_arguments=bind_arguments, 

521 execution_options=execution_options, 

522 require_pk_cols=require_pk_cols, 

523 is_user_refresh=is_user_refresh, 

524 ) 

525 

526 

527def _load_on_pk_identity( 

528 session: Session, 

529 statement: Union[Select, FromStatement], 

530 primary_key_identity: Optional[Tuple[Any, ...]], 

531 *, 

532 load_options: Optional[Sequence[ORMOption]] = None, 

533 refresh_state: Optional[InstanceState[Any]] = None, 

534 with_for_update: Optional[ForUpdateArg] = None, 

535 only_load_props: Optional[Iterable[str]] = None, 

536 identity_token: Optional[Any] = None, 

537 no_autoflush: bool = False, 

538 bind_arguments: Mapping[str, Any] = util.EMPTY_DICT, 

539 execution_options: _ExecuteOptions = util.EMPTY_DICT, 

540 require_pk_cols: bool = False, 

541 is_user_refresh: bool = False, 

542): 

543 """Load the given primary key identity from the database.""" 

544 

545 query = statement 

546 q = query._clone() 

547 

548 assert not q._is_lambda_element 

549 

550 if load_options is None: 

551 load_options = QueryContext.default_load_options 

552 

553 if ( 

554 statement._compile_options 

555 is SelectState.default_select_compile_options 

556 ): 

557 compile_options = _ORMCompileState.default_compile_options 

558 else: 

559 compile_options = statement._compile_options 

560 

561 if primary_key_identity is not None: 

562 mapper = query._propagate_attrs["plugin_subject"] 

563 

564 (_get_clause, _get_params) = mapper._get_clause 

565 

566 # None present in ident - turn those comparisons 

567 # into "IS NULL" 

568 if None in primary_key_identity: 

569 nones = { 

570 _get_params[col].key 

571 for col, value in zip(mapper.primary_key, primary_key_identity) 

572 if value is None 

573 } 

574 

575 _get_clause = sql_util.adapt_criterion_to_null(_get_clause, nones) 

576 

577 if len(nones) == len(primary_key_identity): 

578 util.warn( 

579 "fully NULL primary key identity cannot load any " 

580 "object. This condition may raise an error in a future " 

581 "release." 

582 ) 

583 

584 q._where_criteria = (_get_clause,) 

585 

586 params = { 

587 _get_params[primary_key].key: id_val 

588 for id_val, primary_key in zip( 

589 primary_key_identity, mapper.primary_key 

590 ) 

591 } 

592 else: 

593 params = None 

594 

595 if with_for_update is not None: 

596 version_check = True 

597 q._for_update_arg = with_for_update 

598 elif query._for_update_arg is not None: 

599 version_check = True 

600 q._for_update_arg = query._for_update_arg 

601 else: 

602 version_check = False 

603 

604 if require_pk_cols and only_load_props: 

605 if not refresh_state: 

606 raise sa_exc.ArgumentError( 

607 "refresh_state is required when require_pk_cols is present" 

608 ) 

609 

610 refresh_state_prokeys = refresh_state.mapper._primary_key_propkeys 

611 has_changes = { 

612 key 

613 for key in refresh_state_prokeys.difference(only_load_props) 

614 if refresh_state.attrs[key].history.has_changes() 

615 } 

616 if has_changes: 

617 # raise if pending pk changes are present. 

618 # technically, this could be limited to the case where we have 

619 # relationships in the only_load_props collection to be refreshed 

620 # also (and only ones that have a secondary eager loader, at that). 

621 # however, the error is in place across the board so that behavior 

622 # here is easier to predict. The use case it prevents is one 

623 # of mutating PK attrs, leaving them unflushed, 

624 # calling session.refresh(), and expecting those attrs to remain 

625 # still unflushed. It seems likely someone doing all those 

626 # things would be better off having the PK attributes flushed 

627 # to the database before tinkering like that (session.refresh() is 

628 # tinkering). 

629 raise sa_exc.InvalidRequestError( 

630 f"Please flush pending primary key changes on " 

631 "attributes " 

632 f"{has_changes} for mapper {refresh_state.mapper} before " 

633 "proceeding with a refresh" 

634 ) 

635 

636 # overall, the ORM has no internal flow right now for "dont load the 

637 # primary row of an object at all, but fire off 

638 # selectinload/subqueryload/immediateload for some relationships". 

639 # It would probably be a pretty big effort to add such a flow. So 

640 # here, the case for #8703 is introduced; user asks to refresh some 

641 # relationship attributes only which are 

642 # selectinload/subqueryload/immediateload/ etc. (not joinedload). 

643 # ORM complains there's no columns in the primary row to load. 

644 # So here, we just add the PK cols if that 

645 # case is detected, so that there is a SELECT emitted for the primary 

646 # row. 

647 # 

648 # Let's just state right up front, for this one little case, 

649 # the ORM here is adding a whole extra SELECT just to satisfy 

650 # limitations in the internal flow. This is really not a thing 

651 # SQLAlchemy finds itself doing like, ever, obviously, we are 

652 # constantly working to *remove* SELECTs we don't need. We 

653 # rationalize this for now based on 1. session.refresh() is not 

654 # commonly used 2. session.refresh() with only relationship attrs is 

655 # even less commonly used 3. the SELECT in question is very low 

656 # latency. 

657 # 

658 # to add the flow to not include the SELECT, the quickest way 

659 # might be to just manufacture a single-row result set to send off to 

660 # instances(), but we'd have to weave that into context.py and all 

661 # that. For 2.0.0, we have enough big changes to navigate for now. 

662 # 

663 mp = refresh_state.mapper._props 

664 for p in only_load_props: 

665 if mp[p]._is_relationship: 

666 only_load_props = refresh_state_prokeys.union(only_load_props) 

667 break 

668 

669 if refresh_state and refresh_state.load_options: 

670 compile_options += {"_current_path": refresh_state.load_path.parent} 

671 q = q.options(*refresh_state.load_options) 

672 

673 new_compile_options, load_options = _set_get_options( 

674 compile_options, 

675 load_options, 

676 version_check=version_check, 

677 only_load_props=only_load_props, 

678 refresh_state=refresh_state, 

679 identity_token=identity_token, 

680 is_user_refresh=is_user_refresh, 

681 ) 

682 

683 q._compile_options = new_compile_options 

684 q._order_by = None 

685 

686 if no_autoflush: 

687 load_options += {"_autoflush": False} 

688 

689 execution_options = util.EMPTY_DICT.merge_with( 

690 execution_options, {"_sa_orm_load_options": load_options} 

691 ) 

692 result = ( 

693 session.execute( 

694 q, 

695 params=params, 

696 execution_options=execution_options, 

697 bind_arguments=bind_arguments, 

698 ) 

699 .unique() 

700 .scalars() 

701 ) 

702 

703 try: 

704 return result.one() 

705 except orm_exc.NoResultFound: 

706 return None 

707 

708 

709def _set_get_options( 

710 compile_opt, 

711 load_opt, 

712 populate_existing=None, 

713 version_check=None, 

714 only_load_props=None, 

715 refresh_state=None, 

716 identity_token=None, 

717 is_user_refresh=None, 

718): 

719 compile_options = {} 

720 load_options = {} 

721 if version_check: 

722 load_options["_version_check"] = version_check 

723 if populate_existing: 

724 load_options["_populate_existing"] = populate_existing 

725 if refresh_state: 

726 load_options["_refresh_state"] = refresh_state 

727 compile_options["_for_refresh_state"] = True 

728 if only_load_props: 

729 compile_options["_only_load_props"] = frozenset(only_load_props) 

730 if identity_token: 

731 load_options["_identity_token"] = identity_token 

732 

733 if is_user_refresh: 

734 load_options["_is_user_refresh"] = is_user_refresh 

735 if load_options: 

736 load_opt += load_options 

737 if compile_options: 

738 compile_opt += compile_options 

739 

740 return compile_opt, load_opt 

741 

742 

743def _setup_entity_query( 

744 compile_state, 

745 mapper, 

746 query_entity, 

747 path, 

748 adapter, 

749 column_collection, 

750 with_polymorphic=None, 

751 only_load_props=None, 

752 polymorphic_discriminator=None, 

753 **kw, 

754): 

755 if with_polymorphic: 

756 poly_properties = mapper._iterate_polymorphic_properties( 

757 with_polymorphic 

758 ) 

759 else: 

760 poly_properties = mapper._polymorphic_properties 

761 

762 quick_populators = {} 

763 

764 path.set(compile_state.attributes, "memoized_setups", quick_populators) 

765 

766 # for the lead entities in the path, e.g. not eager loads, and 

767 # assuming a user-passed aliased class, e.g. not a from_self() or any 

768 # implicit aliasing, don't add columns to the SELECT that aren't 

769 # in the thing that's aliased. 

770 check_for_adapt = adapter and len(path) == 1 and path[-1].is_aliased_class 

771 

772 for value in poly_properties: 

773 if only_load_props and value.key not in only_load_props: 

774 continue 

775 value.setup( 

776 compile_state, 

777 query_entity, 

778 path, 

779 adapter, 

780 only_load_props=only_load_props, 

781 column_collection=column_collection, 

782 memoized_populators=quick_populators, 

783 check_for_adapt=check_for_adapt, 

784 **kw, 

785 ) 

786 

787 if ( 

788 polymorphic_discriminator is not None 

789 and polymorphic_discriminator is not mapper.polymorphic_on 

790 ): 

791 if adapter: 

792 pd = adapter.columns[polymorphic_discriminator] 

793 else: 

794 pd = polymorphic_discriminator 

795 column_collection.append(pd) 

796 

797 

798def _warn_for_runid_changed(state): 

799 util.warn( 

800 "Loading context for %s has changed within a load/refresh " 

801 "handler, suggesting a row refresh operation took place. If this " 

802 "event handler is expected to be " 

803 "emitting row refresh operations within an existing load or refresh " 

804 "operation, set restore_load_context=True when establishing the " 

805 "listener to ensure the context remains unchanged when the event " 

806 "handler completes." % (state_str(state),) 

807 ) 

808 

809 

810def _instance_processor( 

811 query_entity, 

812 mapper, 

813 context, 

814 result, 

815 path, 

816 adapter, 

817 only_load_props=None, 

818 refresh_state=None, 

819 polymorphic_discriminator=None, 

820 _polymorphic_from=None, 

821): 

822 """Produce a mapper level row processor callable 

823 which processes rows into mapped instances.""" 

824 

825 # note that this method, most of which exists in a closure 

826 # called _instance(), resists being broken out, as 

827 # attempts to do so tend to add significant function 

828 # call overhead. _instance() is the most 

829 # performance-critical section in the whole ORM. 

830 

831 identity_class = mapper._identity_class 

832 compile_state = context.compile_state 

833 

834 # look for "row getter" functions that have been assigned along 

835 # with the compile state that were cached from a previous load. 

836 # these are operator.itemgetter() objects that each will extract a 

837 # particular column from each row. 

838 

839 getter_key = ("getters", mapper) 

840 getters = path.get(compile_state.attributes, getter_key, None) 

841 

842 if getters is None: 

843 # no getters, so go through a list of attributes we are loading for, 

844 # and the ones that are column based will have already put information 

845 # for us in another collection "memoized_setups", which represents the 

846 # output of the LoaderStrategy.setup_query() method. We can just as 

847 # easily call LoaderStrategy.create_row_processor for each, but by 

848 # getting it all at once from setup_query we save another method call 

849 # per attribute. 

850 props = mapper._prop_set 

851 if only_load_props is not None: 

852 props = props.intersection( 

853 mapper._props[k] for k in only_load_props 

854 ) 

855 

856 quick_populators = path.get( 

857 context.attributes, "memoized_setups", EMPTY_DICT 

858 ) 

859 

860 todo = [] 

861 cached_populators = { 

862 "new": [], 

863 "quick": [], 

864 "deferred": [], 

865 "expire": [], 

866 "existing": [], 

867 "eager": [], 

868 } 

869 

870 if refresh_state is None: 

871 # we can also get the "primary key" tuple getter function 

872 pk_cols = mapper.primary_key 

873 

874 if adapter: 

875 pk_cols = [adapter.columns[c] for c in pk_cols] 

876 primary_key_getter = result._tuple_getter(pk_cols) 

877 else: 

878 primary_key_getter = None 

879 

880 getters = { 

881 "cached_populators": cached_populators, 

882 "todo": todo, 

883 "primary_key_getter": primary_key_getter, 

884 } 

885 for prop in props: 

886 if prop in quick_populators: 

887 # this is an inlined path just for column-based attributes. 

888 col = quick_populators[prop] 

889 if col is _DEFER_FOR_STATE: 

890 cached_populators["new"].append( 

891 (prop.key, prop._deferred_column_loader) 

892 ) 

893 elif col is _SET_DEFERRED_EXPIRED: 

894 # note that in this path, we are no longer 

895 # searching in the result to see if the column might 

896 # be present in some unexpected way. 

897 cached_populators["expire"].append((prop.key, False)) 

898 elif col is _RAISE_FOR_STATE: 

899 cached_populators["new"].append( 

900 (prop.key, prop._raise_column_loader) 

901 ) 

902 else: 

903 getter = None 

904 if adapter: 

905 # this logic had been removed for all 1.4 releases 

906 # up until 1.4.18; the adapter here is particularly 

907 # the compound eager adapter which isn't accommodated 

908 # in the quick_populators right now. The "fallback" 

909 # logic below instead took over in many more cases 

910 # until issue #6596 was identified. 

911 

912 # note there is still an issue where this codepath 

913 # produces no "getter" for cases where a joined-inh 

914 # mapping includes a labeled column property, meaning 

915 # KeyError is caught internally and we fall back to 

916 # _getter(col), which works anyway. The adapter 

917 # here for joined inh without any aliasing might not 

918 # be useful. Tests which see this include 

919 # test.orm.inheritance.test_basic -> 

920 # EagerTargetingTest.test_adapt_stringency 

921 # OptimizedLoadTest.test_column_expression_joined 

922 # PolymorphicOnNotLocalTest.test_polymorphic_on_column_prop # noqa: E501 

923 # 

924 

925 adapted_col = adapter.columns[col] 

926 if adapted_col is not None: 

927 getter = result._getter(adapted_col, False) 

928 if not getter: 

929 getter = result._getter(col, False) 

930 if getter: 

931 cached_populators["quick"].append((prop.key, getter)) 

932 else: 

933 # fall back to the ColumnProperty itself, which 

934 # will iterate through all of its columns 

935 # to see if one fits 

936 prop.create_row_processor( 

937 context, 

938 query_entity, 

939 path, 

940 mapper, 

941 result, 

942 adapter, 

943 cached_populators, 

944 ) 

945 else: 

946 # loader strategies like subqueryload, selectinload, 

947 # joinedload, basically relationships, these need to interact 

948 # with the context each time to work correctly. 

949 todo.append(prop) 

950 

951 path.set(compile_state.attributes, getter_key, getters) 

952 

953 cached_populators = getters["cached_populators"] 

954 

955 populators = {key: list(value) for key, value in cached_populators.items()} 

956 for prop in getters["todo"]: 

957 prop.create_row_processor( 

958 context, query_entity, path, mapper, result, adapter, populators 

959 ) 

960 

961 propagated_loader_options = context.propagated_loader_options 

962 load_path = ( 

963 context.compile_state.current_path + path 

964 if context.compile_state.current_path.path 

965 else path 

966 ) 

967 

968 session_identity_map = context.session.identity_map 

969 

970 populate_existing = context.populate_existing or mapper.always_refresh 

971 load_evt = bool(mapper.class_manager.dispatch.load) 

972 refresh_evt = bool(mapper.class_manager.dispatch.refresh) 

973 persistent_evt = bool(context.session.dispatch.loaded_as_persistent) 

974 if persistent_evt: 

975 loaded_as_persistent = context.session.dispatch.loaded_as_persistent 

976 instance_state = attributes.instance_state 

977 instance_dict = attributes.instance_dict 

978 session_id = context.session.hash_key 

979 runid = context.runid 

980 identity_token = context.identity_token 

981 

982 version_check = context.version_check 

983 if version_check: 

984 version_id_col = mapper.version_id_col 

985 if version_id_col is not None: 

986 if adapter: 

987 version_id_col = adapter.columns[version_id_col] 

988 version_id_getter = result._getter(version_id_col) 

989 else: 

990 version_id_getter = None 

991 

992 if not refresh_state and _polymorphic_from is not None: 

993 key = ("loader", path.path) 

994 

995 if key in context.attributes and context.attributes[key].strategy == ( 

996 ("selectinload_polymorphic", True), 

997 ): 

998 option_entities = context.attributes[key].local_opts["entities"] 

999 else: 

1000 option_entities = None 

1001 selectin_load_via = mapper._should_selectin_load( 

1002 option_entities, 

1003 _polymorphic_from, 

1004 ) 

1005 

1006 if selectin_load_via and selectin_load_via is not _polymorphic_from: 

1007 # only_load_props goes w/ refresh_state only, and in a refresh 

1008 # we are a single row query for the exact entity; polymorphic 

1009 # loading does not apply 

1010 assert only_load_props is None 

1011 

1012 if selectin_load_via.is_mapper: 

1013 _load_supers = [] 

1014 _endmost_mapper = selectin_load_via 

1015 while ( 

1016 _endmost_mapper 

1017 and _endmost_mapper is not _polymorphic_from 

1018 ): 

1019 _load_supers.append(_endmost_mapper) 

1020 _endmost_mapper = _endmost_mapper.inherits 

1021 else: 

1022 _load_supers = [selectin_load_via] 

1023 

1024 for _selectinload_entity in _load_supers: 

1025 if _PostLoad.path_exists( 

1026 context, load_path, _selectinload_entity 

1027 ): 

1028 continue 

1029 callable_ = _load_subclass_via_in( 

1030 context, 

1031 path, 

1032 _selectinload_entity, 

1033 _polymorphic_from, 

1034 option_entities, 

1035 ) 

1036 _PostLoad.callable_for_path( 

1037 context, 

1038 load_path, 

1039 _selectinload_entity.mapper, 

1040 _selectinload_entity, 

1041 callable_, 

1042 _selectinload_entity, 

1043 ) 

1044 

1045 post_load = _PostLoad.for_context(context, load_path, only_load_props) 

1046 

1047 if refresh_state: 

1048 refresh_identity_key = refresh_state.key 

1049 if refresh_identity_key is None: 

1050 # super-rare condition; a refresh is being called 

1051 # on a non-instance-key instance; this is meant to only 

1052 # occur within a flush() 

1053 refresh_identity_key = mapper._identity_key_from_state( 

1054 refresh_state 

1055 ) 

1056 else: 

1057 refresh_identity_key = None 

1058 

1059 primary_key_getter = getters["primary_key_getter"] 

1060 

1061 if mapper.allow_partial_pks: 

1062 is_not_primary_key = _none_set.issuperset 

1063 else: 

1064 is_not_primary_key = _none_set.intersection 

1065 

1066 def _instance(row): 

1067 # determine the state that we'll be populating 

1068 if refresh_identity_key: 

1069 # fixed state that we're refreshing 

1070 state = refresh_state 

1071 instance = state.obj() 

1072 dict_ = instance_dict(instance) 

1073 isnew = state.runid != runid 

1074 currentload = True 

1075 loaded_instance = False 

1076 else: 

1077 # look at the row, see if that identity is in the 

1078 # session, or we have to create a new one 

1079 identitykey = ( 

1080 identity_class, 

1081 primary_key_getter(row), 

1082 identity_token, 

1083 ) 

1084 

1085 instance = session_identity_map.get(identitykey) 

1086 

1087 if instance is not None: 

1088 # existing instance 

1089 state = instance_state(instance) 

1090 dict_ = instance_dict(instance) 

1091 

1092 isnew = state.runid != runid 

1093 currentload = not isnew 

1094 loaded_instance = False 

1095 

1096 if version_check and version_id_getter and not currentload: 

1097 _validate_version_id( 

1098 mapper, state, dict_, row, version_id_getter 

1099 ) 

1100 

1101 else: 

1102 # create a new instance 

1103 

1104 # check for non-NULL values in the primary key columns, 

1105 # else no entity is returned for the row 

1106 if is_not_primary_key(identitykey[1]): 

1107 return None 

1108 

1109 isnew = True 

1110 currentload = True 

1111 loaded_instance = True 

1112 

1113 instance = mapper.class_manager.new_instance() 

1114 

1115 dict_ = instance_dict(instance) 

1116 state = instance_state(instance) 

1117 state.key = identitykey 

1118 state.identity_token = identity_token 

1119 

1120 # attach instance to session. 

1121 state.session_id = session_id 

1122 session_identity_map._add_unpresent(state, identitykey) 

1123 

1124 effective_populate_existing = populate_existing 

1125 if refresh_state is state: 

1126 effective_populate_existing = True 

1127 

1128 # populate. this looks at whether this state is new 

1129 # for this load or was existing, and whether or not this 

1130 # row is the first row with this identity. 

1131 if currentload or effective_populate_existing: 

1132 # full population routines. Objects here are either 

1133 # just created, or we are doing a populate_existing 

1134 

1135 # be conservative about setting load_path when populate_existing 

1136 # is in effect; want to maintain options from the original 

1137 # load. see test_expire->test_refresh_maintains_deferred_options 

1138 if isnew and ( 

1139 propagated_loader_options or not effective_populate_existing 

1140 ): 

1141 state.load_options = propagated_loader_options 

1142 state.load_path = load_path 

1143 

1144 _populate_full( 

1145 context, 

1146 row, 

1147 state, 

1148 dict_, 

1149 isnew, 

1150 load_path, 

1151 loaded_instance, 

1152 effective_populate_existing, 

1153 populators, 

1154 ) 

1155 

1156 if isnew: 

1157 # state.runid should be equal to context.runid / runid 

1158 # here, however for event checks we are being more conservative 

1159 # and checking against existing run id 

1160 # assert state.runid == runid 

1161 

1162 existing_runid = state.runid 

1163 

1164 if loaded_instance: 

1165 if load_evt: 

1166 state.manager.dispatch.load(state, context) 

1167 if state.runid != existing_runid: 

1168 _warn_for_runid_changed(state) 

1169 if persistent_evt: 

1170 loaded_as_persistent(context.session, state) 

1171 if state.runid != existing_runid: 

1172 _warn_for_runid_changed(state) 

1173 elif refresh_evt: 

1174 state.manager.dispatch.refresh( 

1175 state, context, only_load_props 

1176 ) 

1177 if state.runid != runid: 

1178 _warn_for_runid_changed(state) 

1179 

1180 if effective_populate_existing or state.modified: 

1181 if refresh_state and only_load_props: 

1182 state._commit(dict_, only_load_props) 

1183 else: 

1184 state._commit_all(dict_, session_identity_map) 

1185 

1186 if post_load: 

1187 post_load.add_state(state, True) 

1188 

1189 else: 

1190 # partial population routines, for objects that were already 

1191 # in the Session, but a row matches them; apply eager loaders 

1192 # on existing objects, etc. 

1193 unloaded = state.unloaded 

1194 isnew = state not in context.partials 

1195 

1196 if not isnew or unloaded or populators["eager"]: 

1197 # state is having a partial set of its attributes 

1198 # refreshed. Populate those attributes, 

1199 # and add to the "context.partials" collection. 

1200 

1201 to_load = _populate_partial( 

1202 context, 

1203 row, 

1204 state, 

1205 dict_, 

1206 isnew, 

1207 load_path, 

1208 unloaded, 

1209 populators, 

1210 ) 

1211 

1212 if isnew: 

1213 if refresh_evt: 

1214 existing_runid = state.runid 

1215 state.manager.dispatch.refresh(state, context, to_load) 

1216 if state.runid != existing_runid: 

1217 _warn_for_runid_changed(state) 

1218 

1219 state._commit(dict_, to_load) 

1220 

1221 if post_load and context.invoke_all_eagers: 

1222 post_load.add_state(state, False) 

1223 

1224 return instance 

1225 

1226 if mapper.polymorphic_map and not _polymorphic_from and not refresh_state: 

1227 # if we are doing polymorphic, dispatch to a different _instance() 

1228 # method specific to the subclass mapper 

1229 def ensure_no_pk(row): 

1230 identitykey = ( 

1231 identity_class, 

1232 primary_key_getter(row), 

1233 identity_token, 

1234 ) 

1235 if not is_not_primary_key(identitykey[1]): 

1236 return identitykey 

1237 else: 

1238 return None 

1239 

1240 _instance = _decorate_polymorphic_switch( 

1241 _instance, 

1242 context, 

1243 query_entity, 

1244 mapper, 

1245 result, 

1246 path, 

1247 polymorphic_discriminator, 

1248 adapter, 

1249 ensure_no_pk, 

1250 ) 

1251 

1252 return _instance 

1253 

1254 

1255def _load_subclass_via_in( 

1256 context, path, entity, polymorphic_from, option_entities 

1257): 

1258 mapper = entity.mapper 

1259 

1260 # TODO: polymorphic_from seems to be a Mapper in all cases. 

1261 # this is likely not needed, but as we dont have typing in loading.py 

1262 # yet, err on the safe side 

1263 polymorphic_from_mapper = polymorphic_from.mapper 

1264 not_against_basemost = polymorphic_from_mapper.inherits is not None 

1265 

1266 zero_idx = len(mapper.base_mapper.primary_key) == 1 

1267 

1268 if entity.is_aliased_class or not_against_basemost: 

1269 q, enable_opt, disable_opt = mapper._subclass_load_via_in( 

1270 entity, polymorphic_from 

1271 ) 

1272 else: 

1273 q, enable_opt, disable_opt = mapper._subclass_load_via_in_mapper 

1274 

1275 def do_load(context, path, states, load_only, effective_entity): 

1276 if not option_entities: 

1277 # filter out states for those that would have selectinloaded 

1278 # from another loader 

1279 # TODO: we are currently ignoring the case where the 

1280 # "selectin_polymorphic" option is used, as this is much more 

1281 # complex / specific / very uncommon API use 

1282 states = [ 

1283 (s, v) 

1284 for s, v in states 

1285 if s.mapper._would_selectin_load_only_from_given_mapper(mapper) 

1286 ] 

1287 

1288 if not states: 

1289 return 

1290 

1291 orig_query = context.query 

1292 

1293 if path.parent: 

1294 enable_opt_lcl = enable_opt._prepend_path(path) 

1295 disable_opt_lcl = disable_opt._prepend_path(path) 

1296 else: 

1297 enable_opt_lcl = enable_opt 

1298 disable_opt_lcl = disable_opt 

1299 options = ( 

1300 (enable_opt_lcl,) + orig_query._with_options + (disable_opt_lcl,) 

1301 ) 

1302 

1303 q2 = q.options(*options) 

1304 

1305 q2._compile_options = context.compile_state.default_compile_options 

1306 q2._compile_options += {"_current_path": path.parent} 

1307 

1308 if context.populate_existing: 

1309 q2 = q2.execution_options(populate_existing=True) 

1310 

1311 while states: 

1312 chunk = states[0 : _SelectInLoader._chunksize] 

1313 states = states[_SelectInLoader._chunksize :] 

1314 context.session.execute( 

1315 q2, 

1316 dict( 

1317 primary_keys=[ 

1318 state.key[1][0] if zero_idx else state.key[1] 

1319 for state, load_attrs in chunk 

1320 ] 

1321 ), 

1322 ).unique().scalars().all() 

1323 

1324 return do_load 

1325 

1326 

1327def _populate_full( 

1328 context, 

1329 row, 

1330 state, 

1331 dict_, 

1332 isnew, 

1333 load_path, 

1334 loaded_instance, 

1335 populate_existing, 

1336 populators, 

1337): 

1338 if isnew: 

1339 # first time we are seeing a row with this identity. 

1340 state.runid = context.runid 

1341 

1342 for key, getter in populators["quick"]: 

1343 dict_[key] = getter(row) 

1344 if populate_existing: 

1345 for key, set_callable in populators["expire"]: 

1346 dict_.pop(key, None) 

1347 if set_callable: 

1348 state.expired_attributes.add(key) 

1349 else: 

1350 for key, set_callable in populators["expire"]: 

1351 if set_callable: 

1352 state.expired_attributes.add(key) 

1353 

1354 for key, populator in populators["new"]: 

1355 populator(state, dict_, row) 

1356 

1357 elif load_path != state.load_path: 

1358 # new load path, e.g. object is present in more than one 

1359 # column position in a series of rows 

1360 state.load_path = load_path 

1361 

1362 # if we have data, and the data isn't in the dict, OK, let's put 

1363 # it in. 

1364 for key, getter in populators["quick"]: 

1365 if key not in dict_: 

1366 dict_[key] = getter(row) 

1367 

1368 # otherwise treat like an "already seen" row 

1369 for key, populator in populators["existing"]: 

1370 populator(state, dict_, row) 

1371 # TODO: allow "existing" populator to know this is 

1372 # a new path for the state: 

1373 # populator(state, dict_, row, new_path=True) 

1374 

1375 else: 

1376 # have already seen rows with this identity in this same path. 

1377 for key, populator in populators["existing"]: 

1378 populator(state, dict_, row) 

1379 

1380 # TODO: same path 

1381 # populator(state, dict_, row, new_path=False) 

1382 

1383 

1384def _populate_partial( 

1385 context, row, state, dict_, isnew, load_path, unloaded, populators 

1386): 

1387 if not isnew: 

1388 if unloaded: 

1389 # extra pass, see #8166 

1390 for key, getter in populators["quick"]: 

1391 if key in unloaded: 

1392 dict_[key] = getter(row) 

1393 

1394 to_load = context.partials[state] 

1395 for key, populator in populators["existing"]: 

1396 if key in to_load: 

1397 populator(state, dict_, row) 

1398 else: 

1399 to_load = unloaded 

1400 context.partials[state] = to_load 

1401 

1402 for key, getter in populators["quick"]: 

1403 if key in to_load: 

1404 dict_[key] = getter(row) 

1405 for key, set_callable in populators["expire"]: 

1406 if key in to_load: 

1407 dict_.pop(key, None) 

1408 if set_callable: 

1409 state.expired_attributes.add(key) 

1410 for key, populator in populators["new"]: 

1411 if key in to_load: 

1412 populator(state, dict_, row) 

1413 

1414 for key, populator in populators["eager"]: 

1415 if key not in unloaded: 

1416 populator(state, dict_, row) 

1417 

1418 return to_load 

1419 

1420 

1421def _validate_version_id(mapper, state, dict_, row, getter): 

1422 if mapper._get_state_attr_by_column( 

1423 state, dict_, mapper.version_id_col 

1424 ) != getter(row): 

1425 raise orm_exc.StaleDataError( 

1426 "Instance '%s' has version id '%s' which " 

1427 "does not match database-loaded version id '%s'." 

1428 % ( 

1429 state_str(state), 

1430 mapper._get_state_attr_by_column( 

1431 state, dict_, mapper.version_id_col 

1432 ), 

1433 getter(row), 

1434 ) 

1435 ) 

1436 

1437 

1438def _decorate_polymorphic_switch( 

1439 instance_fn, 

1440 context, 

1441 query_entity, 

1442 mapper, 

1443 result, 

1444 path, 

1445 polymorphic_discriminator, 

1446 adapter, 

1447 ensure_no_pk, 

1448): 

1449 if polymorphic_discriminator is not None: 

1450 polymorphic_on = polymorphic_discriminator 

1451 else: 

1452 polymorphic_on = mapper.polymorphic_on 

1453 if polymorphic_on is None: 

1454 return instance_fn 

1455 

1456 if adapter: 

1457 polymorphic_on = adapter.columns[polymorphic_on] 

1458 

1459 def configure_subclass_mapper(discriminator): 

1460 try: 

1461 sub_mapper = mapper.polymorphic_map[discriminator] 

1462 except KeyError: 

1463 raise AssertionError( 

1464 "No such polymorphic_identity %r is defined" % discriminator 

1465 ) 

1466 else: 

1467 if sub_mapper is mapper: 

1468 return None 

1469 elif not sub_mapper.isa(mapper): 

1470 return False 

1471 

1472 return _instance_processor( 

1473 query_entity, 

1474 sub_mapper, 

1475 context, 

1476 result, 

1477 path, 

1478 adapter, 

1479 _polymorphic_from=mapper, 

1480 ) 

1481 

1482 polymorphic_instances = util.PopulateDict(configure_subclass_mapper) 

1483 

1484 getter = result._getter(polymorphic_on) 

1485 

1486 def polymorphic_instance(row): 

1487 discriminator = getter(row) 

1488 if discriminator is not None: 

1489 _instance = polymorphic_instances[discriminator] 

1490 if _instance: 

1491 return _instance(row) 

1492 elif _instance is False: 

1493 identitykey = ensure_no_pk(row) 

1494 

1495 if identitykey: 

1496 raise sa_exc.InvalidRequestError( 

1497 "Row with identity key %s can't be loaded into an " 

1498 "object; the polymorphic discriminator column '%s' " 

1499 "refers to %s, which is not a sub-mapper of " 

1500 "the requested %s" 

1501 % ( 

1502 identitykey, 

1503 polymorphic_on, 

1504 mapper.polymorphic_map[discriminator], 

1505 mapper, 

1506 ) 

1507 ) 

1508 else: 

1509 return None 

1510 else: 

1511 return instance_fn(row) 

1512 else: 

1513 identitykey = ensure_no_pk(row) 

1514 

1515 if identitykey: 

1516 raise sa_exc.InvalidRequestError( 

1517 "Row with identity key %s can't be loaded into an " 

1518 "object; the polymorphic discriminator column '%s' is " 

1519 "NULL" % (identitykey, polymorphic_on) 

1520 ) 

1521 else: 

1522 return None 

1523 

1524 return polymorphic_instance 

1525 

1526 

1527class _PostLoad: 

1528 """Track loaders and states for "post load" operations.""" 

1529 

1530 __slots__ = "loaders", "states", "load_keys" 

1531 

1532 def __init__(self): 

1533 self.loaders = {} 

1534 self.states = util.OrderedDict() 

1535 self.load_keys = None 

1536 

1537 def add_state(self, state, overwrite): 

1538 # the states for a polymorphic load here are all shared 

1539 # within a single PostLoad object among multiple subtypes. 

1540 # Filtering of callables on a per-subclass basis needs to be done at 

1541 # the invocation level 

1542 self.states[state] = overwrite 

1543 

1544 def invoke(self, context, path): 

1545 if not self.states: 

1546 return 

1547 path = path_registry.PathRegistry.coerce(path) 

1548 for ( 

1549 effective_context, 

1550 token, 

1551 limit_to_mapper, 

1552 loader, 

1553 arg, 

1554 kw, 

1555 ) in self.loaders.values(): 

1556 states = [ 

1557 (state, overwrite) 

1558 for state, overwrite in self.states.items() 

1559 if state.manager.mapper.isa(limit_to_mapper) 

1560 ] 

1561 if states: 

1562 loader( 

1563 effective_context, path, states, self.load_keys, *arg, **kw 

1564 ) 

1565 self.states.clear() 

1566 

1567 @classmethod 

1568 def for_context(cls, context, path, only_load_props): 

1569 pl = context.post_load_paths.get(path.path) 

1570 if pl is not None and only_load_props: 

1571 pl.load_keys = only_load_props 

1572 return pl 

1573 

1574 @classmethod 

1575 def path_exists(self, context, path, key): 

1576 return ( 

1577 path.path in context.post_load_paths 

1578 and key in context.post_load_paths[path.path].loaders 

1579 ) 

1580 

1581 @classmethod 

1582 def callable_for_path( 

1583 cls, context, path, limit_to_mapper, token, loader_callable, *arg, **kw 

1584 ): 

1585 if path.path in context.post_load_paths: 

1586 pl = context.post_load_paths[path.path] 

1587 else: 

1588 pl = context.post_load_paths[path.path] = _PostLoad() 

1589 pl.loaders[token] = ( 

1590 context, 

1591 token, 

1592 limit_to_mapper, 

1593 loader_callable, 

1594 arg, 

1595 kw, 

1596 ) 

1597 

1598 

1599def _load_scalar_attributes(mapper, state, attribute_names, passive): 

1600 """initiate a column-based attribute refresh operation.""" 

1601 

1602 # assert mapper is _state_mapper(state) 

1603 session = state.session 

1604 if not session: 

1605 raise orm_exc.DetachedInstanceError( 

1606 "Instance %s is not bound to a Session; " 

1607 "attribute refresh operation cannot proceed" % (state_str(state)) 

1608 ) 

1609 

1610 no_autoflush = bool(passive & attributes.NO_AUTOFLUSH) 

1611 

1612 # in the case of inheritance, particularly concrete and abstract 

1613 # concrete inheritance, the class manager might have some keys 

1614 # of attributes on the superclass that we didn't actually map. 

1615 # These could be mapped as "concrete, don't load" or could be completely 

1616 # excluded from the mapping and we know nothing about them. Filter them 

1617 # here to prevent them from coming through. 

1618 if attribute_names: 

1619 attribute_names = attribute_names.intersection(mapper.attrs.keys()) 

1620 

1621 if mapper.inherits and not mapper.concrete: 

1622 # load based on committed attributes in the object, formed into 

1623 # a truncated SELECT that only includes relevant tables. does not 

1624 # currently use state.key 

1625 statement = mapper._optimized_get_statement(state, attribute_names) 

1626 if statement is not None: 

1627 # undefer() isn't needed here because statement has the 

1628 # columns needed already, this implicitly undefers that column 

1629 stmt = FromStatement(mapper, statement) 

1630 

1631 return _load_on_ident( 

1632 session, 

1633 stmt, 

1634 None, 

1635 only_load_props=attribute_names, 

1636 refresh_state=state, 

1637 no_autoflush=no_autoflush, 

1638 ) 

1639 

1640 # normal load, use state.key as the identity to SELECT 

1641 has_key = bool(state.key) 

1642 

1643 if has_key: 

1644 identity_key = state.key 

1645 else: 

1646 # this codepath is rare - only valid when inside a flush, and the 

1647 # object is becoming persistent but hasn't yet been assigned 

1648 # an identity_key. 

1649 # check here to ensure we have the attrs we need. 

1650 pk_attrs = [ 

1651 mapper._columntoproperty[col].key for col in mapper.primary_key 

1652 ] 

1653 if state.expired_attributes.intersection(pk_attrs): 

1654 raise sa_exc.InvalidRequestError( 

1655 "Instance %s cannot be refreshed - it's not " 

1656 " persistent and does not " 

1657 "contain a full primary key." % state_str(state) 

1658 ) 

1659 identity_key = mapper._identity_key_from_state(state) 

1660 

1661 if ( 

1662 _none_set.issubset(identity_key) and not mapper.allow_partial_pks 

1663 ) or _none_set.issuperset(identity_key): 

1664 util.warn_limited( 

1665 "Instance %s to be refreshed doesn't " 

1666 "contain a full primary key - can't be refreshed " 

1667 "(and shouldn't be expired, either).", 

1668 state_str(state), 

1669 ) 

1670 return 

1671 

1672 result = _load_on_ident( 

1673 session, 

1674 select(mapper).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL), 

1675 identity_key, 

1676 refresh_state=state, 

1677 only_load_props=attribute_names, 

1678 no_autoflush=no_autoflush, 

1679 ) 

1680 

1681 # if instance is pending, a refresh operation 

1682 # may not complete (even if PK attributes are assigned) 

1683 if has_key and result is None: 

1684 raise orm_exc.ObjectDeletedError(state)