Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/loading.py: 11%

661 statements  

1# orm/loading.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: ignore-errors 

8 

9 

10"""private module containing functions used to convert database 

11rows into object instances and associated state. 

12 

13the functions here are called primarily by Query, Mapper, 

14as well as some of the attribute loading strategies. 

15 

16""" 

17 

18from __future__ import annotations 

19 

20from typing import Any 

21from typing import Dict 

22from typing import Iterable 

23from typing import List 

24from typing import Mapping 

25from typing import Optional 

26from typing import Sequence 

27from typing import Tuple 

28from typing import TYPE_CHECKING 

29from typing import TypeVar 

30from typing import Union 

31 

32from . import attributes 

33from . import exc as orm_exc 

34from . import path_registry 

35from .base import _DEFER_FOR_STATE 

36from .base import _RAISE_FOR_STATE 

37from .base import _SET_DEFERRED_EXPIRED 

38from .base import PassiveFlag 

39from .context import _ORMCompileState 

40from .context import FromStatement 

41from .context import QueryContext 

42from .strategies import _SelectInLoader 

43from .util import _none_set 

44from .util import state_str 

45from .. import exc as sa_exc 

46from .. import util 

47from ..engine import result_tuple 

48from ..engine.result import ChunkedIteratorResult 

49from ..engine.result import FrozenResult 

50from ..engine.result import SimpleResultMetaData 

51from ..sql import select 

52from ..sql import util as sql_util 

53from ..sql.selectable import ForUpdateArg 

54from ..sql.selectable import SelectState 

55from ..util import EMPTY_DICT 

56from ..util.typing import TupleAny 

57from ..util.typing import Unpack 

58 

59if TYPE_CHECKING: 

60 from ._typing import _IdentityKeyType 

61 from .base import LoaderCallableStatus 

62 from .interfaces import ORMOption 

63 from .mapper import Mapper 

64 from .query import Query 

65 from .session import Session 

66 from .state import InstanceState 

67 from ..engine.cursor import CursorResult 

68 from ..engine.interfaces import _ExecuteOptions 

69 from ..engine.result import Result 

70 from ..sql import Select 

71 

72_T = TypeVar("_T", bound=Any) 

73_O = TypeVar("_O", bound=object) 

74_new_runid = util.counter() 

75 

76 

77_PopulatorDict = Dict[str, List[Tuple[str, Any]]] 

78 

79 

80def instances( 

81 cursor: CursorResult[Unpack[TupleAny]], context: QueryContext 

82) -> Result[Unpack[TupleAny]]: 

83 """Return a :class:`.Result` given an ORM query context. 

84 

85 :param cursor: a :class:`.CursorResult`, generated by a statement 

86 which came from :class:`.ORMCompileState` 

87 

88 :param context: a :class:`.QueryContext` object 

89 

90 :return: a :class:`.Result` object representing ORM results 

91 

92 .. versionchanged:: 1.4 The instances() function now uses 

93 :class:`.Result` objects and has an all new interface. 

94 

95 """ 

96 

97 context.runid = _new_runid() 

98 

99 if context.top_level_context: 

100 is_top_level = False 

101 context.post_load_paths = context.top_level_context.post_load_paths 

102 else: 

103 is_top_level = True 

104 context.post_load_paths = {} 

105 

106 compile_state = context.compile_state 

107 filtered = compile_state._has_mapper_entities 

108 single_entity = ( 

109 not context.load_options._only_return_tuples 

110 and len(compile_state._entities) == 1 

111 and compile_state._entities[0].supports_single_entity 

112 ) 

113 

114 try: 

115 (process, labels, extra) = list( 

116 zip( 

117 *[ 

118 query_entity.row_processor(context, cursor) 

119 for query_entity in context.compile_state._entities 

120 ] 

121 ) 

122 ) 

123 

124 if context.yield_per and ( 

125 context.loaders_require_buffering 

126 or context.loaders_require_uniquing 

127 ): 

128 raise sa_exc.InvalidRequestError( 

129 "Can't use yield_per with eager loaders that require uniquing " 

130 "or row buffering, e.g. joinedload() against collections " 

131 "or subqueryload(). Consider the selectinload() strategy " 

132 "for better flexibility in loading objects." 

133 ) 

134 

135 except Exception: 

136 with util.safe_reraise(): 

137 cursor.close() 

138 

139 def _no_unique(entry): 

140 raise sa_exc.InvalidRequestError( 

141 "Can't use the ORM yield_per feature in conjunction with unique()" 

142 ) 

143 

144 def _not_hashable(datatype, *, legacy=False, uncertain=False): 

145 if not legacy: 

146 

147 def go(obj): 

148 if uncertain: 

149 try: 

150 return hash(obj) 

151 except Exception: 

152 pass 

153 

154 raise sa_exc.InvalidRequestError( 

155 "Can't apply uniqueness to row tuple containing value of " 

156 f"""type {datatype!r}; { 

157 'the values returned appear to be' 

158 if uncertain 

159 else 'this datatype produces' 

160 } non-hashable values""" 

161 ) 

162 

163 return go 

164 elif not uncertain: 

165 return id 

166 else: 

167 _use_id = False 

168 

169 def go(obj): 

170 nonlocal _use_id 

171 

172 if not _use_id: 

173 try: 

174 return hash(obj) 

175 except Exception: 

176 pass 

177 

178 # in #10459, we considered using a warning here, however 

179 # as legacy query uses result.unique() in all cases, this 

180 # would lead to too many warning cases. 

181 _use_id = True 

182 

183 return id(obj) 

184 

185 return go 

186 

187 unique_filters = [ 

188 ( 

189 _no_unique 

190 if context.yield_per 

191 else ( 

192 _not_hashable( 

193 ent.column.type, # type: ignore 

194 legacy=context.load_options._legacy_uniquing, 

195 uncertain=ent._null_column_type, 

196 ) 

197 if ( 

198 not ent.use_id_for_hash 

199 and (ent._non_hashable_value or ent._null_column_type) 

200 ) 

201 else id if ent.use_id_for_hash else None 

202 ) 

203 ) 

204 for ent in context.compile_state._entities 

205 ] 

206 

207 row_metadata = SimpleResultMetaData( 

208 labels, extra, _unique_filters=unique_filters 

209 ) 

210 

211 def chunks(size): # type: ignore 

212 while True: 

213 yield_per = size 

214 

215 context.partials = {} 

216 

217 if yield_per: 

218 fetch = cursor.fetchmany(yield_per) 

219 

220 if not fetch: 

221 break 

222 else: 

223 fetch = cursor._raw_all_rows() 

224 

225 if single_entity: 

226 proc = process[0] 

227 rows = [proc(row) for row in fetch] 

228 else: 

229 rows = [ 

230 tuple([proc(row) for proc in process]) for row in fetch 

231 ] 

232 

233 # if we are the originating load from a query, meaning we 

234 # aren't being called as a result of a nested "post load", 

235 # iterate through all the collected post loaders and fire them 

236 # off. Previously this used to work recursively, however that 

237 # prevented deeply nested structures from being loadable 

238 if is_top_level: 

239 if yield_per: 

240 # if using yield per, memoize the state of the 

241 # collection so that it can be restored 

242 top_level_post_loads = list( 

243 context.post_load_paths.items() 

244 ) 

245 

246 while context.post_load_paths: 

247 post_loads = list(context.post_load_paths.items()) 

248 context.post_load_paths.clear() 

249 for path, post_load in post_loads: 

250 post_load.invoke(context, path) 

251 

252 if yield_per: 

253 context.post_load_paths.clear() 

254 context.post_load_paths.update(top_level_post_loads) 

255 

256 yield rows 

257 

258 if not yield_per: 

259 break 

260 

261 if context.execution_options.get("prebuffer_rows", False): 

262 # this is a bit of a hack at the moment. 

263 # I would rather have some option in the result to pre-buffer 

264 # internally. 

265 _prebuffered = list(chunks(None)) 

266 

267 def chunks(size): 

268 return iter(_prebuffered) 

269 

270 result = ChunkedIteratorResult( 

271 row_metadata, 

272 chunks, 

273 source_supports_scalars=single_entity, 

274 raw=cursor, 

275 dynamic_yield_per=cursor.context._is_server_side, 

276 ) 

277 

278 # filtered and single_entity are used to indicate to legacy Query that the 

279 # query has ORM entities, so legacy deduping and scalars should be called 

280 # on the result. 

281 result._attributes = result._attributes.union( 

282 dict(filtered=filtered, is_single_entity=single_entity) 

283 ) 

284 

285 # multi_row_eager_loaders OTOH is specific to joinedload. 

286 if context.compile_state.multi_row_eager_loaders: 

287 

288 def require_unique(obj): 

289 raise sa_exc.InvalidRequestError( 

290 "The unique() method must be invoked on this Result, " 

291 "as it contains results that include joined eager loads " 

292 "against collections" 

293 ) 

294 

295 result._unique_filter_state = (None, require_unique) 

296 

297 if context.yield_per: 

298 result.yield_per(context.yield_per) 

299 

300 return result 

301 

302 
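# ---------------------------------------------------------------------------
# Editor's note: the sketch below is illustrative only and is not part of
# SQLAlchemy.  It shows the typical user-facing entry point whose execution
# ultimately hands a CursorResult to instances() above.  ``engine`` and the
# mapped class ``User`` are assumed to be defined elsewhere.
def _example_orm_select(engine, User):
    """Hedged sketch: an ORM SELECT whose rows are converted into ``User``
    instances by the row processors assembled in instances()."""
    from sqlalchemy import select
    from sqlalchemy.orm import Session

    with Session(engine) as session:
        # session.execute() runs the statement; the ORM execution flow wraps
        # the DBAPI cursor via instances(), returning an ORM-level Result
        result = session.execute(select(User))
        return result.scalars().all()
# ---------------------------------------------------------------------------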

303@util.preload_module("sqlalchemy.orm.context") 

304def merge_frozen_result(session, statement, frozen_result, load=True): 

305 """Merge a :class:`_engine.FrozenResult` back into a :class:`_orm.Session`, 

306 returning a new :class:`_engine.Result` object with :term:`persistent` 

307 objects. 

308 

309 See the section :ref:`do_orm_execute_re_executing` for an example. 

310 

311 .. seealso:: 

312 

313 :ref:`do_orm_execute_re_executing` 

314 

315 :meth:`_engine.Result.freeze` 

316 

317 :class:`_engine.FrozenResult` 

318 

319 """ 

320 querycontext = util.preloaded.orm_context 

321 

322 if load: 

323 # flush current contents if we expect to load data 

324 session._autoflush() 

325 

326 ctx = querycontext._ORMSelectCompileState._create_entities_collection( 

327 statement, legacy=False 

328 ) 

329 

330 with session.no_autoflush: 

331 mapped_entities = [ 

332 i 

333 for i, e in enumerate(ctx._entities) 

334 if isinstance(e, querycontext._MapperEntity) 

335 ] 

336 keys = [ent._label_name for ent in ctx._entities] 

337 

338 keyed_tuple = result_tuple( 

339 keys, [ent._extra_entities for ent in ctx._entities] 

340 ) 

341 

342 result = [] 

343 for newrow in frozen_result._rewrite_rows(): 

344 for i in mapped_entities: 

345 if newrow[i] is not None: 

346 newrow[i] = session._merge( 

347 attributes.instance_state(newrow[i]), 

348 attributes.instance_dict(newrow[i]), 

349 load=load, 

350 _recursive={}, 

351 _resolve_conflict_map={}, 

352 ) 

353 

354 result.append(keyed_tuple(newrow)) 

355 

356 return frozen_result.with_new_rows(result) 

357 

358 
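# ---------------------------------------------------------------------------
# Editor's note: hedged illustration, not part of SQLAlchemy.  It sketches how
# merge_frozen_result() is intended to be used from a "do_orm_execute" event
# handler, loosely following the pattern described in
# :ref:`do_orm_execute_re_executing`.  ``cache`` is assumed to be a plain
# dict-like object supplied by the application.
def _example_cached_orm_execution(cache):
    from sqlalchemy import event
    from sqlalchemy.orm import Session

    @event.listens_for(Session, "do_orm_execute")
    def _do_orm_execute(orm_execute_state):
        cache_key = orm_execute_state.execution_options.get("cache_key")
        if cache_key is None:
            return None
        frozen = cache.get(cache_key)
        if frozen is None:
            # run the statement normally and freeze the Result for re-use
            frozen = orm_execute_state.invoke_statement().freeze()
            cache[cache_key] = frozen
        # merge the frozen rows into the current Session; calling the
        # returned FrozenResult produces a new Result to hand back
        return merge_frozen_result(
            orm_execute_state.session,
            orm_execute_state.statement,
            frozen,
            load=False,
        )()
# ---------------------------------------------------------------------------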

359@util.became_legacy_20( 

360 ":func:`_orm.merge_result`", 

361 alternative="The function as well as the method on :class:`_orm.Query` " 

362 "is superseded by the :func:`_orm.merge_frozen_result` function.", 

363) 

364@util.preload_module("sqlalchemy.orm.context") 

365def merge_result( 

366 query: Query[Any], 

367 iterator: Union[FrozenResult, Iterable[Sequence[Any]], Iterable[object]], 

368 load: bool = True, 

369) -> Union[FrozenResult, Iterable[Any]]: 

370 """Merge a result into the given :class:`.Query` object's Session. 

371 

372 See :meth:`_orm.Query.merge_result` for top-level documentation on this 

373 function. 

374 

375 """ 

376 

377 querycontext = util.preloaded.orm_context 

378 

379 session = query.session 

380 if load: 

381 # flush current contents if we expect to load data 

382 session._autoflush() 

383 

384 # TODO: need test coverage and documentation for the FrozenResult 

385 # use case. 

386 if isinstance(iterator, FrozenResult): 

387 frozen_result = iterator 

388 iterator = iter(frozen_result.data) 

389 else: 

390 frozen_result = None 

391 

392 ctx = querycontext._ORMSelectCompileState._create_entities_collection( 

393 query, legacy=True 

394 ) 

395 

396 autoflush = session.autoflush 

397 try: 

398 session.autoflush = False 

399 single_entity = not frozen_result and len(ctx._entities) == 1 

400 

401 if single_entity: 

402 if isinstance(ctx._entities[0], querycontext._MapperEntity): 

403 result = [ 

404 session._merge( 

405 attributes.instance_state(instance), 

406 attributes.instance_dict(instance), 

407 load=load, 

408 _recursive={}, 

409 _resolve_conflict_map={}, 

410 ) 

411 for instance in iterator 

412 ] 

413 else: 

414 result = list(iterator) 

415 else: 

416 mapped_entities = [ 

417 i 

418 for i, e in enumerate(ctx._entities) 

419 if isinstance(e, querycontext._MapperEntity) 

420 ] 

421 result = [] 

422 keys = [ent._label_name for ent in ctx._entities] 

423 

424 keyed_tuple = result_tuple( 

425 keys, [ent._extra_entities for ent in ctx._entities] 

426 ) 

427 

428 for row in iterator: 

429 newrow = list(row) 

430 for i in mapped_entities: 

431 if newrow[i] is not None: 

432 newrow[i] = session._merge( 

433 attributes.instance_state(newrow[i]), 

434 attributes.instance_dict(newrow[i]), 

435 load=load, 

436 _recursive={}, 

437 _resolve_conflict_map={}, 

438 ) 

439 result.append(keyed_tuple(newrow)) 

440 

441 if frozen_result: 

442 return frozen_result.with_new_rows(result) 

443 else: 

444 return iter(result) 

445 finally: 

446 session.autoflush = autoflush 

447 

448 

449def get_from_identity( 

450 session: Session, 

451 mapper: Mapper[_O], 

452 key: _IdentityKeyType[_O], 

453 passive: PassiveFlag, 

454) -> Union[LoaderCallableStatus, Optional[_O]]: 

455 """Look up the given key in the given session's identity map, 

456 check the object for expired state if found. 

457 

458 """ 

459 instance = session.identity_map.get(key) 

460 if instance is not None: 

461 state = attributes.instance_state(instance) 

462 

463 if mapper.inherits and not state.mapper.isa(mapper): 

464 return attributes.PASSIVE_CLASS_MISMATCH 

465 

466 # expired - ensure it still exists 

467 if state.expired: 

468 if not passive & attributes.SQL_OK: 

469 # TODO: no coverage here 

470 return attributes.PASSIVE_NO_RESULT 

471 elif not passive & attributes.RELATED_OBJECT_OK: 

472 # this mode is used within a flush and the instance's 

473 # expired state will be checked soon enough, if necessary. 

474 # also used by immediateloader for a mutually-dependent 

475 # o2m->m2m load, :ticket:`6301` 

476 return instance 

477 try: 

478 state._load_expired(state, passive) 

479 except orm_exc.ObjectDeletedError: 

480 session._remove_newly_deleted([state]) 

481 return None 

482 return instance 

483 else: 

484 return None 

485 
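# ---------------------------------------------------------------------------
# Editor's note: illustrative sketch only, not part of SQLAlchemy.  It shows
# the kind of identity-map lookup get_from_identity() performs; ``User`` is an
# assumed mapped class and ``session`` an active Session.
def _example_identity_lookup(session, User, pk):
    from sqlalchemy import inspect
    from sqlalchemy.orm import identity_key

    key = identity_key(User, pk)  # (identity class, (pk,), identity_token)
    mapper = inspect(User)        # the Mapper for the class
    # returns the instance, None, or a status symbol such as
    # PASSIVE_CLASS_MISMATCH, depending on identity-map / expiration state
    return get_from_identity(session, mapper, key, PassiveFlag.PASSIVE_OFF)
# ---------------------------------------------------------------------------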

486 

487def _load_on_ident( 

488 session: Session, 

489 statement: Union[Select, FromStatement], 

490 key: Optional[_IdentityKeyType], 

491 *, 

492 load_options: Optional[Sequence[ORMOption]] = None, 

493 refresh_state: Optional[InstanceState[Any]] = None, 

494 with_for_update: Optional[ForUpdateArg] = None, 

495 only_load_props: Optional[Iterable[str]] = None, 

496 no_autoflush: bool = False, 

497 bind_arguments: Mapping[str, Any] = util.EMPTY_DICT, 

498 execution_options: _ExecuteOptions = util.EMPTY_DICT, 

499 require_pk_cols: bool = False, 

500 is_user_refresh: bool = False, 

501): 

502 """Load the given identity key from the database.""" 

503 if key is not None: 

504 ident = key[1] 

505 identity_token = key[2] 

506 else: 

507 ident = identity_token = None 

508 

509 return _load_on_pk_identity( 

510 session, 

511 statement, 

512 ident, 

513 load_options=load_options, 

514 refresh_state=refresh_state, 

515 with_for_update=with_for_update, 

516 only_load_props=only_load_props, 

517 identity_token=identity_token, 

518 no_autoflush=no_autoflush, 

519 bind_arguments=bind_arguments, 

520 execution_options=execution_options, 

521 require_pk_cols=require_pk_cols, 

522 is_user_refresh=is_user_refresh, 

523 ) 

524 

525 

526def _load_on_pk_identity( 

527 session: Session, 

528 statement: Union[Select, FromStatement], 

529 primary_key_identity: Optional[Tuple[Any, ...]], 

530 *, 

531 load_options: Optional[Sequence[ORMOption]] = None, 

532 refresh_state: Optional[InstanceState[Any]] = None, 

533 with_for_update: Optional[ForUpdateArg] = None, 

534 only_load_props: Optional[Iterable[str]] = None, 

535 identity_token: Optional[Any] = None, 

536 no_autoflush: bool = False, 

537 bind_arguments: Mapping[str, Any] = util.EMPTY_DICT, 

538 execution_options: _ExecuteOptions = util.EMPTY_DICT, 

539 require_pk_cols: bool = False, 

540 is_user_refresh: bool = False, 

541): 

542 """Load the given primary key identity from the database.""" 

543 

544 query = statement 

545 q = query._clone() 

546 

547 assert not q._is_lambda_element 

548 

549 if load_options is None: 

550 load_options = QueryContext.default_load_options 

551 

552 if ( 

553 statement._compile_options 

554 is SelectState.default_select_compile_options 

555 ): 

556 compile_options = _ORMCompileState.default_compile_options 

557 else: 

558 compile_options = statement._compile_options 

559 

560 if primary_key_identity is not None: 

561 mapper = query._propagate_attrs["plugin_subject"] 

562 

563 (_get_clause, _get_params) = mapper._get_clause 

564 

565 # None present in ident - turn those comparisons 

566 # into "IS NULL" 

567 if None in primary_key_identity: 

568 nones = { 

569 _get_params[col].key 

570 for col, value in zip(mapper.primary_key, primary_key_identity) 

571 if value is None 

572 } 

573 

574 _get_clause = sql_util.adapt_criterion_to_null(_get_clause, nones) 

575 

576 if len(nones) == len(primary_key_identity): 

577 util.warn( 

578 "fully NULL primary key identity cannot load any " 

579 "object. This condition may raise an error in a future " 

580 "release." 

581 ) 

582 

583 q._where_criteria = (_get_clause,) 

584 

585 params = { 

586 _get_params[primary_key].key: id_val 

587 for id_val, primary_key in zip( 

588 primary_key_identity, mapper.primary_key 

589 ) 

590 } 

591 else: 

592 params = None 

593 

594 if with_for_update is not None: 

595 version_check = True 

596 q._for_update_arg = with_for_update 

597 elif query._for_update_arg is not None: 

598 version_check = True 

599 q._for_update_arg = query._for_update_arg 

600 else: 

601 version_check = False 

602 

603 if require_pk_cols and only_load_props: 

604 if not refresh_state: 

605 raise sa_exc.ArgumentError( 

606 "refresh_state is required when require_pk_cols is present" 

607 ) 

608 

609 refresh_state_prokeys = refresh_state.mapper._primary_key_propkeys 

610 has_changes = { 

611 key 

612 for key in refresh_state_prokeys.difference(only_load_props) 

613 if refresh_state.attrs[key].history.has_changes() 

614 } 

615 if has_changes: 

616 # raise if pending pk changes are present. 

617 # technically, this could be limited to the case where we have 

618 # relationships in the only_load_props collection to be refreshed 

619 # also (and only ones that have a secondary eager loader, at that). 

620 # however, the error is in place across the board so that behavior 

621 # here is easier to predict. The use case it prevents is one 

622 # of mutating PK attrs, leaving them unflushed, 

623 # calling session.refresh(), and expecting those attrs to remain 

624 # still unflushed. It seems likely someone doing all those 

625 # things would be better off having the PK attributes flushed 

626 # to the database before tinkering like that (session.refresh() is 

627 # tinkering). 

628 raise sa_exc.InvalidRequestError( 

629 f"Please flush pending primary key changes on " 

630 "attributes " 

631 f"{has_changes} for mapper {refresh_state.mapper} before " 

632 "proceeding with a refresh" 

633 ) 

634 

635 # overall, the ORM has no internal flow right now for "dont load the 

636 # primary row of an object at all, but fire off 

637 # selectinload/subqueryload/immediateload for some relationships". 

638 # It would probably be a pretty big effort to add such a flow. So 

639 # here, the case for #8703 is introduced; user asks to refresh some 

640 # relationship attributes only which are 

641 # selectinload/subqueryload/immediateload/ etc. (not joinedload). 

642 # ORM complains there's no columns in the primary row to load. 

643 # So here, we just add the PK cols if that 

644 # case is detected, so that there is a SELECT emitted for the primary 

645 # row. 

646 # 

647 # Let's just state right up front, for this one little case, 

648 # the ORM here is adding a whole extra SELECT just to satisfy 

649 # limitations in the internal flow. This is really not a thing 

650 # SQLAlchemy finds itself doing like, ever, obviously, we are 

651 # constantly working to *remove* SELECTs we don't need. We 

652 # rationalize this for now based on 1. session.refresh() is not 

653 # commonly used 2. session.refresh() with only relationship attrs is 

654 # even less commonly used 3. the SELECT in question is very low 

655 # latency. 

656 # 

657 # to add the flow to not include the SELECT, the quickest way 

658 # might be to just manufacture a single-row result set to send off to 

659 # instances(), but we'd have to weave that into context.py and all 

660 # that. For 2.0.0, we have enough big changes to navigate for now. 

661 # 

662 mp = refresh_state.mapper._props 

663 for p in only_load_props: 

664 if mp[p]._is_relationship: 

665 only_load_props = refresh_state_prokeys.union(only_load_props) 

666 break 

667 

668 if refresh_state and refresh_state.load_options: 

669 compile_options += {"_current_path": refresh_state.load_path.parent} 

670 q = q.options(*refresh_state.load_options) 

671 

672 new_compile_options, load_options = _set_get_options( 

673 compile_options, 

674 load_options, 

675 version_check=version_check, 

676 only_load_props=only_load_props, 

677 refresh_state=refresh_state, 

678 identity_token=identity_token, 

679 is_user_refresh=is_user_refresh, 

680 ) 

681 

682 q._compile_options = new_compile_options 

683 q._order_by = None 

684 

685 if no_autoflush: 

686 load_options += {"_autoflush": False} 

687 

688 execution_options = util.EMPTY_DICT.merge_with( 

689 execution_options, {"_sa_orm_load_options": load_options} 

690 ) 

691 result = ( 

692 session.execute( 

693 q, 

694 params=params, 

695 execution_options=execution_options, 

696 bind_arguments=bind_arguments, 

697 ) 

698 .unique() 

699 .scalars() 

700 ) 

701 

702 try: 

703 return result.one() 

704 except orm_exc.NoResultFound: 

705 return None 

706 
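# ---------------------------------------------------------------------------
# Editor's note: hedged sketch, not part of SQLAlchemy.  Public APIs such as
# Session.get() and attribute refresh operations funnel into _load_on_ident()
# / _load_on_pk_identity() above.  ``session`` and a mapped class ``User``
# with a primary key attribute named ``id`` are assumed.
def _example_get_by_primary_key(session, User, pk):
    from sqlalchemy import select

    # Session.get() checks the identity map first and, on a miss, emits a
    # SELECT built around mapper._get_clause, much like the code above
    obj = session.get(User, pk)

    # a roughly equivalent explicit query, for comparison
    same = session.execute(
        select(User).filter_by(id=pk)
    ).scalar_one_or_none()
    return obj, same
# ---------------------------------------------------------------------------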

707 

708def _set_get_options( 

709 compile_opt, 

710 load_opt, 

711 populate_existing=None, 

712 version_check=None, 

713 only_load_props=None, 

714 refresh_state=None, 

715 identity_token=None, 

716 is_user_refresh=None, 

717): 

718 compile_options = {} 

719 load_options = {} 

720 if version_check: 

721 load_options["_version_check"] = version_check 

722 if populate_existing: 

723 load_options["_populate_existing"] = populate_existing 

724 if refresh_state: 

725 load_options["_refresh_state"] = refresh_state 

726 compile_options["_for_refresh_state"] = True 

727 if only_load_props: 

728 compile_options["_only_load_props"] = frozenset(only_load_props) 

729 if identity_token: 

730 load_options["_identity_token"] = identity_token 

731 

732 if is_user_refresh: 

733 load_options["_is_user_refresh"] = is_user_refresh 

734 if load_options: 

735 load_opt += load_options 

736 if compile_options: 

737 compile_opt += compile_options 

738 

739 return compile_opt, load_opt 

740 

741 

742def _setup_entity_query( 

743 compile_state, 

744 mapper, 

745 query_entity, 

746 path, 

747 adapter, 

748 column_collection, 

749 with_polymorphic=None, 

750 only_load_props=None, 

751 polymorphic_discriminator=None, 

752 **kw, 

753): 

754 if with_polymorphic: 

755 poly_properties = mapper._iterate_polymorphic_properties( 

756 with_polymorphic 

757 ) 

758 else: 

759 poly_properties = mapper._polymorphic_properties 

760 

761 quick_populators = {} 

762 

763 path.set(compile_state.attributes, "memoized_setups", quick_populators) 

764 

765 # for the lead entities in the path, e.g. not eager loads, and 

766 # assuming a user-passed aliased class, e.g. not a from_self() or any 

767 # implicit aliasing, don't add columns to the SELECT that aren't 

768 # in the thing that's aliased. 

769 check_for_adapt = adapter and len(path) == 1 and path[-1].is_aliased_class 

770 

771 for value in poly_properties: 

772 if only_load_props and value.key not in only_load_props: 

773 continue 

774 value.setup( 

775 compile_state, 

776 query_entity, 

777 path, 

778 adapter, 

779 only_load_props=only_load_props, 

780 column_collection=column_collection, 

781 memoized_populators=quick_populators, 

782 check_for_adapt=check_for_adapt, 

783 **kw, 

784 ) 

785 

786 if ( 

787 polymorphic_discriminator is not None 

788 and polymorphic_discriminator is not mapper.polymorphic_on 

789 ): 

790 if adapter: 

791 pd = adapter.columns[polymorphic_discriminator] 

792 else: 

793 pd = polymorphic_discriminator 

794 column_collection.append(pd) 

795 
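# ---------------------------------------------------------------------------
# Editor's note: illustrative sketch only, not part of SQLAlchemy.  The
# check_for_adapt branch above applies when the lead entity is a user-supplied
# aliased construct: mapped columns are adapted to that selectable, and
# columns the selectable does not provide are not added to the SELECT.
# ``User`` is an assumed mapped class.
def _example_aliased_entity_query(User):
    from sqlalchemy import select
    from sqlalchemy.orm import aliased

    # alias User against a subquery; entity setup adapts each mapped column
    # to the subquery's columns rather than the base table
    subq = select(User).where(User.id > 10).subquery()
    user_alias = aliased(User, subq)
    return select(user_alias)
# ---------------------------------------------------------------------------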

796 

797def _warn_for_runid_changed(state): 

798 util.warn( 

799 "Loading context for %s has changed within a load/refresh " 

800 "handler, suggesting a row refresh operation took place. If this " 

801 "event handler is expected to be " 

802 "emitting row refresh operations within an existing load or refresh " 

803 "operation, set restore_load_context=True when establishing the " 

804 "listener to ensure the context remains unchanged when the event " 

805 "handler completes." % (state_str(state),) 

806 ) 

807 
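# ---------------------------------------------------------------------------
# Editor's note: hedged example, not part of SQLAlchemy, of the listener
# configuration the warning above refers to.  ``User`` is an assumed mapped
# class; restore_load_context=True keeps the load context intact if the
# handler triggers a refresh of the object being loaded.
def _example_load_listener(User):
    from sqlalchemy import event

    @event.listens_for(User, "load", restore_load_context=True)
    def _on_load(target, context):
        # touching attributes here could trigger a refresh; the flag above
        # restores the loading context when the handler returns
        pass

    return _on_load
# ---------------------------------------------------------------------------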

808 

809def _instance_processor( 

810 query_entity, 

811 mapper, 

812 context, 

813 result, 

814 path, 

815 adapter, 

816 only_load_props=None, 

817 refresh_state=None, 

818 polymorphic_discriminator=None, 

819 _polymorphic_from=None, 

820): 

821 """Produce a mapper level row processor callable 

822 which processes rows into mapped instances.""" 

823 

824 # note that this method, most of which exists in a closure 

825 # called _instance(), resists being broken out, as 

826 # attempts to do so tend to add significant function 

827 # call overhead. _instance() is the most 

828 # performance-critical section in the whole ORM. 

829 

830 identity_class = mapper._identity_class 

831 compile_state = context.compile_state 

832 

833 # look for "row getter" functions that have been assigned along 

834 # with the compile state that were cached from a previous load. 

835 # these are operator.itemgetter() objects that each will extract a 

836 # particular column from each row. 

837 

838 getter_key = ("getters", mapper) 

839 getters = path.get(compile_state.attributes, getter_key, None) 

840 

841 if getters is None: 

842 # no getters, so go through a list of attributes we are loading for, 

843 # and the ones that are column based will have already put information 

844 # for us in another collection "memoized_setups", which represents the 

845 # output of the LoaderStrategy.setup_query() method. We can just as 

846 # easily call LoaderStrategy.create_row_processor for each, but by 

847 # getting it all at once from setup_query we save another method call 

848 # per attribute. 

849 props = mapper._prop_set 

850 if only_load_props is not None: 

851 props = props.intersection( 

852 mapper._props[k] for k in only_load_props 

853 ) 

854 

855 quick_populators = path.get( 

856 context.attributes, "memoized_setups", EMPTY_DICT 

857 ) 

858 

859 todo = [] 

860 cached_populators = { 

861 "new": [], 

862 "quick": [], 

863 "deferred": [], 

864 "expire": [], 

865 "existing": [], 

866 "eager": [], 

867 } 

868 

869 if refresh_state is None: 

870 # we can also get the "primary key" tuple getter function 

871 pk_cols = mapper.primary_key 

872 

873 if adapter: 

874 pk_cols = [adapter.columns[c] for c in pk_cols] 

875 primary_key_getter = result._tuple_getter(pk_cols) 

876 else: 

877 primary_key_getter = None 

878 

879 getters = { 

880 "cached_populators": cached_populators, 

881 "todo": todo, 

882 "primary_key_getter": primary_key_getter, 

883 } 

884 for prop in props: 

885 if prop in quick_populators: 

886 # this is an inlined path just for column-based attributes. 

887 col = quick_populators[prop] 

888 if col is _DEFER_FOR_STATE: 

889 cached_populators["new"].append( 

890 (prop.key, prop._deferred_column_loader) 

891 ) 

892 elif col is _SET_DEFERRED_EXPIRED: 

893 # note that in this path, we are no longer 

894 # searching in the result to see if the column might 

895 # be present in some unexpected way. 

896 cached_populators["expire"].append((prop.key, False)) 

897 elif col is _RAISE_FOR_STATE: 

898 cached_populators["new"].append( 

899 (prop.key, prop._raise_column_loader) 

900 ) 

901 else: 

902 getter = None 

903 if adapter: 

904 # this logic had been removed for all 1.4 releases 

905 # up until 1.4.18; the adapter here is particularly 

906 # the compound eager adapter which isn't accommodated 

907 # in the quick_populators right now. The "fallback" 

908 # logic below instead took over in many more cases 

909 # until issue #6596 was identified. 

910 

911 # note there is still an issue where this codepath 

912 # produces no "getter" for cases where a joined-inh 

913 # mapping includes a labeled column property, meaning 

914 # KeyError is caught internally and we fall back to 

915 # _getter(col), which works anyway. The adapter 

916 # here for joined inh without any aliasing might not 

917 # be useful. Tests which see this include 

918 # test.orm.inheritance.test_basic -> 

919 # EagerTargetingTest.test_adapt_stringency 

920 # OptimizedLoadTest.test_column_expression_joined 

921 # PolymorphicOnNotLocalTest.test_polymorphic_on_column_prop # noqa: E501 

922 # 

923 

924 adapted_col = adapter.columns[col] 

925 if adapted_col is not None: 

926 getter = result._getter(adapted_col, False) 

927 if not getter: 

928 getter = result._getter(col, False) 

929 if getter: 

930 cached_populators["quick"].append((prop.key, getter)) 

931 else: 

932 # fall back to the ColumnProperty itself, which 

933 # will iterate through all of its columns 

934 # to see if one fits 

935 prop.create_row_processor( 

936 context, 

937 query_entity, 

938 path, 

939 mapper, 

940 result, 

941 adapter, 

942 cached_populators, 

943 ) 

944 else: 

945 # loader strategies like subqueryload, selectinload, 

946 # joinedload, basically relationships, these need to interact 

947 # with the context each time to work correctly. 

948 todo.append(prop) 

949 

950 path.set(compile_state.attributes, getter_key, getters) 

951 

952 cached_populators = getters["cached_populators"] 

953 

954 populators = {key: list(value) for key, value in cached_populators.items()} 

955 for prop in getters["todo"]: 

956 prop.create_row_processor( 

957 context, query_entity, path, mapper, result, adapter, populators 

958 ) 

959 

960 propagated_loader_options = context.propagated_loader_options 

961 load_path = ( 

962 context.compile_state.current_path + path 

963 if context.compile_state.current_path.path 

964 else path 

965 ) 

966 

967 session_identity_map = context.session.identity_map 

968 

969 populate_existing = context.populate_existing or mapper.always_refresh 

970 load_evt = bool(mapper.class_manager.dispatch.load) 

971 refresh_evt = bool(mapper.class_manager.dispatch.refresh) 

972 persistent_evt = bool(context.session.dispatch.loaded_as_persistent) 

973 if persistent_evt: 

974 loaded_as_persistent = context.session.dispatch.loaded_as_persistent 

975 instance_state = attributes.instance_state 

976 instance_dict = attributes.instance_dict 

977 session_id = context.session.hash_key 

978 runid = context.runid 

979 identity_token = context.identity_token 

980 

981 version_check = context.version_check 

982 if version_check: 

983 version_id_col = mapper.version_id_col 

984 if version_id_col is not None: 

985 if adapter: 

986 version_id_col = adapter.columns[version_id_col] 

987 version_id_getter = result._getter(version_id_col) 

988 else: 

989 version_id_getter = None 

990 

991 if not refresh_state and _polymorphic_from is not None: 

992 key = ("loader", path.path) 

993 

994 if key in context.attributes and context.attributes[key].strategy == ( 

995 ("selectinload_polymorphic", True), 

996 ): 

997 option_entities = context.attributes[key].local_opts["entities"] 

998 else: 

999 option_entities = None 

1000 selectin_load_via = mapper._should_selectin_load( 

1001 option_entities, 

1002 _polymorphic_from, 

1003 ) 

1004 

1005 if selectin_load_via and selectin_load_via is not _polymorphic_from: 

1006 # only_load_props goes w/ refresh_state only, and in a refresh 

1007 # we are a single row query for the exact entity; polymorphic 

1008 # loading does not apply 

1009 assert only_load_props is None 

1010 

1011 if selectin_load_via.is_mapper: 

1012 _load_supers = [] 

1013 _endmost_mapper = selectin_load_via 

1014 while ( 

1015 _endmost_mapper 

1016 and _endmost_mapper is not _polymorphic_from 

1017 ): 

1018 _load_supers.append(_endmost_mapper) 

1019 _endmost_mapper = _endmost_mapper.inherits 

1020 else: 

1021 _load_supers = [selectin_load_via] 

1022 

1023 for _selectinload_entity in _load_supers: 

1024 if _PostLoad.path_exists( 

1025 context, load_path, _selectinload_entity 

1026 ): 

1027 continue 

1028 callable_ = _load_subclass_via_in( 

1029 context, 

1030 path, 

1031 _selectinload_entity, 

1032 _polymorphic_from, 

1033 option_entities, 

1034 ) 

1035 _PostLoad.callable_for_path( 

1036 context, 

1037 load_path, 

1038 _selectinload_entity.mapper, 

1039 _selectinload_entity, 

1040 callable_, 

1041 _selectinload_entity, 

1042 ) 

1043 

1044 post_load = _PostLoad.for_context(context, load_path, only_load_props) 

1045 

1046 if refresh_state: 

1047 refresh_identity_key = refresh_state.key 

1048 if refresh_identity_key is None: 

1049 # super-rare condition; a refresh is being called 

1050 # on a non-instance-key instance; this is meant to only 

1051 # occur within a flush() 

1052 refresh_identity_key = mapper._identity_key_from_state( 

1053 refresh_state 

1054 ) 

1055 else: 

1056 refresh_identity_key = None 

1057 

1058 primary_key_getter = getters["primary_key_getter"] 

1059 

1060 if mapper.allow_partial_pks: 

1061 is_not_primary_key = _none_set.issuperset 

1062 else: 

1063 is_not_primary_key = _none_set.intersection 

1064 

1065 def _instance(row): 

1066 # determine the state that we'll be populating 

1067 if refresh_identity_key: 

1068 # fixed state that we're refreshing 

1069 state = refresh_state 

1070 instance = state.obj() 

1071 dict_ = instance_dict(instance) 

1072 isnew = state.runid != runid 

1073 currentload = True 

1074 loaded_instance = False 

1075 else: 

1076 # look at the row, see if that identity is in the 

1077 # session, or we have to create a new one 

1078 identitykey = ( 

1079 identity_class, 

1080 primary_key_getter(row), 

1081 identity_token, 

1082 ) 

1083 

1084 instance = session_identity_map.get(identitykey) 

1085 

1086 if instance is not None: 

1087 # existing instance 

1088 state = instance_state(instance) 

1089 dict_ = instance_dict(instance) 

1090 

1091 isnew = state.runid != runid 

1092 currentload = not isnew 

1093 loaded_instance = False 

1094 

1095 if version_check and version_id_getter and not currentload: 

1096 _validate_version_id( 

1097 mapper, state, dict_, row, version_id_getter 

1098 ) 

1099 

1100 else: 

1101 # create a new instance 

1102 

1103 # check for non-NULL values in the primary key columns, 

1104 # else no entity is returned for the row 

1105 if is_not_primary_key(identitykey[1]): 

1106 return None 

1107 

1108 isnew = True 

1109 currentload = True 

1110 loaded_instance = True 

1111 

1112 instance = mapper.class_manager.new_instance() 

1113 

1114 dict_ = instance_dict(instance) 

1115 state = instance_state(instance) 

1116 state.key = identitykey 

1117 state.identity_token = identity_token 

1118 

1119 # attach instance to session. 

1120 state.session_id = session_id 

1121 session_identity_map._add_unpresent(state, identitykey) 

1122 

1123 effective_populate_existing = populate_existing 

1124 if refresh_state is state: 

1125 effective_populate_existing = True 

1126 

1127 # populate. this looks at whether this state is new 

1128 # for this load or was existing, and whether or not this 

1129 # row is the first row with this identity. 

1130 if currentload or effective_populate_existing: 

1131 # full population routines. Objects here are either 

1132 # just created, or we are doing a populate_existing 

1133 

1134 # be conservative about setting load_path when populate_existing 

1135 # is in effect; want to maintain options from the original 

1136 # load. see test_expire->test_refresh_maintains_deferred_options 

1137 if isnew and ( 

1138 propagated_loader_options or not effective_populate_existing 

1139 ): 

1140 state.load_options = propagated_loader_options 

1141 state.load_path = load_path 

1142 

1143 _populate_full( 

1144 context, 

1145 row, 

1146 state, 

1147 dict_, 

1148 isnew, 

1149 load_path, 

1150 loaded_instance, 

1151 effective_populate_existing, 

1152 populators, 

1153 ) 

1154 

1155 if isnew: 

1156 # state.runid should be equal to context.runid / runid 

1157 # here, however for event checks we are being more conservative 

1158 # and checking against existing run id 

1159 # assert state.runid == runid 

1160 

1161 existing_runid = state.runid 

1162 

1163 if loaded_instance: 

1164 if load_evt: 

1165 state.manager.dispatch.load(state, context) 

1166 if state.runid != existing_runid: 

1167 _warn_for_runid_changed(state) 

1168 if persistent_evt: 

1169 loaded_as_persistent(context.session, state) 

1170 if state.runid != existing_runid: 

1171 _warn_for_runid_changed(state) 

1172 elif refresh_evt: 

1173 state.manager.dispatch.refresh( 

1174 state, context, only_load_props 

1175 ) 

1176 if state.runid != runid: 

1177 _warn_for_runid_changed(state) 

1178 

1179 if effective_populate_existing or state.modified: 

1180 if refresh_state and only_load_props: 

1181 state._commit(dict_, only_load_props) 

1182 else: 

1183 state._commit_all(dict_, session_identity_map) 

1184 

1185 if post_load: 

1186 post_load.add_state(state, True) 

1187 

1188 else: 

1189 # partial population routines, for objects that were already 

1190 # in the Session, but a row matches them; apply eager loaders 

1191 # on existing objects, etc. 

1192 unloaded = state.unloaded 

1193 isnew = state not in context.partials 

1194 

1195 if not isnew or unloaded or populators["eager"]: 

1196 # state is having a partial set of its attributes 

1197 # refreshed. Populate those attributes, 

1198 # and add to the "context.partials" collection. 

1199 

1200 to_load = _populate_partial( 

1201 context, 

1202 row, 

1203 state, 

1204 dict_, 

1205 isnew, 

1206 load_path, 

1207 unloaded, 

1208 populators, 

1209 ) 

1210 

1211 if isnew: 

1212 if refresh_evt: 

1213 existing_runid = state.runid 

1214 state.manager.dispatch.refresh(state, context, to_load) 

1215 if state.runid != existing_runid: 

1216 _warn_for_runid_changed(state) 

1217 

1218 state._commit(dict_, to_load) 

1219 

1220 if post_load and context.invoke_all_eagers: 

1221 post_load.add_state(state, False) 

1222 

1223 return instance 

1224 

1225 if mapper.polymorphic_map and not _polymorphic_from and not refresh_state: 

1226 # if we are doing polymorphic, dispatch to a different _instance() 

1227 # method specific to the subclass mapper 

1228 def ensure_no_pk(row): 

1229 identitykey = ( 

1230 identity_class, 

1231 primary_key_getter(row), 

1232 identity_token, 

1233 ) 

1234 if not is_not_primary_key(identitykey[1]): 

1235 return identitykey 

1236 else: 

1237 return None 

1238 

1239 _instance = _decorate_polymorphic_switch( 

1240 _instance, 

1241 context, 

1242 query_entity, 

1243 mapper, 

1244 result, 

1245 path, 

1246 polymorphic_discriminator, 

1247 adapter, 

1248 ensure_no_pk, 

1249 ) 

1250 

1251 return _instance 

1252 
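# ---------------------------------------------------------------------------
# Editor's note: hedged sketch, not part of SQLAlchemy.  The
# "effective_populate_existing" branch inside _instance() above is what the
# public populate_existing execution option switches on: incoming rows
# overwrite attributes of objects already present in the identity map.
# ``session`` and a mapped class ``User`` are assumed.
def _example_populate_existing(session, User):
    from sqlalchemy import select

    return session.scalars(
        select(User).execution_options(populate_existing=True)
    ).all()
# ---------------------------------------------------------------------------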

1253 

1254def _load_subclass_via_in( 

1255 context, path, entity, polymorphic_from, option_entities 

1256): 

1257 mapper = entity.mapper 

1258 

1259 # TODO: polymorphic_from seems to be a Mapper in all cases. 

1260 # this is likely not needed, but as we don't have typing in loading.py 

1261 # yet, err on the safe side 

1262 polymorphic_from_mapper = polymorphic_from.mapper 

1263 not_against_basemost = polymorphic_from_mapper.inherits is not None 

1264 

1265 zero_idx = len(mapper.base_mapper.primary_key) == 1 

1266 

1267 if entity.is_aliased_class or not_against_basemost: 

1268 q, enable_opt, disable_opt = mapper._subclass_load_via_in( 

1269 entity, polymorphic_from 

1270 ) 

1271 else: 

1272 q, enable_opt, disable_opt = mapper._subclass_load_via_in_mapper 

1273 

1274 def do_load(context, path, states, load_only, effective_entity): 

1275 if not option_entities: 

1276 # filter out states for those that would have selectinloaded 

1277 # from another loader 

1278 # TODO: we are currently ignoring the case where the 

1279 # "selectin_polymorphic" option is used, as this is much more 

1280 # complex / specific / very uncommon API use 

1281 states = [ 

1282 (s, v) 

1283 for s, v in states 

1284 if s.mapper._would_selectin_load_only_from_given_mapper(mapper) 

1285 ] 

1286 

1287 if not states: 

1288 return 

1289 

1290 orig_query = context.query 

1291 

1292 if path.parent: 

1293 enable_opt_lcl = enable_opt._prepend_path(path) 

1294 disable_opt_lcl = disable_opt._prepend_path(path) 

1295 else: 

1296 enable_opt_lcl = enable_opt 

1297 disable_opt_lcl = disable_opt 

1298 options = ( 

1299 (enable_opt_lcl,) + orig_query._with_options + (disable_opt_lcl,) 

1300 ) 

1301 

1302 q2 = q.options(*options) 

1303 

1304 q2._compile_options = context.compile_state.default_compile_options 

1305 q2._compile_options += {"_current_path": path.parent} 

1306 

1307 if context.populate_existing: 

1308 q2 = q2.execution_options(populate_existing=True) 

1309 

1310 while states: 

1311 chunk = states[0 : _SelectInLoader._chunksize] 

1312 states = states[_SelectInLoader._chunksize :] 

1313 context.session.execute( 

1314 q2, 

1315 dict( 

1316 primary_keys=[ 

1317 state.key[1][0] if zero_idx else state.key[1] 

1318 for state, load_attrs in chunk 

1319 ] 

1320 ), 

1321 ).unique().scalars().all() 

1322 

1323 return do_load 

1324 
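# ---------------------------------------------------------------------------
# Editor's note: illustrative sketch only, not part of SQLAlchemy.  The loader
# above backs selectin-style polymorphic loading: subclass-specific columns
# are fetched with a follow-up IN-based SELECT against the primary keys.
# ``Employee`` and ``Manager`` are assumed mapped classes in a joined-table
# inheritance hierarchy.
def _example_selectin_polymorphic(session, Employee, Manager):
    from sqlalchemy import select
    from sqlalchemy.orm import selectin_polymorphic

    opt = selectin_polymorphic(Employee, [Manager])
    return session.scalars(select(Employee).options(opt)).all()
# ---------------------------------------------------------------------------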

1325 

1326def _populate_full( 

1327 context, 

1328 row, 

1329 state, 

1330 dict_, 

1331 isnew, 

1332 load_path, 

1333 loaded_instance, 

1334 populate_existing, 

1335 populators, 

1336): 

1337 if isnew: 

1338 # first time we are seeing a row with this identity. 

1339 state.runid = context.runid 

1340 

1341 for key, getter in populators["quick"]: 

1342 dict_[key] = getter(row) 

1343 if populate_existing: 

1344 for key, set_callable in populators["expire"]: 

1345 dict_.pop(key, None) 

1346 if set_callable: 

1347 state.expired_attributes.add(key) 

1348 else: 

1349 for key, set_callable in populators["expire"]: 

1350 if set_callable: 

1351 state.expired_attributes.add(key) 

1352 

1353 for key, populator in populators["new"]: 

1354 populator(state, dict_, row) 

1355 

1356 elif load_path != state.load_path: 

1357 # new load path, e.g. object is present in more than one 

1358 # column position in a series of rows 

1359 state.load_path = load_path 

1360 

1361 # if we have data, and the data isn't in the dict, OK, let's put 

1362 # it in. 

1363 for key, getter in populators["quick"]: 

1364 if key not in dict_: 

1365 dict_[key] = getter(row) 

1366 

1367 # otherwise treat like an "already seen" row 

1368 for key, populator in populators["existing"]: 

1369 populator(state, dict_, row) 

1370 # TODO: allow "existing" populator to know this is 

1371 # a new path for the state: 

1372 # populator(state, dict_, row, new_path=True) 

1373 

1374 else: 

1375 # have already seen rows with this identity in this same path. 

1376 for key, populator in populators["existing"]: 

1377 populator(state, dict_, row) 

1378 

1379 # TODO: same path 

1380 # populator(state, dict_, row, new_path=False) 

1381 

1382 

1383def _populate_partial( 

1384 context, row, state, dict_, isnew, load_path, unloaded, populators 

1385): 

1386 if not isnew: 

1387 if unloaded: 

1388 # extra pass, see #8166 

1389 for key, getter in populators["quick"]: 

1390 if key in unloaded: 

1391 dict_[key] = getter(row) 

1392 

1393 to_load = context.partials[state] 

1394 for key, populator in populators["existing"]: 

1395 if key in to_load: 

1396 populator(state, dict_, row) 

1397 else: 

1398 to_load = unloaded 

1399 context.partials[state] = to_load 

1400 

1401 for key, getter in populators["quick"]: 

1402 if key in to_load: 

1403 dict_[key] = getter(row) 

1404 for key, set_callable in populators["expire"]: 

1405 if key in to_load: 

1406 dict_.pop(key, None) 

1407 if set_callable: 

1408 state.expired_attributes.add(key) 

1409 for key, populator in populators["new"]: 

1410 if key in to_load: 

1411 populator(state, dict_, row) 

1412 

1413 for key, populator in populators["eager"]: 

1414 if key not in unloaded: 

1415 populator(state, dict_, row) 

1416 

1417 return to_load 

1418 

1419 

1420def _validate_version_id(mapper, state, dict_, row, getter): 

1421 if mapper._get_state_attr_by_column( 

1422 state, dict_, mapper.version_id_col 

1423 ) != getter(row): 

1424 raise orm_exc.StaleDataError( 

1425 "Instance '%s' has version id '%s' which " 

1426 "does not match database-loaded version id '%s'." 

1427 % ( 

1428 state_str(state), 

1429 mapper._get_state_attr_by_column( 

1430 state, dict_, mapper.version_id_col 

1431 ), 

1432 getter(row), 

1433 ) 

1434 ) 

1435 
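# ---------------------------------------------------------------------------
# Editor's note: hedged sketch, not part of SQLAlchemy, of the mapping
# configuration that makes _validate_version_id() relevant: a version counter
# declared via version_id_col.  A mismatch between the in-memory value and the
# database-loaded value raises StaleDataError, as implemented above.  All
# names here are illustrative.
def _example_versioned_mapping():
    from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

    class Base(DeclarativeBase):
        pass

    class VersionedThing(Base):
        __tablename__ = "versioned_thing"

        id: Mapped[int] = mapped_column(primary_key=True)
        version_id: Mapped[int] = mapped_column(nullable=False)

        __mapper_args__ = {"version_id_col": version_id}

    return VersionedThing
# ---------------------------------------------------------------------------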

1436 

1437def _decorate_polymorphic_switch( 

1438 instance_fn, 

1439 context, 

1440 query_entity, 

1441 mapper, 

1442 result, 

1443 path, 

1444 polymorphic_discriminator, 

1445 adapter, 

1446 ensure_no_pk, 

1447): 

1448 if polymorphic_discriminator is not None: 

1449 polymorphic_on = polymorphic_discriminator 

1450 else: 

1451 polymorphic_on = mapper.polymorphic_on 

1452 if polymorphic_on is None: 

1453 return instance_fn 

1454 

1455 if adapter: 

1456 polymorphic_on = adapter.columns[polymorphic_on] 

1457 

1458 def configure_subclass_mapper(discriminator): 

1459 try: 

1460 sub_mapper = mapper.polymorphic_map[discriminator] 

1461 except KeyError: 

1462 raise AssertionError( 

1463 "No such polymorphic_identity %r is defined" % discriminator 

1464 ) 

1465 else: 

1466 if sub_mapper is mapper: 

1467 return None 

1468 elif not sub_mapper.isa(mapper): 

1469 return False 

1470 

1471 return _instance_processor( 

1472 query_entity, 

1473 sub_mapper, 

1474 context, 

1475 result, 

1476 path, 

1477 adapter, 

1478 _polymorphic_from=mapper, 

1479 ) 

1480 

1481 polymorphic_instances = util.PopulateDict(configure_subclass_mapper) 

1482 

1483 getter = result._getter(polymorphic_on) 

1484 

1485 def polymorphic_instance(row): 

1486 discriminator = getter(row) 

1487 if discriminator is not None: 

1488 _instance = polymorphic_instances[discriminator] 

1489 if _instance: 

1490 return _instance(row) 

1491 elif _instance is False: 

1492 identitykey = ensure_no_pk(row) 

1493 

1494 if identitykey: 

1495 raise sa_exc.InvalidRequestError( 

1496 "Row with identity key %s can't be loaded into an " 

1497 "object; the polymorphic discriminator column '%s' " 

1498 "refers to %s, which is not a sub-mapper of " 

1499 "the requested %s" 

1500 % ( 

1501 identitykey, 

1502 polymorphic_on, 

1503 mapper.polymorphic_map[discriminator], 

1504 mapper, 

1505 ) 

1506 ) 

1507 else: 

1508 return None 

1509 else: 

1510 return instance_fn(row) 

1511 else: 

1512 identitykey = ensure_no_pk(row) 

1513 

1514 if identitykey: 

1515 raise sa_exc.InvalidRequestError( 

1516 "Row with identity key %s can't be loaded into an " 

1517 "object; the polymorphic discriminator column '%s' is " 

1518 "NULL" % (identitykey, polymorphic_on) 

1519 ) 

1520 else: 

1521 return None 

1522 

1523 return polymorphic_instance 

1524 
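# ---------------------------------------------------------------------------
# Editor's note: hedged sketch, not part of SQLAlchemy, of the discriminator
# configuration that _decorate_polymorphic_switch() consumes at load time:
# the value of the polymorphic_on column selects the sub-mapper used to
# instantiate each row.  All names here are illustrative.
def _example_polymorphic_mapping():
    from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

    class Base(DeclarativeBase):
        pass

    class Employee(Base):
        __tablename__ = "employee"

        id: Mapped[int] = mapped_column(primary_key=True)
        type: Mapped[str] = mapped_column()

        __mapper_args__ = {
            "polymorphic_identity": "employee",
            "polymorphic_on": "type",
        }

    class Manager(Employee):
        # single-table inheritance: rows whose discriminator is "manager"
        # are dispatched to this mapper by the switch above
        __mapper_args__ = {"polymorphic_identity": "manager"}

    return Employee, Manager
# ---------------------------------------------------------------------------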

1525 

1526class _PostLoad: 

1527 """Track loaders and states for "post load" operations.""" 

1528 

1529 __slots__ = "loaders", "states", "load_keys" 

1530 

1531 def __init__(self): 

1532 self.loaders = {} 

1533 self.states = util.OrderedDict() 

1534 self.load_keys = None 

1535 

1536 def add_state(self, state, overwrite): 

1537 # the states for a polymorphic load here are all shared 

1538 # within a single PostLoad object among multiple subtypes. 

1539 # Filtering of callables on a per-subclass basis needs to be done at 

1540 # the invocation level 

1541 self.states[state] = overwrite 

1542 

1543 def invoke(self, context, path): 

1544 if not self.states: 

1545 return 

1546 path = path_registry.PathRegistry.coerce(path) 

1547 for ( 

1548 effective_context, 

1549 token, 

1550 limit_to_mapper, 

1551 loader, 

1552 arg, 

1553 kw, 

1554 ) in self.loaders.values(): 

1555 states = [ 

1556 (state, overwrite) 

1557 for state, overwrite in self.states.items() 

1558 if state.manager.mapper.isa(limit_to_mapper) 

1559 ] 

1560 if states: 

1561 loader( 

1562 effective_context, path, states, self.load_keys, *arg, **kw 

1563 ) 

1564 self.states.clear() 

1565 

1566 @classmethod 

1567 def for_context(cls, context, path, only_load_props): 

1568 pl = context.post_load_paths.get(path.path) 

1569 if pl is not None and only_load_props: 

1570 pl.load_keys = only_load_props 

1571 return pl 

1572 

1573 @classmethod 

1574 def path_exists(cls, context, path, key): 

1575 return ( 

1576 path.path in context.post_load_paths 

1577 and key in context.post_load_paths[path.path].loaders 

1578 ) 

1579 

1580 @classmethod 

1581 def callable_for_path( 

1582 cls, context, path, limit_to_mapper, token, loader_callable, *arg, **kw 

1583 ): 

1584 if path.path in context.post_load_paths: 

1585 pl = context.post_load_paths[path.path] 

1586 else: 

1587 pl = context.post_load_paths[path.path] = _PostLoad() 

1588 pl.loaders[token] = ( 

1589 context, 

1590 token, 

1591 limit_to_mapper, 

1592 loader_callable, 

1593 arg, 

1594 kw, 

1595 ) 

1596 

1597 

1598def _load_scalar_attributes(mapper, state, attribute_names, passive): 

1599 """initiate a column-based attribute refresh operation.""" 

1600 

1601 # assert mapper is _state_mapper(state) 

1602 session = state.session 

1603 if not session: 

1604 raise orm_exc.DetachedInstanceError( 

1605 "Instance %s is not bound to a Session; " 

1606 "attribute refresh operation cannot proceed" % (state_str(state)) 

1607 ) 

1608 

1609 no_autoflush = bool(passive & attributes.NO_AUTOFLUSH) 

1610 

1611 # in the case of inheritance, particularly concrete and abstract 

1612 # concrete inheritance, the class manager might have some keys 

1613 # of attributes on the superclass that we didn't actually map. 

1614 # These could be mapped as "concrete, don't load" or could be completely 

1615 # excluded from the mapping and we know nothing about them. Filter them 

1616 # here to prevent them from coming through. 

1617 if attribute_names: 

1618 attribute_names = attribute_names.intersection(mapper.attrs.keys()) 

1619 

1620 if mapper.inherits and not mapper.concrete: 

1621 # load based on committed attributes in the object, formed into 

1622 # a truncated SELECT that only includes relevant tables. does not 

1623 # currently use state.key 

1624 statement = mapper._optimized_get_statement(state, attribute_names) 

1625 if statement is not None: 

1626 # undefer() isn't needed here because statement has the 

1627 # columns needed already, this implicitly undefers that column 

1628 stmt = FromStatement(mapper, statement) 

1629 

1630 return _load_on_ident( 

1631 session, 

1632 stmt, 

1633 None, 

1634 only_load_props=attribute_names, 

1635 refresh_state=state, 

1636 no_autoflush=no_autoflush, 

1637 ) 

1638 

1639 # normal load, use state.key as the identity to SELECT 

1640 has_key = bool(state.key) 

1641 

1642 if has_key: 

1643 identity_key = state.key 

1644 else: 

1645 # this codepath is rare - only valid when inside a flush, and the 

1646 # object is becoming persistent but hasn't yet been assigned 

1647 # an identity_key. 

1648 # check here to ensure we have the attrs we need. 

1649 pk_attrs = [ 

1650 mapper._columntoproperty[col].key for col in mapper.primary_key 

1651 ] 

1652 if state.expired_attributes.intersection(pk_attrs): 

1653 raise sa_exc.InvalidRequestError( 

1654 "Instance %s cannot be refreshed - it's not " 

1655 " persistent and does not " 

1656 "contain a full primary key." % state_str(state) 

1657 ) 

1658 identity_key = mapper._identity_key_from_state(state) 

1659 

1660 if ( 

1661 _none_set.issubset(identity_key) and not mapper.allow_partial_pks 

1662 ) or _none_set.issuperset(identity_key): 

1663 util.warn_limited( 

1664 "Instance %s to be refreshed doesn't " 

1665 "contain a full primary key - can't be refreshed " 

1666 "(and shouldn't be expired, either).", 

1667 state_str(state), 

1668 ) 

1669 return 

1670 

1671 result = _load_on_ident( 

1672 session, 

1673 select(mapper), 

1674 identity_key, 

1675 refresh_state=state, 

1676 only_load_props=attribute_names, 

1677 no_autoflush=no_autoflush, 

1678 ) 

1679 

1680 # if instance is pending, a refresh operation 

1681 # may not complete (even if PK attributes are assigned) 

1682 if has_key and result is None: 

1683 raise orm_exc.ObjectDeletedError(state)
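# ---------------------------------------------------------------------------
# Editor's note: hedged sketch, not part of SQLAlchemy.  Accessing an expired
# column attribute of a persistent object is what ends up calling
# _load_scalar_attributes() above, which in turn issues the SELECT through
# _load_on_ident().  ``session`` and ``user`` (a persistent instance with a
# ``name`` attribute) are assumed to exist.
def _example_attribute_refresh(session, user):
    # expire just "name"; the next attribute access emits a SELECT with
    # only_load_props limited (roughly) to {"name"}
    session.expire(user, ["name"])
    return user.name
# ---------------------------------------------------------------------------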