1# orm/loading.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: ignore-errors
8
9
10"""private module containing functions used to convert database
11rows into object instances and associated state.
12
13the functions here are called primarily by Query, Mapper,
14as well as some of the attribute loading strategies.
15
16"""
17
18from __future__ import annotations
19
20from typing import Any
21from typing import Dict
22from typing import Iterable
23from typing import List
24from typing import Mapping
25from typing import Optional
26from typing import Sequence
27from typing import Tuple
28from typing import TYPE_CHECKING
29from typing import TypeVar
30from typing import Union
31
32from . import attributes
33from . import exc as orm_exc
34from . import path_registry
35from .base import _DEFER_FOR_STATE
36from .base import _RAISE_FOR_STATE
37from .base import _SET_DEFERRED_EXPIRED
38from .base import PassiveFlag
39from .context import _ORMCompileState
40from .context import FromStatement
41from .context import QueryContext
42from .strategies import _SelectInLoader
43from .util import _none_set
44from .util import state_str
45from .. import exc as sa_exc
46from .. import util
47from ..engine import result_tuple
48from ..engine.result import ChunkedIteratorResult
49from ..engine.result import FrozenResult
50from ..engine.result import SimpleResultMetaData
51from ..sql import select
52from ..sql import util as sql_util
53from ..sql.selectable import ForUpdateArg
54from ..sql.selectable import SelectState
55from ..util import EMPTY_DICT
56from ..util.typing import TupleAny
57from ..util.typing import Unpack
58
59if TYPE_CHECKING:
60 from ._typing import _IdentityKeyType
61 from .base import LoaderCallableStatus
62 from .interfaces import ORMOption
63 from .mapper import Mapper
64 from .query import Query
65 from .session import Session
66 from .state import InstanceState
67 from ..engine.cursor import CursorResult
68 from ..engine.interfaces import _ExecuteOptions
69 from ..engine.result import Result
70 from ..sql import Select
71
72_T = TypeVar("_T", bound=Any)
73_O = TypeVar("_O", bound=object)
74_new_runid = util.counter()
75
76
77_PopulatorDict = Dict[str, List[Tuple[str, Any]]]
78
79
80def instances(
81 cursor: CursorResult[Unpack[TupleAny]], context: QueryContext
82) -> Result[Unpack[TupleAny]]:
83 """Return a :class:`.Result` given an ORM query context.
84
85 :param cursor: a :class:`.CursorResult`, generated by a statement
86 which came from :class:`.ORMCompileState`
87
88 :param context: a :class:`.QueryContext` object
89
90 :return: a :class:`.Result` object representing ORM results
91
92 .. versionchanged:: 1.4 The instances() function now uses
93 :class:`.Result` objects and has an all new interface.
94
95 """
96
97 context.runid = _new_runid()
98
99 if context.top_level_context:
100 is_top_level = False
101 context.post_load_paths = context.top_level_context.post_load_paths
102 else:
103 is_top_level = True
104 context.post_load_paths = {}
105
106 compile_state = context.compile_state
107 filtered = compile_state._has_mapper_entities
108 single_entity = (
109 not context.load_options._only_return_tuples
110 and len(compile_state._entities) == 1
111 and compile_state._entities[0].supports_single_entity
112 )
113
114 try:
115 process, labels, extra = list(
116 zip(
117 *[
118 query_entity.row_processor(context, cursor)
119 for query_entity in context.compile_state._entities
120 ]
121 )
122 )
123
124 if context.yield_per and (
125 context.loaders_require_buffering
126 or context.loaders_require_uniquing
127 ):
128 raise sa_exc.InvalidRequestError(
129 "Can't use yield_per with eager loaders that require uniquing "
130 "or row buffering, e.g. joinedload() against collections "
131 "or subqueryload(). Consider the selectinload() strategy "
132 "for better flexibility in loading objects."
133 )
134
135 except Exception:
136 with util.safe_reraise():
137 cursor.close()
138
139 def _not_hashable(datatype, *, legacy=False, uncertain=False):
140 if not legacy:
141
142 def go(obj):
143 if uncertain:
144 try:
145 return hash(obj)
146 except:
147 pass
148
149 raise sa_exc.InvalidRequestError(
150 "Can't apply uniqueness to row tuple containing value of "
151 f"""type {datatype!r}; {
152 'the values returned appear to be'
153 if uncertain
154 else 'this datatype produces'
155 } non-hashable values"""
156 )
157
158 return go
159 elif not uncertain:
160 return id
161 else:
162 _use_id = False
163
164 def go(obj):
165 nonlocal _use_id
166
167 if not _use_id:
168 try:
169 return hash(obj)
170 except:
171 pass
172
173 # in #10459, we considered using a warning here, however
174 # as legacy query uses result.unique() in all cases, this
175 # would lead to too many warning cases.
176 _use_id = True
177
178 return id(obj)
179
180 return go
181
182 _uniquing_is_active = False
183
184 def _create_unique_filters(result):
185 nonlocal _uniquing_is_active
186
187 if result._yield_per:
188 raise sa_exc.InvalidRequestError(
189 "Can't use the ORM yield_per feature "
190 "in conjunction with unique()"
191 )
192
193 _uniquing_is_active = True
194 return [
195 (
196 _not_hashable(
197 ent.column.type, # type: ignore
198 legacy=context.load_options._legacy_uniquing,
199 uncertain=ent._null_column_type,
200 )
201 if (
202 not ent.use_id_for_hash
203 and (ent._non_hashable_value or ent._null_column_type)
204 )
205 else id if ent.use_id_for_hash else None
206 )
207 for ent in context.compile_state._entities
208 ]
209
210 row_metadata = SimpleResultMetaData(
211 labels, extra, _create_unique_filters=_create_unique_filters
212 )
213
214 def chunks(size): # type: ignore
215 while True:
216 yield_per = size
217
218 context.partials = {}
219
220 if yield_per:
221 if _uniquing_is_active:
222 raise sa_exc.InvalidRequestError(
223 "Can't use the ORM yield_per feature "
224 "in conjunction with unique()"
225 )
226 fetch = cursor.fetchmany(yield_per)
227
228 if not fetch:
229 break
230 else:
231 fetch = cursor._raw_all_tuples()
232
233 if single_entity:
234 proc = process[0]
235 rows = [proc(row) for row in fetch]
236 else:
237 rows = [
238 tuple([proc(row) for proc in process]) for row in fetch
239 ]
240
241 # if we are the originating load from a query, meaning we
242 # aren't being called as a result of a nested "post load",
243 # iterate through all the collected post loaders and fire them
244 # off. Previously this used to work recursively, however that
245 # prevented deeply nested structures from being loadable
246 if is_top_level:
247 if yield_per:
248 # if using yield per, memoize the state of the
249 # collection so that it can be restored
250 top_level_post_loads = list(
251 context.post_load_paths.items()
252 )
253
254 while context.post_load_paths:
255 post_loads = list(context.post_load_paths.items())
256 context.post_load_paths.clear()
257 for path, post_load in post_loads:
258 post_load.invoke(context, path)
259
260 if yield_per:
261 context.post_load_paths.clear()
262 context.post_load_paths.update(top_level_post_loads)
263
264 yield rows
265
266 if not yield_per:
267 break
268
269 if context.execution_options.get("prebuffer_rows", False):
270 # this is a bit of a hack at the moment.
271 # I would rather have some option in the result to pre-buffer
272 # internally.
273 _prebuffered = list(chunks(None))
274
275 def chunks(size):
276 return iter(_prebuffered)
277
278 result = ChunkedIteratorResult(
279 row_metadata,
280 chunks,
281 source_supports_scalars=single_entity,
282 raw=cursor,
283 dynamic_yield_per=cursor.context._is_server_side,
284 context=context,
285 )
286
287 # filtered and single_entity are used to indicate to legacy Query that the
288 # query has ORM entities, so legacy deduping and scalars should be called
289 # on the result.
290 result._attributes = result._attributes.union(
291 dict(filtered=filtered, is_single_entity=single_entity)
292 )
293
294 # multi_row_eager_loaders OTOH is specific to joinedload.
295 if context.requires_uniquing:
296
297 def require_unique(obj):
298 raise sa_exc.InvalidRequestError(
299 "The unique() method must be invoked on this Result, "
300 "as it contains results that include joined eager loads "
301 "against collections"
302 )
303
304 result._unique_filter_state = (set(), require_unique)
305
306 if context.yield_per:
307 result.yield_per(context.yield_per)
308
309 return result
310
311
312@util.preload_module("sqlalchemy.orm.context")
313def merge_frozen_result(session, statement, frozen_result, load=True):
314 """Merge a :class:`_engine.FrozenResult` back into a :class:`_orm.Session`,
315 returning a new :class:`_engine.Result` object with :term:`persistent`
316 objects.
317
318 See the section :ref:`do_orm_execute_re_executing` for an example.
319
320 .. seealso::
321
322 :ref:`do_orm_execute_re_executing`
323
324 :meth:`_engine.Result.freeze`
325
326 :class:`_engine.FrozenResult`
327
328 """
329 querycontext = util.preloaded.orm_context
330
331 if load:
332 # flush current contents if we expect to load data
333 session._autoflush()
334
335 ctx = querycontext._ORMSelectCompileState._create_entities_collection(
336 statement, legacy=False
337 )
338
339 with session.no_autoflush:
340 mapped_entities = [
341 i
342 for i, e in enumerate(ctx._entities)
343 if isinstance(e, querycontext._MapperEntity)
344 ]
345 keys = [ent._label_name for ent in ctx._entities]
346
347 keyed_tuple = result_tuple(
348 keys, [ent._extra_entities for ent in ctx._entities]
349 )
350
351 result = []
352 for newrow in frozen_result._rewrite_rows():
353 for i in mapped_entities:
354 if newrow[i] is not None:
355 newrow[i] = session._merge(
356 attributes.instance_state(newrow[i]),
357 attributes.instance_dict(newrow[i]),
358 load=load,
359 _recursive={},
360 _resolve_conflict_map={},
361 )
362
363 result.append(keyed_tuple(newrow))
364
365 return frozen_result.with_new_rows(result)
366
367
368@util.became_legacy_20(
369 ":func:`_orm.merge_result`",
370 alternative="The function as well as the method on :class:`_orm.Query` "
371 "is superseded by the :func:`_orm.merge_frozen_result` function.",
372)
373@util.preload_module("sqlalchemy.orm.context")
374def merge_result(
375 query: Query[Any],
376 iterator: Union[FrozenResult, Iterable[Sequence[Any]], Iterable[object]],
377 load: bool = True,
378) -> Union[FrozenResult, Iterable[Any]]:
379 """Merge a result into the given :class:`.Query` object's Session.
380
381 See :meth:`_orm.Query.merge_result` for top-level documentation on this
382 function.
383
384 """
385
386 querycontext = util.preloaded.orm_context
387
388 session = query.session
389 if load:
390 # flush current contents if we expect to load data
391 session._autoflush()
392
393 # TODO: need test coverage and documentation for the FrozenResult
394 # use case.
395 if isinstance(iterator, FrozenResult):
396 frozen_result = iterator
397 iterator = iter(frozen_result.data)
398 else:
399 frozen_result = None
400
401 ctx = querycontext._ORMSelectCompileState._create_entities_collection(
402 query, legacy=True
403 )
404
405 autoflush = session.autoflush
406 try:
407 session.autoflush = False
408 single_entity = not frozen_result and len(ctx._entities) == 1
409
410 if single_entity:
411 if isinstance(ctx._entities[0], querycontext._MapperEntity):
412 result = [
413 session._merge(
414 attributes.instance_state(instance),
415 attributes.instance_dict(instance),
416 load=load,
417 _recursive={},
418 _resolve_conflict_map={},
419 )
420 for instance in iterator
421 ]
422 else:
423 result = list(iterator)
424 else:
425 mapped_entities = [
426 i
427 for i, e in enumerate(ctx._entities)
428 if isinstance(e, querycontext._MapperEntity)
429 ]
430 result = []
431 keys = [ent._label_name for ent in ctx._entities]
432
433 keyed_tuple = result_tuple(
434 keys, [ent._extra_entities for ent in ctx._entities]
435 )
436
437 for row in iterator:
438 newrow = list(row)
439 for i in mapped_entities:
440 if newrow[i] is not None:
441 newrow[i] = session._merge(
442 attributes.instance_state(newrow[i]),
443 attributes.instance_dict(newrow[i]),
444 load=load,
445 _recursive={},
446 _resolve_conflict_map={},
447 )
448 result.append(keyed_tuple(newrow))
449
450 if frozen_result:
451 return frozen_result.with_new_rows(result)
452 else:
453 return iter(result)
454 finally:
455 session.autoflush = autoflush
456
457
458def get_from_identity(
459 session: Session,
460 mapper: Mapper[_O],
461 key: _IdentityKeyType[_O],
462 passive: PassiveFlag,
463) -> Union[LoaderCallableStatus, Optional[_O]]:
464 """Look up the given key in the given session's identity map,
465 check the object for expired state if found.
466
467 """
468 instance = session.identity_map.get(key)
469 if instance is not None:
470 state = attributes.instance_state(instance)
471
472 if mapper.inherits and not state.mapper.isa(mapper):
473 return attributes.PASSIVE_CLASS_MISMATCH
474
475 # expired - ensure it still exists
476 if state.expired:
477 if not passive & attributes.SQL_OK:
478 # TODO: no coverage here
479 return attributes.PASSIVE_NO_RESULT
480 elif not passive & attributes.RELATED_OBJECT_OK:
481 # this mode is used within a flush and the instance's
482 # expired state will be checked soon enough, if necessary.
483 # also used by immediateloader for a mutually-dependent
484 # o2m->m2m load, :ticket:`6301`
485 return instance
486 try:
487 state._load_expired(state, passive)
488 except orm_exc.ObjectDeletedError:
489 session._remove_newly_deleted([state])
490 return None
491 return instance
492 else:
493 return None
494
495
496def _load_on_ident(
497 session: Session,
498 statement: Union[Select, FromStatement],
499 key: Optional[_IdentityKeyType],
500 *,
501 load_options: Optional[Sequence[ORMOption]] = None,
502 refresh_state: Optional[InstanceState[Any]] = None,
503 with_for_update: Optional[ForUpdateArg] = None,
504 only_load_props: Optional[Iterable[str]] = None,
505 no_autoflush: bool = False,
506 bind_arguments: Mapping[str, Any] = util.EMPTY_DICT,
507 execution_options: _ExecuteOptions = util.EMPTY_DICT,
508 require_pk_cols: bool = False,
509 is_user_refresh: bool = False,
510):
511 """Load the given identity key from the database."""
512 if key is not None:
513 ident = key[1]
514 identity_token = key[2]
515 else:
516 ident = identity_token = None
517
518 return _load_on_pk_identity(
519 session,
520 statement,
521 ident,
522 load_options=load_options,
523 refresh_state=refresh_state,
524 with_for_update=with_for_update,
525 only_load_props=only_load_props,
526 identity_token=identity_token,
527 no_autoflush=no_autoflush,
528 bind_arguments=bind_arguments,
529 execution_options=execution_options,
530 require_pk_cols=require_pk_cols,
531 is_user_refresh=is_user_refresh,
532 )
533
534
535def _load_on_pk_identity(
536 session: Session,
537 statement: Union[Select, FromStatement],
538 primary_key_identity: Optional[Tuple[Any, ...]],
539 *,
540 load_options: Optional[Sequence[ORMOption]] = None,
541 refresh_state: Optional[InstanceState[Any]] = None,
542 with_for_update: Optional[ForUpdateArg] = None,
543 only_load_props: Optional[Iterable[str]] = None,
544 identity_token: Optional[Any] = None,
545 no_autoflush: bool = False,
546 bind_arguments: Mapping[str, Any] = util.EMPTY_DICT,
547 execution_options: _ExecuteOptions = util.EMPTY_DICT,
548 require_pk_cols: bool = False,
549 is_user_refresh: bool = False,
550):
551 """Load the given primary key identity from the database."""
552
553 query = statement
554 q = query._clone()
555
556 assert not q._is_lambda_element
557
558 if load_options is None:
559 load_options = QueryContext.default_load_options
560
561 if (
562 statement._compile_options
563 is SelectState.default_select_compile_options
564 ):
565 compile_options = _ORMCompileState.default_compile_options
566 else:
567 compile_options = statement._compile_options
568
569 if primary_key_identity is not None:
570 mapper = query._propagate_attrs["plugin_subject"]
571
572 _get_clause, _get_params = mapper._get_clause
573
574 # None present in ident - turn those comparisons
575 # into "IS NULL"
576 if None in primary_key_identity:
577 nones = {
578 _get_params[col].key
579 for col, value in zip(mapper.primary_key, primary_key_identity)
580 if value is None
581 }
582
583 _get_clause = sql_util.adapt_criterion_to_null(_get_clause, nones)
584
585 if len(nones) == len(primary_key_identity):
586 util.warn(
587 "fully NULL primary key identity cannot load any "
588 "object. This condition may raise an error in a future "
589 "release."
590 )
591
592 q._where_criteria = (_get_clause,)
593
594 params = {
595 _get_params[primary_key].key: id_val
596 for id_val, primary_key in zip(
597 primary_key_identity, mapper.primary_key
598 )
599 }
600 else:
601 params = None
602
603 if with_for_update is not None:
604 version_check = True
605 q._for_update_arg = with_for_update
606 elif query._for_update_arg is not None:
607 version_check = True
608 q._for_update_arg = query._for_update_arg
609 else:
610 version_check = False
611
612 if require_pk_cols and only_load_props:
613 if not refresh_state:
614 raise sa_exc.ArgumentError(
615 "refresh_state is required when require_pk_cols is present"
616 )
617
618 refresh_state_prokeys = refresh_state.mapper._primary_key_propkeys
619 has_changes = {
620 key
621 for key in refresh_state_prokeys.difference(only_load_props)
622 if refresh_state.attrs[key].history.has_changes()
623 }
624 if has_changes:
625 # raise if pending pk changes are present.
626 # technically, this could be limited to the case where we have
627 # relationships in the only_load_props collection to be refreshed
628 # also (and only ones that have a secondary eager loader, at that).
629 # however, the error is in place across the board so that behavior
630 # here is easier to predict. The use case it prevents is one
631 # of mutating PK attrs, leaving them unflushed,
632 # calling session.refresh(), and expecting those attrs to remain
633 # still unflushed. It seems likely someone doing all those
634 # things would be better off having the PK attributes flushed
635 # to the database before tinkering like that (session.refresh() is
636 # tinkering).
637 raise sa_exc.InvalidRequestError(
638 f"Please flush pending primary key changes on "
639 "attributes "
640 f"{has_changes} for mapper {refresh_state.mapper} before "
641 "proceeding with a refresh"
642 )
643
644 # overall, the ORM has no internal flow right now for "dont load the
645 # primary row of an object at all, but fire off
646 # selectinload/subqueryload/immediateload for some relationships".
647 # It would probably be a pretty big effort to add such a flow. So
648 # here, the case for #8703 is introduced; user asks to refresh some
649 # relationship attributes only which are
650 # selectinload/subqueryload/immediateload/ etc. (not joinedload).
651 # ORM complains there's no columns in the primary row to load.
652 # So here, we just add the PK cols if that
653 # case is detected, so that there is a SELECT emitted for the primary
654 # row.
655 #
656 # Let's just state right up front, for this one little case,
657 # the ORM here is adding a whole extra SELECT just to satisfy
658 # limitations in the internal flow. This is really not a thing
659 # SQLAlchemy finds itself doing like, ever, obviously, we are
660 # constantly working to *remove* SELECTs we don't need. We
661 # rationalize this for now based on 1. session.refresh() is not
662 # commonly used 2. session.refresh() with only relationship attrs is
663 # even less commonly used 3. the SELECT in question is very low
664 # latency.
665 #
666 # to add the flow to not include the SELECT, the quickest way
667 # might be to just manufacture a single-row result set to send off to
668 # instances(), but we'd have to weave that into context.py and all
669 # that. For 2.0.0, we have enough big changes to navigate for now.
670 #
671 mp = refresh_state.mapper._props
672 for p in only_load_props:
673 if mp[p]._is_relationship:
674 only_load_props = refresh_state_prokeys.union(only_load_props)
675 break
676
677 if refresh_state and refresh_state.load_options:
678 compile_options += {"_current_path": refresh_state.load_path.parent}
679 q = q.options(*refresh_state.load_options)
680
681 new_compile_options, load_options = _set_get_options(
682 compile_options,
683 load_options,
684 version_check=version_check,
685 only_load_props=only_load_props,
686 refresh_state=refresh_state,
687 identity_token=identity_token,
688 is_user_refresh=is_user_refresh,
689 )
690
691 q._compile_options = new_compile_options
692 q._order_by = None
693
694 if no_autoflush:
695 load_options += {"_autoflush": False}
696
697 execution_options = util.EMPTY_DICT.merge_with(
698 execution_options,
699 util.immutabledict(_sa_orm_load_options=load_options),
700 )
701 result = (
702 session.execute(
703 q,
704 params=params,
705 execution_options=execution_options,
706 bind_arguments=bind_arguments,
707 )
708 .unique()
709 .scalars()
710 )
711
712 try:
713 return result.one()
714 except orm_exc.NoResultFound:
715 return None
716
717
718def _set_get_options(
719 compile_opt,
720 load_opt,
721 populate_existing=None,
722 version_check=None,
723 only_load_props=None,
724 refresh_state=None,
725 identity_token=None,
726 is_user_refresh=None,
727):
728 compile_options = {}
729 load_options = {}
730 if version_check:
731 load_options["_version_check"] = version_check
732 if populate_existing:
733 load_options["_populate_existing"] = populate_existing
734 if refresh_state:
735 load_options["_refresh_state"] = refresh_state
736 compile_options["_for_refresh_state"] = True
737 if only_load_props:
738 compile_options["_only_load_props"] = frozenset(only_load_props)
739 if identity_token:
740 load_options["_identity_token"] = identity_token
741
742 if is_user_refresh:
743 load_options["_is_user_refresh"] = is_user_refresh
744 if load_options:
745 load_opt += load_options
746 if compile_options:
747 compile_opt += compile_options
748
749 return compile_opt, load_opt
750
751
752def _setup_entity_query(
753 compile_state,
754 mapper,
755 query_entity,
756 path,
757 adapter,
758 column_collection,
759 with_polymorphic=None,
760 only_load_props=None,
761 polymorphic_discriminator=None,
762 **kw,
763):
764 if with_polymorphic:
765 poly_properties = mapper._iterate_polymorphic_properties(
766 with_polymorphic
767 )
768 else:
769 poly_properties = mapper._polymorphic_properties
770
771 quick_populators = {}
772
773 path.set(compile_state.attributes, "memoized_setups", quick_populators)
774
775 # for the lead entities in the path, e.g. not eager loads, and
776 # assuming a user-passed aliased class, e.g. not a from_self() or any
777 # implicit aliasing, don't add columns to the SELECT that aren't
778 # in the thing that's aliased.
779 check_for_adapt = adapter and len(path) == 1 and path[-1].is_aliased_class
780
781 for value in poly_properties:
782 if only_load_props and value.key not in only_load_props:
783 continue
784 value.setup(
785 compile_state,
786 query_entity,
787 path,
788 adapter,
789 only_load_props=only_load_props,
790 column_collection=column_collection,
791 memoized_populators=quick_populators,
792 check_for_adapt=check_for_adapt,
793 **kw,
794 )
795
796 if (
797 polymorphic_discriminator is not None
798 and polymorphic_discriminator is not mapper.polymorphic_on
799 ):
800 if adapter:
801 pd = adapter.columns[polymorphic_discriminator]
802 else:
803 pd = polymorphic_discriminator
804 column_collection.append(pd)
805
806
807def _warn_for_runid_changed(state):
808 util.warn(
809 "Loading context for %s has changed within a load/refresh "
810 "handler, suggesting a row refresh operation took place. If this "
811 "event handler is expected to be "
812 "emitting row refresh operations within an existing load or refresh "
813 "operation, set restore_load_context=True when establishing the "
814 "listener to ensure the context remains unchanged when the event "
815 "handler completes." % (state_str(state),)
816 )
817
818
819def _instance_processor(
820 query_entity,
821 mapper,
822 context,
823 result,
824 path,
825 adapter,
826 only_load_props=None,
827 refresh_state=None,
828 polymorphic_discriminator=None,
829 _polymorphic_from=None,
830):
831 """Produce a mapper level row processor callable
832 which processes rows into mapped instances."""
833
834 # note that this method, most of which exists in a closure
835 # called _instance(), resists being broken out, as
836 # attempts to do so tend to add significant function
837 # call overhead. _instance() is the most
838 # performance-critical section in the whole ORM.
839
840 identity_class = mapper._identity_class
841 compile_state = context.compile_state
842
843 # look for "row getter" functions that have been assigned along
844 # with the compile state that were cached from a previous load.
845 # these are operator.itemgetter() objects that each will extract a
846 # particular column from each row.
847
848 getter_key = ("getters", mapper)
849 getters = path.get(compile_state.attributes, getter_key, None)
850
851 if getters is None:
852 # no getters, so go through a list of attributes we are loading for,
853 # and the ones that are column based will have already put information
854 # for us in another collection "memoized_setups", which represents the
855 # output of the LoaderStrategy.setup_query() method. We can just as
856 # easily call LoaderStrategy.create_row_processor for each, but by
857 # getting it all at once from setup_query we save another method call
858 # per attribute.
859 props = mapper._prop_set
860 if only_load_props is not None:
861 props = props.intersection(
862 mapper._props[k] for k in only_load_props
863 )
864
865 quick_populators = path.get(
866 context.attributes, "memoized_setups", EMPTY_DICT
867 )
868
869 todo = []
870 cached_populators = {
871 "new": [],
872 "quick": [],
873 "deferred": [],
874 "expire": [],
875 "existing": [],
876 "eager": [],
877 }
878
879 if refresh_state is None:
880 # we can also get the "primary key" tuple getter function
881 pk_cols = mapper.primary_key
882
883 if adapter:
884 pk_cols = [adapter.columns[c] for c in pk_cols]
885 primary_key_getter = result._tuple_getter(pk_cols)
886 else:
887 primary_key_getter = None
888
889 getters = {
890 "cached_populators": cached_populators,
891 "todo": todo,
892 "primary_key_getter": primary_key_getter,
893 }
894 for prop in props:
895 if prop in quick_populators:
896 # this is an inlined path just for column-based attributes.
897 col = quick_populators[prop]
898 if col is _DEFER_FOR_STATE:
899 cached_populators["new"].append(
900 (prop.key, prop._deferred_column_loader)
901 )
902 elif col is _SET_DEFERRED_EXPIRED:
903 # note that in this path, we are no longer
904 # searching in the result to see if the column might
905 # be present in some unexpected way.
906 cached_populators["expire"].append((prop.key, False))
907 elif col is _RAISE_FOR_STATE:
908 cached_populators["new"].append(
909 (prop.key, prop._raise_column_loader)
910 )
911 else:
912 getter = None
913 if adapter:
914 # this logic had been removed for all 1.4 releases
915 # up until 1.4.18; the adapter here is particularly
916 # the compound eager adapter which isn't accommodated
917 # in the quick_populators right now. The "fallback"
918 # logic below instead took over in many more cases
919 # until issue #6596 was identified.
920
921 # note there is still an issue where this codepath
922 # produces no "getter" for cases where a joined-inh
923 # mapping includes a labeled column property, meaning
924 # KeyError is caught internally and we fall back to
925 # _getter(col), which works anyway. The adapter
926 # here for joined inh without any aliasing might not
927 # be useful. Tests which see this include
928 # test.orm.inheritance.test_basic ->
929 # EagerTargetingTest.test_adapt_stringency
930 # OptimizedLoadTest.test_column_expression_joined
931 # PolymorphicOnNotLocalTest.test_polymorphic_on_column_prop # noqa: E501
932 #
933
934 adapted_col = adapter.columns[col]
935 if adapted_col is not None:
936 getter = result._getter(adapted_col, False)
937 if not getter:
938 getter = result._getter(col, False)
939 if getter:
940 cached_populators["quick"].append((prop.key, getter))
941 else:
942 # fall back to the ColumnProperty itself, which
943 # will iterate through all of its columns
944 # to see if one fits
945 prop.create_row_processor(
946 context,
947 query_entity,
948 path,
949 mapper,
950 result,
951 adapter,
952 cached_populators,
953 )
954 else:
955 # loader strategies like subqueryload, selectinload,
956 # joinedload, basically relationships, these need to interact
957 # with the context each time to work correctly.
958 todo.append(prop)
959
960 path.set(compile_state.attributes, getter_key, getters)
961
962 cached_populators = getters["cached_populators"]
963
964 populators = {key: list(value) for key, value in cached_populators.items()}
965 for prop in getters["todo"]:
966 prop.create_row_processor(
967 context, query_entity, path, mapper, result, adapter, populators
968 )
969
970 propagated_loader_options = context.propagated_loader_options
971 load_path = (
972 context.compile_state.current_path + path
973 if context.compile_state.current_path.path
974 else path
975 )
976
977 session_identity_map = context.session.identity_map
978
979 populate_existing = context.populate_existing or mapper.always_refresh
980 load_evt = bool(mapper.class_manager.dispatch.load)
981 refresh_evt = bool(mapper.class_manager.dispatch.refresh)
982 persistent_evt = bool(context.session.dispatch.loaded_as_persistent)
983 if persistent_evt:
984 loaded_as_persistent = context.session.dispatch.loaded_as_persistent
985 instance_state = attributes.instance_state
986 instance_dict = attributes.instance_dict
987 session_id = context.session.hash_key
988 runid = context.runid
989 identity_token = context.identity_token
990
991 version_check = context.version_check
992 if version_check:
993 version_id_col = mapper.version_id_col
994 if version_id_col is not None:
995 if adapter:
996 version_id_col = adapter.columns[version_id_col]
997 version_id_getter = result._getter(version_id_col)
998 else:
999 version_id_getter = None
1000
1001 if not refresh_state and _polymorphic_from is not None:
1002 key = ("loader", path.path)
1003
1004 if key in context.attributes and context.attributes[key].strategy == (
1005 ("selectinload_polymorphic", True),
1006 ):
1007 option_entities = context.attributes[key].local_opts["entities"]
1008 else:
1009 option_entities = None
1010 selectin_load_via = mapper._should_selectin_load(
1011 option_entities,
1012 _polymorphic_from,
1013 )
1014
1015 if selectin_load_via and selectin_load_via is not _polymorphic_from:
1016 # only_load_props goes w/ refresh_state only, and in a refresh
1017 # we are a single row query for the exact entity; polymorphic
1018 # loading does not apply
1019 assert only_load_props is None
1020
1021 if selectin_load_via.is_mapper:
1022 _load_supers = []
1023 _endmost_mapper = selectin_load_via
1024 while (
1025 _endmost_mapper
1026 and _endmost_mapper is not _polymorphic_from
1027 ):
1028 _load_supers.append(_endmost_mapper)
1029 _endmost_mapper = _endmost_mapper.inherits
1030 else:
1031 _load_supers = [selectin_load_via]
1032
1033 for _selectinload_entity in _load_supers:
1034 if _PostLoad.path_exists(
1035 context, load_path, _selectinload_entity
1036 ):
1037 continue
1038 callable_ = _load_subclass_via_in(
1039 context,
1040 path,
1041 _selectinload_entity,
1042 _polymorphic_from,
1043 option_entities,
1044 )
1045 _PostLoad.callable_for_path(
1046 context,
1047 load_path,
1048 _selectinload_entity.mapper,
1049 _selectinload_entity,
1050 callable_,
1051 _selectinload_entity,
1052 )
1053
1054 post_load = _PostLoad.for_context(context, load_path, only_load_props)
1055
1056 if refresh_state:
1057 refresh_identity_key = refresh_state.key
1058 if refresh_identity_key is None:
1059 # super-rare condition; a refresh is being called
1060 # on a non-instance-key instance; this is meant to only
1061 # occur within a flush()
1062 refresh_identity_key = mapper._identity_key_from_state(
1063 refresh_state
1064 )
1065 else:
1066 refresh_identity_key = None
1067
1068 primary_key_getter = getters["primary_key_getter"]
1069
1070 if mapper.allow_partial_pks:
1071 is_not_primary_key = _none_set.issuperset
1072 else:
1073 is_not_primary_key = _none_set.intersection
1074
1075 def _instance(row):
1076 # determine the state that we'll be populating
1077 if refresh_identity_key:
1078 # fixed state that we're refreshing
1079 state = refresh_state
1080 instance = state.obj()
1081 dict_ = instance_dict(instance)
1082 isnew = state.runid != runid
1083 currentload = True
1084 loaded_instance = False
1085 else:
1086 # look at the row, see if that identity is in the
1087 # session, or we have to create a new one
1088 identitykey = (
1089 identity_class,
1090 primary_key_getter(row),
1091 identity_token,
1092 )
1093
1094 instance = session_identity_map.get(identitykey)
1095
1096 if instance is not None:
1097 # existing instance
1098 state = instance_state(instance)
1099 dict_ = instance_dict(instance)
1100
1101 isnew = state.runid != runid
1102 currentload = not isnew
1103 loaded_instance = False
1104
1105 if version_check and version_id_getter and not currentload:
1106 _validate_version_id(
1107 mapper, state, dict_, row, version_id_getter
1108 )
1109
1110 else:
1111 # create a new instance
1112
1113 # check for non-NULL values in the primary key columns,
1114 # else no entity is returned for the row
1115 if is_not_primary_key(identitykey[1]):
1116 return None
1117
1118 isnew = True
1119 currentload = True
1120 loaded_instance = True
1121
1122 instance = mapper.class_manager.new_instance()
1123
1124 dict_ = instance_dict(instance)
1125 state = instance_state(instance)
1126 state.key = identitykey
1127 state.identity_token = identity_token
1128
1129 # attach instance to session.
1130 state.session_id = session_id
1131 session_identity_map._add_unpresent(state, identitykey)
1132
1133 effective_populate_existing = populate_existing
1134 if refresh_state is state:
1135 effective_populate_existing = True
1136
1137 # populate. this looks at whether this state is new
1138 # for this load or was existing, and whether or not this
1139 # row is the first row with this identity.
1140 if currentload or effective_populate_existing:
1141 # full population routines. Objects here are either
1142 # just created, or we are doing a populate_existing
1143
1144 # be conservative about setting load_path when populate_existing
1145 # is in effect; want to maintain options from the original
1146 # load. see test_expire->test_refresh_maintains_deferred_options
1147 if isnew and (
1148 propagated_loader_options or not effective_populate_existing
1149 ):
1150 state.load_options = propagated_loader_options
1151 state.load_path = load_path
1152
1153 _populate_full(
1154 context,
1155 row,
1156 state,
1157 dict_,
1158 isnew,
1159 load_path,
1160 loaded_instance,
1161 effective_populate_existing,
1162 populators,
1163 )
1164
1165 if isnew:
1166 # state.runid should be equal to context.runid / runid
1167 # here, however for event checks we are being more conservative
1168 # and checking against existing run id
1169 # assert state.runid == runid
1170
1171 existing_runid = state.runid
1172
1173 if loaded_instance:
1174 if load_evt:
1175 state.manager.dispatch.load(state, context)
1176 if state.runid != existing_runid:
1177 _warn_for_runid_changed(state)
1178 if persistent_evt:
1179 loaded_as_persistent(context.session, state)
1180 if state.runid != existing_runid:
1181 _warn_for_runid_changed(state)
1182 elif refresh_evt:
1183 state.manager.dispatch.refresh(
1184 state, context, only_load_props
1185 )
1186 if state.runid != runid:
1187 _warn_for_runid_changed(state)
1188
1189 if effective_populate_existing or state.modified:
1190 if refresh_state and only_load_props:
1191 state._commit(dict_, only_load_props)
1192 else:
1193 state._commit_all(dict_, session_identity_map)
1194
1195 if post_load:
1196 post_load.add_state(state, True)
1197
1198 else:
1199 # partial population routines, for objects that were already
1200 # in the Session, but a row matches them; apply eager loaders
1201 # on existing objects, etc.
1202 unloaded = state.unloaded
1203 isnew = state not in context.partials
1204
1205 if not isnew or unloaded or populators["eager"]:
1206 # state is having a partial set of its attributes
1207 # refreshed. Populate those attributes,
1208 # and add to the "context.partials" collection.
1209
1210 to_load = _populate_partial(
1211 context,
1212 row,
1213 state,
1214 dict_,
1215 isnew,
1216 load_path,
1217 unloaded,
1218 populators,
1219 )
1220
1221 if isnew:
1222 if refresh_evt:
1223 existing_runid = state.runid
1224 state.manager.dispatch.refresh(state, context, to_load)
1225 if state.runid != existing_runid:
1226 _warn_for_runid_changed(state)
1227
1228 state._commit(dict_, to_load)
1229
1230 if post_load and context.invoke_all_eagers:
1231 post_load.add_state(state, False)
1232
1233 return instance
1234
1235 if mapper.polymorphic_map and not _polymorphic_from and not refresh_state:
1236 # if we are doing polymorphic, dispatch to a different _instance()
1237 # method specific to the subclass mapper
1238 def ensure_no_pk(row):
1239 identitykey = (
1240 identity_class,
1241 primary_key_getter(row),
1242 identity_token,
1243 )
1244 if not is_not_primary_key(identitykey[1]):
1245 return identitykey
1246 else:
1247 return None
1248
1249 _instance = _decorate_polymorphic_switch(
1250 _instance,
1251 context,
1252 query_entity,
1253 mapper,
1254 result,
1255 path,
1256 polymorphic_discriminator,
1257 adapter,
1258 ensure_no_pk,
1259 )
1260
1261 return _instance
1262
1263
1264def _load_subclass_via_in(
1265 context, path, entity, polymorphic_from, option_entities
1266):
1267 mapper = entity.mapper
1268
1269 # TODO: polymorphic_from seems to be a Mapper in all cases.
1270 # this is likely not needed, but as we dont have typing in loading.py
1271 # yet, err on the safe side
1272 polymorphic_from_mapper = polymorphic_from.mapper
1273 not_against_basemost = polymorphic_from_mapper.inherits is not None
1274
1275 zero_idx = len(mapper.base_mapper.primary_key) == 1
1276
1277 if entity.is_aliased_class or not_against_basemost:
1278 q, enable_opt, disable_opt = mapper._subclass_load_via_in(
1279 entity, polymorphic_from
1280 )
1281 else:
1282 q, enable_opt, disable_opt = mapper._subclass_load_via_in_mapper
1283
1284 def do_load(context, path, states, load_only, effective_entity):
1285 if not option_entities:
1286 # filter out states for those that would have selectinloaded
1287 # from another loader
1288 # TODO: we are currently ignoring the case where the
1289 # "selectin_polymorphic" option is used, as this is much more
1290 # complex / specific / very uncommon API use
1291 states = [
1292 (s, v)
1293 for s, v in states
1294 if s.mapper._would_selectin_load_only_from_given_mapper(mapper)
1295 ]
1296
1297 if not states:
1298 return
1299
1300 orig_query = context.query
1301
1302 if path.parent:
1303 enable_opt_lcl = enable_opt._prepend_path(path)
1304 disable_opt_lcl = disable_opt._prepend_path(path)
1305 else:
1306 enable_opt_lcl = enable_opt
1307 disable_opt_lcl = disable_opt
1308 options = (
1309 (enable_opt_lcl,) + orig_query._with_options + (disable_opt_lcl,)
1310 )
1311
1312 q2 = q.options(*options)
1313
1314 q2._compile_options = context.compile_state.default_compile_options
1315 q2._compile_options += {"_current_path": path.parent}
1316
1317 if context.populate_existing:
1318 q2 = q2.execution_options(populate_existing=True)
1319
1320 while states:
1321 chunk = states[0 : _SelectInLoader._chunksize]
1322 states = states[_SelectInLoader._chunksize :]
1323 context.session.execute(
1324 q2,
1325 dict(
1326 primary_keys=[
1327 state.key[1][0] if zero_idx else state.key[1]
1328 for state, load_attrs in chunk
1329 ]
1330 ),
1331 ).unique().scalars().all()
1332
1333 return do_load
1334
1335
1336def _populate_full(
1337 context,
1338 row,
1339 state,
1340 dict_,
1341 isnew,
1342 load_path,
1343 loaded_instance,
1344 populate_existing,
1345 populators,
1346):
1347 if isnew:
1348 # first time we are seeing a row with this identity.
1349 state.runid = context.runid
1350
1351 for key, getter in populators["quick"]:
1352 dict_[key] = getter(row)
1353 if populate_existing:
1354 for key, set_callable in populators["expire"]:
1355 dict_.pop(key, None)
1356 if set_callable:
1357 state.expired_attributes.add(key)
1358 else:
1359 for key, set_callable in populators["expire"]:
1360 if set_callable:
1361 state.expired_attributes.add(key)
1362
1363 for key, populator in populators["new"]:
1364 populator(state, dict_, row)
1365
1366 elif load_path != state.load_path:
1367 # new load path, e.g. object is present in more than one
1368 # column position in a series of rows
1369 state.load_path = load_path
1370
1371 # if we have data, and the data isn't in the dict, OK, let's put
1372 # it in.
1373 for key, getter in populators["quick"]:
1374 if key not in dict_:
1375 dict_[key] = getter(row)
1376
1377 # otherwise treat like an "already seen" row
1378 for key, populator in populators["existing"]:
1379 populator(state, dict_, row)
1380 # TODO: allow "existing" populator to know this is
1381 # a new path for the state:
1382 # populator(state, dict_, row, new_path=True)
1383
1384 else:
1385 # have already seen rows with this identity in this same path.
1386 for key, populator in populators["existing"]:
1387 populator(state, dict_, row)
1388
1389 # TODO: same path
1390 # populator(state, dict_, row, new_path=False)
1391
1392
1393def _populate_partial(
1394 context, row, state, dict_, isnew, load_path, unloaded, populators
1395):
1396 if not isnew:
1397 if unloaded:
1398 # extra pass, see #8166
1399 for key, getter in populators["quick"]:
1400 if key in unloaded:
1401 dict_[key] = getter(row)
1402
1403 to_load = context.partials[state]
1404 for key, populator in populators["existing"]:
1405 if key in to_load:
1406 populator(state, dict_, row)
1407 else:
1408 to_load = unloaded
1409 context.partials[state] = to_load
1410
1411 for key, getter in populators["quick"]:
1412 if key in to_load:
1413 dict_[key] = getter(row)
1414 for key, set_callable in populators["expire"]:
1415 if key in to_load:
1416 dict_.pop(key, None)
1417 if set_callable:
1418 state.expired_attributes.add(key)
1419 for key, populator in populators["new"]:
1420 if key in to_load:
1421 populator(state, dict_, row)
1422
1423 for key, populator in populators["eager"]:
1424 if key not in unloaded:
1425 populator(state, dict_, row)
1426
1427 return to_load
1428
1429
1430def _validate_version_id(mapper, state, dict_, row, getter):
1431 if mapper._get_state_attr_by_column(
1432 state, dict_, mapper.version_id_col
1433 ) != getter(row):
1434 raise orm_exc.StaleDataError(
1435 "Instance '%s' has version id '%s' which "
1436 "does not match database-loaded version id '%s'."
1437 % (
1438 state_str(state),
1439 mapper._get_state_attr_by_column(
1440 state, dict_, mapper.version_id_col
1441 ),
1442 getter(row),
1443 )
1444 )
1445
1446
1447def _decorate_polymorphic_switch(
1448 instance_fn,
1449 context,
1450 query_entity,
1451 mapper,
1452 result,
1453 path,
1454 polymorphic_discriminator,
1455 adapter,
1456 ensure_no_pk,
1457):
1458 if polymorphic_discriminator is not None:
1459 polymorphic_on = polymorphic_discriminator
1460 else:
1461 polymorphic_on = mapper.polymorphic_on
1462 if polymorphic_on is None:
1463 return instance_fn
1464
1465 if adapter:
1466 polymorphic_on = adapter.columns[polymorphic_on]
1467
1468 def configure_subclass_mapper(discriminator):
1469 try:
1470 sub_mapper = mapper.polymorphic_map[discriminator]
1471 except KeyError:
1472 raise AssertionError(
1473 "No such polymorphic_identity %r is defined" % discriminator
1474 )
1475 else:
1476 if sub_mapper is mapper:
1477 return None
1478 elif not sub_mapper.isa(mapper):
1479 return False
1480
1481 return _instance_processor(
1482 query_entity,
1483 sub_mapper,
1484 context,
1485 result,
1486 path,
1487 adapter,
1488 _polymorphic_from=mapper,
1489 )
1490
1491 polymorphic_instances = util.PopulateDict(configure_subclass_mapper)
1492
1493 getter = result._getter(polymorphic_on)
1494
1495 def polymorphic_instance(row):
1496 discriminator = getter(row)
1497 if discriminator is not None:
1498 _instance = polymorphic_instances[discriminator]
1499 if _instance:
1500 return _instance(row)
1501 elif _instance is False:
1502 identitykey = ensure_no_pk(row)
1503
1504 if identitykey:
1505 raise sa_exc.InvalidRequestError(
1506 "Row with identity key %s can't be loaded into an "
1507 "object; the polymorphic discriminator column '%s' "
1508 "refers to %s, which is not a sub-mapper of "
1509 "the requested %s"
1510 % (
1511 identitykey,
1512 polymorphic_on,
1513 mapper.polymorphic_map[discriminator],
1514 mapper,
1515 )
1516 )
1517 else:
1518 return None
1519 else:
1520 return instance_fn(row)
1521 else:
1522 identitykey = ensure_no_pk(row)
1523
1524 if identitykey:
1525 raise sa_exc.InvalidRequestError(
1526 "Row with identity key %s can't be loaded into an "
1527 "object; the polymorphic discriminator column '%s' is "
1528 "NULL" % (identitykey, polymorphic_on)
1529 )
1530 else:
1531 return None
1532
1533 return polymorphic_instance
1534
1535
1536class _PostLoad:
1537 """Track loaders and states for "post load" operations."""
1538
1539 __slots__ = "loaders", "states", "load_keys"
1540
1541 def __init__(self):
1542 self.loaders = {}
1543 self.states = util.OrderedDict()
1544 self.load_keys = None
1545
1546 def add_state(self, state, overwrite):
1547 # the states for a polymorphic load here are all shared
1548 # within a single PostLoad object among multiple subtypes.
1549 # Filtering of callables on a per-subclass basis needs to be done at
1550 # the invocation level
1551 self.states[state] = overwrite
1552
1553 def invoke(self, context, path):
1554 if not self.states:
1555 return
1556 path = path_registry.PathRegistry.coerce(path)
1557 for (
1558 effective_context,
1559 token,
1560 limit_to_mapper,
1561 loader,
1562 arg,
1563 kw,
1564 ) in self.loaders.values():
1565 states = [
1566 (state, overwrite)
1567 for state, overwrite in self.states.items()
1568 if state.manager.mapper.isa(limit_to_mapper)
1569 ]
1570 if states:
1571 loader(
1572 effective_context, path, states, self.load_keys, *arg, **kw
1573 )
1574 self.states.clear()
1575
1576 @classmethod
1577 def for_context(cls, context, path, only_load_props):
1578 pl = context.post_load_paths.get(path.path)
1579 if pl is not None and only_load_props:
1580 pl.load_keys = only_load_props
1581 return pl
1582
1583 @classmethod
1584 def path_exists(self, context, path, key):
1585 return (
1586 path.path in context.post_load_paths
1587 and key in context.post_load_paths[path.path].loaders
1588 )
1589
1590 @classmethod
1591 def callable_for_path(
1592 cls, context, path, limit_to_mapper, token, loader_callable, *arg, **kw
1593 ):
1594 if path.path in context.post_load_paths:
1595 pl = context.post_load_paths[path.path]
1596 else:
1597 pl = context.post_load_paths[path.path] = _PostLoad()
1598 pl.loaders[token] = (
1599 context,
1600 token,
1601 limit_to_mapper,
1602 loader_callable,
1603 arg,
1604 kw,
1605 )
1606
1607
1608def _load_scalar_attributes(mapper, state, attribute_names, passive):
1609 """initiate a column-based attribute refresh operation."""
1610
1611 # assert mapper is _state_mapper(state)
1612 session = state.session
1613 if not session:
1614 raise orm_exc.DetachedInstanceError(
1615 "Instance %s is not bound to a Session; "
1616 "attribute refresh operation cannot proceed" % (state_str(state))
1617 )
1618
1619 no_autoflush = bool(passive & attributes.NO_AUTOFLUSH)
1620
1621 # in the case of inheritance, particularly concrete and abstract
1622 # concrete inheritance, the class manager might have some keys
1623 # of attributes on the superclass that we didn't actually map.
1624 # These could be mapped as "concrete, don't load" or could be completely
1625 # excluded from the mapping and we know nothing about them. Filter them
1626 # here to prevent them from coming through.
1627 if attribute_names:
1628 attribute_names = attribute_names.intersection(mapper.attrs.keys())
1629
1630 if mapper.inherits and not mapper.concrete:
1631 # load based on committed attributes in the object, formed into
1632 # a truncated SELECT that only includes relevant tables. does not
1633 # currently use state.key
1634 statement = mapper._optimized_get_statement(state, attribute_names)
1635 if statement is not None:
1636 # undefer() isn't needed here because statement has the
1637 # columns needed already, this implicitly undefers that column
1638 stmt = FromStatement(mapper, statement)
1639
1640 return _load_on_ident(
1641 session,
1642 stmt,
1643 None,
1644 only_load_props=attribute_names,
1645 refresh_state=state,
1646 no_autoflush=no_autoflush,
1647 )
1648
1649 # normal load, use state.key as the identity to SELECT
1650 has_key = bool(state.key)
1651
1652 if has_key:
1653 identity_key = state.key
1654 else:
1655 # this codepath is rare - only valid when inside a flush, and the
1656 # object is becoming persistent but hasn't yet been assigned
1657 # an identity_key.
1658 # check here to ensure we have the attrs we need.
1659 pk_attrs = [
1660 mapper._columntoproperty[col].key for col in mapper.primary_key
1661 ]
1662 if state.expired_attributes.intersection(pk_attrs):
1663 raise sa_exc.InvalidRequestError(
1664 "Instance %s cannot be refreshed - it's not "
1665 " persistent and does not "
1666 "contain a full primary key." % state_str(state)
1667 )
1668 identity_key = mapper._identity_key_from_state(state)
1669
1670 if (
1671 _none_set.issubset(identity_key) and not mapper.allow_partial_pks
1672 ) or _none_set.issuperset(identity_key):
1673 util.warn_limited(
1674 "Instance %s to be refreshed doesn't "
1675 "contain a full primary key - can't be refreshed "
1676 "(and shouldn't be expired, either).",
1677 state_str(state),
1678 )
1679 return
1680
1681 result = _load_on_ident(
1682 session,
1683 select(mapper),
1684 identity_key,
1685 refresh_state=state,
1686 only_load_props=attribute_names,
1687 no_autoflush=no_autoflush,
1688 )
1689
1690 # if instance is pending, a refresh operation
1691 # may not complete (even if PK attributes are assigned)
1692 if has_key and result is None:
1693 raise orm_exc.ObjectDeletedError(state)