1# orm/loading.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: ignore-errors
8
9
10"""private module containing functions used to convert database
11rows into object instances and associated state.
12
the functions here are called primarily by Query and Mapper,
as well as by some of the attribute loading strategies.
15
16"""
17
18from __future__ import annotations
19
20from typing import Any
21from typing import Dict
22from typing import Iterable
23from typing import List
24from typing import Mapping
25from typing import Optional
26from typing import Sequence
27from typing import Tuple
28from typing import TYPE_CHECKING
29from typing import TypeVar
30from typing import Union
31
32from . import attributes
33from . import exc as orm_exc
34from . import path_registry
35from .base import _DEFER_FOR_STATE
36from .base import _RAISE_FOR_STATE
37from .base import _SET_DEFERRED_EXPIRED
38from .base import PassiveFlag
39from .context import _ORMCompileState
40from .context import FromStatement
41from .context import QueryContext
42from .util import _none_set
43from .util import state_str
44from .. import exc as sa_exc
45from .. import util
46from ..engine import result_tuple
47from ..engine.result import ChunkedIteratorResult
48from ..engine.result import FrozenResult
49from ..engine.result import SimpleResultMetaData
50from ..sql import select
51from ..sql import util as sql_util
52from ..sql.selectable import ForUpdateArg
53from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL
54from ..sql.selectable import SelectState
55from ..util import EMPTY_DICT
56from ..util.typing import TupleAny
57from ..util.typing import Unpack
58
59if TYPE_CHECKING:
60 from ._typing import _IdentityKeyType
61 from .base import LoaderCallableStatus
62 from .interfaces import ORMOption
63 from .mapper import Mapper
64 from .query import Query
65 from .session import Session
66 from .state import InstanceState
67 from ..engine.cursor import CursorResult
68 from ..engine.interfaces import _ExecuteOptions
69 from ..engine.result import Result
70 from ..sql import Select
71
# TypeVars used in annotations throughout this module; _T is any result
# type, _O is specifically an ORM-mapped object type
_T = TypeVar("_T", bound=Any)
_O = TypeVar("_O", bound=object)

# counter generating a new "runid" for each load operation; assigned to
# context.runid in instances()
_new_runid = util.counter()


# maps populator category names ("new", "quick", "deferred", "expire",
# "existing", "eager") to lists of (attribute key, populator callable)
# tuples, as built up by _instance_processor
_PopulatorDict = Dict[str, List[Tuple[str, Any]]]
78
79
def instances(
    cursor: CursorResult[Unpack[TupleAny]], context: QueryContext
) -> Result[Unpack[TupleAny]]:
    """Return a :class:`.Result` given an ORM query context.

    :param cursor: a :class:`.CursorResult`, generated by a statement
     which came from :class:`.ORMCompileState`

    :param context: a :class:`.QueryContext` object

    :return: a :class:`.Result` object representing ORM results

    .. versionchanged:: 1.4 The instances() function now uses
       :class:`.Result` objects and has an all new interface.

    """

    context.runid = _new_runid()

    # post-load paths are shared with the top-level context when this is a
    # nested "post load" invocation; only the top-level load fires them off
    if context.top_level_context:
        is_top_level = False
        context.post_load_paths = context.top_level_context.post_load_paths
    else:
        is_top_level = True
        context.post_load_paths = {}

    compile_state = context.compile_state
    filtered = compile_state._has_mapper_entities
    single_entity = (
        not context.load_options._only_return_tuples
        and len(compile_state._entities) == 1
        and compile_state._entities[0].supports_single_entity
    )

    try:
        # one (row processor, label, extra) triple per query entity
        (process, labels, extra) = list(
            zip(
                *[
                    query_entity.row_processor(context, cursor)
                    for query_entity in context.compile_state._entities
                ]
            )
        )

        if context.yield_per and (
            context.loaders_require_buffering
            or context.loaders_require_uniquing
        ):
            raise sa_exc.InvalidRequestError(
                "Can't use yield_per with eager loaders that require uniquing "
                "or row buffering, e.g. joinedload() against collections "
                "or subqueryload(). Consider the selectinload() strategy "
                "for better flexibility in loading objects."
            )

    except Exception:
        # make sure the DBAPI cursor is closed before re-raising
        with util.safe_reraise():
            cursor.close()

    def _no_unique(entry):
        # uniquing filter used when yield_per is set; unique() can't work
        # with chunked fetches
        raise sa_exc.InvalidRequestError(
            "Can't use the ORM yield_per feature in conjunction with unique()"
        )

    def _not_hashable(datatype, *, legacy=False, uncertain=False):
        """Produce a uniquing filter for a column whose values may not be
        hashable.

        In non-legacy mode, raise if uniquing is actually attempted; if
        ``uncertain``, first probe ``hash()`` and allow hashable values
        through.  In legacy mode, fall back to ``id()`` based uniquing.

        """
        if not legacy:

            def go(obj):
                if uncertain:
                    try:
                        return hash(obj)
                    except TypeError:
                        # catch only TypeError (what hash() raises for
                        # unhashable values) so that control-flow
                        # exceptions such as KeyboardInterrupt propagate
                        pass

                raise sa_exc.InvalidRequestError(
                    "Can't apply uniqueness to row tuple containing value of "
                    f"""type {datatype!r}; {
                        'the values returned appear to be'
                        if uncertain
                        else 'this datatype produces'
                    } non-hashable values"""
                )

            return go
        elif not uncertain:
            return id
        else:
            _use_id = False

            def go(obj):
                nonlocal _use_id

                if not _use_id:
                    try:
                        return hash(obj)
                    except TypeError:
                        # as above, only TypeError indicates "not hashable"
                        pass

                    # in #10459, we considered using a warning here, however
                    # as legacy query uses result.unique() in all cases, this
                    # would lead to too many warning cases.
                    _use_id = True

                return id(obj)

            return go

    # per-entity uniquing filters handed to the result metadata; None means
    # plain hash-based uniquing
    unique_filters = [
        (
            _no_unique
            if context.yield_per
            else (
                _not_hashable(
                    ent.column.type,  # type: ignore
                    legacy=context.load_options._legacy_uniquing,
                    uncertain=ent._null_column_type,
                )
                if (
                    not ent.use_id_for_hash
                    and (ent._non_hashable_value or ent._null_column_type)
                )
                else id if ent.use_id_for_hash else None
            )
        )
        for ent in context.compile_state._entities
    ]

    row_metadata = SimpleResultMetaData(
        labels, extra, _unique_filters=unique_filters
    )

    def chunks(size):  # type: ignore
        # generator of lists of processed rows; one chunk per fetchmany()
        # when yield_per is in effect, otherwise a single chunk of all rows
        while True:
            yield_per = size

            context.partials = {}

            if yield_per:
                fetch = cursor.fetchmany(yield_per)

                if not fetch:
                    break
            else:
                fetch = cursor._raw_all_rows()

            if single_entity:
                proc = process[0]
                rows = [proc(row) for row in fetch]
            else:
                rows = [
                    tuple([proc(row) for proc in process]) for row in fetch
                ]

            # if we are the originating load from a query, meaning we
            # aren't being called as a result of a nested "post load",
            # iterate through all the collected post loaders and fire them
            # off. Previously this used to work recursively, however that
            # prevented deeply nested structures from being loadable
            if is_top_level:
                if yield_per:
                    # if using yield per, memoize the state of the
                    # collection so that it can be restored
                    top_level_post_loads = list(
                        context.post_load_paths.items()
                    )

                while context.post_load_paths:
                    post_loads = list(context.post_load_paths.items())
                    context.post_load_paths.clear()
                    for path, post_load in post_loads:
                        post_load.invoke(context, path)

                if yield_per:
                    context.post_load_paths.clear()
                    context.post_load_paths.update(top_level_post_loads)

            yield rows

            if not yield_per:
                break

    if context.execution_options.get("prebuffer_rows", False):
        # this is a bit of a hack at the moment.
        # I would rather have some option in the result to pre-buffer
        # internally.
        _prebuffered = list(chunks(None))

        def chunks(size):
            return iter(_prebuffered)

    result = ChunkedIteratorResult(
        row_metadata,
        chunks,
        source_supports_scalars=single_entity,
        raw=cursor,
        dynamic_yield_per=cursor.context._is_server_side,
    )

    # filtered and single_entity are used to indicate to legacy Query that the
    # query has ORM entities, so legacy deduping and scalars should be called
    # on the result.
    result._attributes = result._attributes.union(
        dict(filtered=filtered, is_single_entity=single_entity)
    )

    # multi_row_eager_loaders OTOH is specific to joinedload.
    if context.compile_state.multi_row_eager_loaders:

        def require_unique(obj):
            raise sa_exc.InvalidRequestError(
                "The unique() method must be invoked on this Result, "
                "as it contains results that include joined eager loads "
                "against collections"
            )

        result._unique_filter_state = (None, require_unique)

    if context.yield_per:
        result.yield_per(context.yield_per)

    return result
301
302
@util.preload_module("sqlalchemy.orm.context")
def merge_frozen_result(session, statement, frozen_result, load=True):
    """Merge a :class:`_engine.FrozenResult` back into a :class:`_orm.Session`,
    returning a new :class:`_engine.Result` object with :term:`persistent`
    objects.

    See the section :ref:`do_orm_execute_re_executing` for an example.

    .. seealso::

        :ref:`do_orm_execute_re_executing`

        :meth:`_engine.Result.freeze`

        :class:`_engine.FrozenResult`

    """
    querycontext = util.preloaded.orm_context

    if load:
        # data is expected to be loaded; flush pending changes first
        session._autoflush()

    ctx = querycontext._ORMSelectCompileState._create_entities_collection(
        statement, legacy=False
    )

    with session.no_autoflush:
        # positions of entities in each row that are ORM-mapped objects,
        # i.e. candidates for Session._merge()
        mapped_positions = [
            position
            for position, entity in enumerate(ctx._entities)
            if isinstance(entity, querycontext._MapperEntity)
        ]
        labels = [entity._label_name for entity in ctx._entities]

        make_row = result_tuple(
            labels, [entity._extra_entities for entity in ctx._entities]
        )

        merged_rows = []
        for row in frozen_result.rewrite_rows():
            for position in mapped_positions:
                obj = row[position]
                if obj is not None:
                    row[position] = session._merge(
                        attributes.instance_state(obj),
                        attributes.instance_dict(obj),
                        load=load,
                        _recursive={},
                        _resolve_conflict_map={},
                    )

            merged_rows.append(make_row(row))

    return frozen_result.with_new_rows(merged_rows)
357
358
@util.became_legacy_20(
    ":func:`_orm.merge_result`",
    alternative="The function as well as the method on :class:`_orm.Query` "
    "is superseded by the :func:`_orm.merge_frozen_result` function.",
)
@util.preload_module("sqlalchemy.orm.context")
def merge_result(
    query: Query[Any],
    iterator: Union[FrozenResult, Iterable[Sequence[Any]], Iterable[object]],
    load: bool = True,
) -> Union[FrozenResult, Iterable[Any]]:
    """Merge a result into the given :class:`.Query` object's Session.

    See :meth:`_orm.Query.merge_result` for top-level documentation on this
    function.

    """

    querycontext = util.preloaded.orm_context

    session = query.session
    if load:
        # data is expected to be loaded; flush pending changes first
        session._autoflush()

    # TODO: need test coverage and documentation for the FrozenResult
    # use case.
    if isinstance(iterator, FrozenResult):
        frozen_result = iterator
        iterator = iter(frozen_result.data)
    else:
        frozen_result = None

    ctx = querycontext._ORMSelectCompileState._create_entities_collection(
        query, legacy=True
    )

    autoflush = session.autoflush
    try:
        # disable autoflush for the duration of the merge, restored in
        # the finally block
        session.autoflush = False

        def _merge_instance(obj):
            # merge a single mapped object into the session
            return session._merge(
                attributes.instance_state(obj),
                attributes.instance_dict(obj),
                load=load,
                _recursive={},
                _resolve_conflict_map={},
            )

        if not frozen_result and len(ctx._entities) == 1:
            # scalar result; merge each element directly if it's a
            # mapped entity, otherwise pass values through unchanged
            if isinstance(ctx._entities[0], querycontext._MapperEntity):
                merged = [_merge_instance(instance) for instance in iterator]
            else:
                merged = list(iterator)
        else:
            mapped_positions = [
                position
                for position, entity in enumerate(ctx._entities)
                if isinstance(entity, querycontext._MapperEntity)
            ]
            labels = [entity._label_name for entity in ctx._entities]

            make_row = result_tuple(
                labels, [entity._extra_entities for entity in ctx._entities]
            )

            merged = []
            for row in iterator:
                values = list(row)
                for position in mapped_positions:
                    if values[position] is not None:
                        values[position] = _merge_instance(values[position])
                merged.append(make_row(values))

        if frozen_result:
            return frozen_result.with_new_rows(merged)
        else:
            return iter(merged)
    finally:
        session.autoflush = autoflush
447
448
449def get_from_identity(
450 session: Session,
451 mapper: Mapper[_O],
452 key: _IdentityKeyType[_O],
453 passive: PassiveFlag,
454) -> Union[LoaderCallableStatus, Optional[_O]]:
455 """Look up the given key in the given session's identity map,
456 check the object for expired state if found.
457
458 """
459 instance = session.identity_map.get(key)
460 if instance is not None:
461 state = attributes.instance_state(instance)
462
463 if mapper.inherits and not state.mapper.isa(mapper):
464 return attributes.PASSIVE_CLASS_MISMATCH
465
466 # expired - ensure it still exists
467 if state.expired:
468 if not passive & attributes.SQL_OK:
469 # TODO: no coverage here
470 return attributes.PASSIVE_NO_RESULT
471 elif not passive & attributes.RELATED_OBJECT_OK:
472 # this mode is used within a flush and the instance's
473 # expired state will be checked soon enough, if necessary.
474 # also used by immediateloader for a mutually-dependent
475 # o2m->m2m load, :ticket:`6301`
476 return instance
477 try:
478 state._load_expired(state, passive)
479 except orm_exc.ObjectDeletedError:
480 session._remove_newly_deleted([state])
481 return None
482 return instance
483 else:
484 return None
485
486
def _load_on_ident(
    session: Session,
    statement: Union[Select, FromStatement],
    key: Optional[_IdentityKeyType],
    *,
    load_options: Optional[Sequence[ORMOption]] = None,
    refresh_state: Optional[InstanceState[Any]] = None,
    with_for_update: Optional[ForUpdateArg] = None,
    only_load_props: Optional[Iterable[str]] = None,
    no_autoflush: bool = False,
    bind_arguments: Mapping[str, Any] = util.EMPTY_DICT,
    execution_options: _ExecuteOptions = util.EMPTY_DICT,
    require_pk_cols: bool = False,
    is_user_refresh: bool = False,
):
    """Load the given identity key from the database.

    Thin wrapper which unpacks the primary key values and identity
    token from the identity key tuple, then delegates to
    :func:`._load_on_pk_identity`.

    """
    if key is None:
        ident = identity_token = None
    else:
        # identity key tuple: element 1 is the primary key values,
        # element 2 is the identity token
        ident = key[1]
        identity_token = key[2]

    return _load_on_pk_identity(
        session,
        statement,
        ident,
        load_options=load_options,
        refresh_state=refresh_state,
        with_for_update=with_for_update,
        only_load_props=only_load_props,
        identity_token=identity_token,
        no_autoflush=no_autoflush,
        bind_arguments=bind_arguments,
        execution_options=execution_options,
        require_pk_cols=require_pk_cols,
        is_user_refresh=is_user_refresh,
    )
524
525
def _load_on_pk_identity(
    session: Session,
    statement: Union[Select, FromStatement],
    primary_key_identity: Optional[Tuple[Any, ...]],
    *,
    load_options: Optional[Sequence[ORMOption]] = None,
    refresh_state: Optional[InstanceState[Any]] = None,
    with_for_update: Optional[ForUpdateArg] = None,
    only_load_props: Optional[Iterable[str]] = None,
    identity_token: Optional[Any] = None,
    no_autoflush: bool = False,
    bind_arguments: Mapping[str, Any] = util.EMPTY_DICT,
    execution_options: _ExecuteOptions = util.EMPTY_DICT,
    require_pk_cols: bool = False,
    is_user_refresh: bool = False,
):
    """Load the given primary key identity from the database.

    Clones the given statement, applies WHERE criteria against the
    mapper's primary key columns for the given identity (when one is
    passed), merges refresh/get related options, executes the statement
    on the session, and returns a single scalar object or None.

    :param primary_key_identity: tuple of primary key values, or None to
     execute the statement without primary-key criteria
    :param refresh_state: when present, the :class:`.InstanceState` being
     refreshed; its load options and load path are carried over
    :param require_pk_cols: when True together with ``only_load_props``,
     ensure primary key columns are part of the load (see #8703 notes
     inline)

    """

    query = statement
    q = query._clone()

    # lambda statements can't be cloned/modified this way
    assert not q._is_lambda_element

    if load_options is None:
        load_options = QueryContext.default_load_options

    if (
        statement._compile_options
        is SelectState.default_select_compile_options
    ):
        compile_options = _ORMCompileState.default_compile_options
    else:
        compile_options = statement._compile_options

    if primary_key_identity is not None:
        mapper = query._propagate_attrs["plugin_subject"]

        # mapper._get_clause is a (criterion, param dict) pair for a
        # primary-key WHERE clause
        (_get_clause, _get_params) = mapper._get_clause

        # None present in ident - turn those comparisons
        # into "IS NULL"
        if None in primary_key_identity:
            nones = {
                _get_params[col].key
                for col, value in zip(mapper.primary_key, primary_key_identity)
                if value is None
            }

            _get_clause = sql_util.adapt_criterion_to_null(_get_clause, nones)

            if len(nones) == len(primary_key_identity):
                util.warn(
                    "fully NULL primary key identity cannot load any "
                    "object. This condition may raise an error in a future "
                    "release."
                )

        q._where_criteria = (
            sql_util._deep_annotate(_get_clause, {"_orm_adapt": True}),
        )

        # bound parameter values keyed to the pk criterion's param names
        params = {
            _get_params[primary_key].key: id_val
            for id_val, primary_key in zip(
                primary_key_identity, mapper.primary_key
            )
        }
    else:
        params = None

    # a FOR UPDATE clause, either passed in or present on the statement,
    # also implies a version check on load
    if with_for_update is not None:
        version_check = True
        q._for_update_arg = with_for_update
    elif query._for_update_arg is not None:
        version_check = True
        q._for_update_arg = query._for_update_arg
    else:
        version_check = False

    if require_pk_cols and only_load_props:
        if not refresh_state:
            raise sa_exc.ArgumentError(
                "refresh_state is required when require_pk_cols is present"
            )

        refresh_state_prokeys = refresh_state.mapper._primary_key_propkeys
        has_changes = {
            key
            for key in refresh_state_prokeys.difference(only_load_props)
            if refresh_state.attrs[key].history.has_changes()
        }
        if has_changes:
            # raise if pending pk changes are present.
            # technically, this could be limited to the case where we have
            # relationships in the only_load_props collection to be refreshed
            # also (and only ones that have a secondary eager loader, at that).
            # however, the error is in place across the board so that behavior
            # here is easier to predict. The use case it prevents is one
            # of mutating PK attrs, leaving them unflushed,
            # calling session.refresh(), and expecting those attrs to remain
            # still unflushed. It seems likely someone doing all those
            # things would be better off having the PK attributes flushed
            # to the database before tinkering like that (session.refresh() is
            # tinkering).
            raise sa_exc.InvalidRequestError(
                f"Please flush pending primary key changes on "
                "attributes "
                f"{has_changes} for mapper {refresh_state.mapper} before "
                "proceeding with a refresh"
            )

        # overall, the ORM has no internal flow right now for "dont load the
        # primary row of an object at all, but fire off
        # selectinload/subqueryload/immediateload for some relationships".
        # It would probably be a pretty big effort to add such a flow. So
        # here, the case for #8703 is introduced; user asks to refresh some
        # relationship attributes only which are
        # selectinload/subqueryload/immediateload/ etc. (not joinedload).
        # ORM complains there's no columns in the primary row to load.
        # So here, we just add the PK cols if that
        # case is detected, so that there is a SELECT emitted for the primary
        # row.
        #
        # Let's just state right up front, for this one little case,
        # the ORM here is adding a whole extra SELECT just to satisfy
        # limitations in the internal flow. This is really not a thing
        # SQLAlchemy finds itself doing like, ever, obviously, we are
        # constantly working to *remove* SELECTs we don't need. We
        # rationalize this for now based on 1. session.refresh() is not
        # commonly used 2. session.refresh() with only relationship attrs is
        # even less commonly used 3. the SELECT in question is very low
        # latency.
        #
        # to add the flow to not include the SELECT, the quickest way
        # might be to just manufacture a single-row result set to send off to
        # instances(), but we'd have to weave that into context.py and all
        # that. For 2.0.0, we have enough big changes to navigate for now.
        #
        mp = refresh_state.mapper._props
        for p in only_load_props:
            if mp[p]._is_relationship:
                only_load_props = refresh_state_prokeys.union(only_load_props)
                break

    if refresh_state and refresh_state.load_options:
        # carry over the load path and options from the state being refreshed
        compile_options += {"_current_path": refresh_state.load_path.parent}
        q = q.options(*refresh_state.load_options)

    new_compile_options, load_options = _set_get_options(
        compile_options,
        load_options,
        version_check=version_check,
        only_load_props=only_load_props,
        refresh_state=refresh_state,
        identity_token=identity_token,
        is_user_refresh=is_user_refresh,
    )

    q._compile_options = new_compile_options
    # ORDER BY is meaningless for a single-identity load
    q._order_by = None

    if no_autoflush:
        load_options += {"_autoflush": False}

    execution_options = util.EMPTY_DICT.merge_with(
        execution_options, {"_sa_orm_load_options": load_options}
    )
    result = (
        session.execute(
            q,
            params=params,
            execution_options=execution_options,
            bind_arguments=bind_arguments,
        )
        .unique()
        .scalars()
    )

    try:
        return result.one()
    except orm_exc.NoResultFound:
        return None
708
709
710def _set_get_options(
711 compile_opt,
712 load_opt,
713 populate_existing=None,
714 version_check=None,
715 only_load_props=None,
716 refresh_state=None,
717 identity_token=None,
718 is_user_refresh=None,
719):
720 compile_options = {}
721 load_options = {}
722 if version_check:
723 load_options["_version_check"] = version_check
724 if populate_existing:
725 load_options["_populate_existing"] = populate_existing
726 if refresh_state:
727 load_options["_refresh_state"] = refresh_state
728 compile_options["_for_refresh_state"] = True
729 if only_load_props:
730 compile_options["_only_load_props"] = frozenset(only_load_props)
731 if identity_token:
732 load_options["_identity_token"] = identity_token
733
734 if is_user_refresh:
735 load_options["_is_user_refresh"] = is_user_refresh
736 if load_options:
737 load_opt += load_options
738 if compile_options:
739 compile_opt += compile_options
740
741 return compile_opt, load_opt
742
743
def _setup_entity_query(
    compile_state,
    mapper,
    query_entity,
    path,
    adapter,
    column_collection,
    with_polymorphic=None,
    only_load_props=None,
    polymorphic_discriminator=None,
    **kw,
):
    """Run the setup phase for each property of the given mapper,
    accumulating columns into ``column_collection``."""

    poly_properties = (
        mapper._iterate_polymorphic_properties(with_polymorphic)
        if with_polymorphic
        else mapper._polymorphic_properties
    )

    quick_populators = {}

    # memoize per-column setup results on the path; retrieved later by
    # the instance processor under the "memoized_setups" key
    path.set(compile_state.attributes, "memoized_setups", quick_populators)

    # for the lead entities in the path, e.g. not eager loads, and
    # assuming a user-passed aliased class, e.g. not a from_self() or any
    # implicit aliasing, don't add columns to the SELECT that aren't
    # in the thing that's aliased.
    check_for_adapt = adapter and len(path) == 1 and path[-1].is_aliased_class

    for prop in poly_properties:
        # when a restricted set of attributes is requested, skip the rest
        if only_load_props and prop.key not in only_load_props:
            continue

        prop.setup(
            compile_state,
            query_entity,
            path,
            adapter,
            only_load_props=only_load_props,
            column_collection=column_collection,
            memoized_populators=quick_populators,
            check_for_adapt=check_for_adapt,
            **kw,
        )

    if (
        polymorphic_discriminator is not None
        and polymorphic_discriminator is not mapper.polymorphic_on
    ):
        # an explicit discriminator column that differs from the mapper's
        # polymorphic_on also gets added to the SELECT
        pd = (
            adapter.columns[polymorphic_discriminator]
            if adapter
            else polymorphic_discriminator
        )
        column_collection.append(pd)
797
798
def _warn_for_runid_changed(state):
    """Warn that the load context for the given state changed while a
    load/refresh handler was running."""
    message = (
        "Loading context for %s has changed within a load/refresh "
        "handler, suggesting a row refresh operation took place. If this "
        "event handler is expected to be "
        "emitting row refresh operations within an existing load or refresh "
        "operation, set restore_load_context=True when establishing the "
        "listener to ensure the context remains unchanged when the event "
        "handler completes." % (state_str(state),)
    )
    util.warn(message)
809
810
811def _instance_processor(
812 query_entity,
813 mapper,
814 context,
815 result,
816 path,
817 adapter,
818 only_load_props=None,
819 refresh_state=None,
820 polymorphic_discriminator=None,
821 _polymorphic_from=None,
822):
823 """Produce a mapper level row processor callable
824 which processes rows into mapped instances."""
825
826 # note that this method, most of which exists in a closure
827 # called _instance(), resists being broken out, as
828 # attempts to do so tend to add significant function
829 # call overhead. _instance() is the most
830 # performance-critical section in the whole ORM.
831
832 identity_class = mapper._identity_class
833 compile_state = context.compile_state
834
835 # look for "row getter" functions that have been assigned along
836 # with the compile state that were cached from a previous load.
837 # these are operator.itemgetter() objects that each will extract a
838 # particular column from each row.
839
840 getter_key = ("getters", mapper)
841 getters = path.get(compile_state.attributes, getter_key, None)
842
843 if getters is None:
844 # no getters, so go through a list of attributes we are loading for,
845 # and the ones that are column based will have already put information
846 # for us in another collection "memoized_setups", which represents the
847 # output of the LoaderStrategy.setup_query() method. We can just as
848 # easily call LoaderStrategy.create_row_processor for each, but by
849 # getting it all at once from setup_query we save another method call
850 # per attribute.
851 props = mapper._prop_set
852 if only_load_props is not None:
853 props = props.intersection(
854 mapper._props[k] for k in only_load_props
855 )
856
857 quick_populators = path.get(
858 context.attributes, "memoized_setups", EMPTY_DICT
859 )
860
861 todo = []
862 cached_populators = {
863 "new": [],
864 "quick": [],
865 "deferred": [],
866 "expire": [],
867 "existing": [],
868 "eager": [],
869 }
870
871 if refresh_state is None:
872 # we can also get the "primary key" tuple getter function
873 pk_cols = mapper.primary_key
874
875 if adapter:
876 pk_cols = [adapter.columns[c] for c in pk_cols]
877 primary_key_getter = result._tuple_getter(pk_cols)
878 else:
879 primary_key_getter = None
880
881 getters = {
882 "cached_populators": cached_populators,
883 "todo": todo,
884 "primary_key_getter": primary_key_getter,
885 }
886 for prop in props:
887 if prop in quick_populators:
888 # this is an inlined path just for column-based attributes.
889 col = quick_populators[prop]
890 if col is _DEFER_FOR_STATE:
891 cached_populators["new"].append(
892 (prop.key, prop._deferred_column_loader)
893 )
894 elif col is _SET_DEFERRED_EXPIRED:
895 # note that in this path, we are no longer
896 # searching in the result to see if the column might
897 # be present in some unexpected way.
898 cached_populators["expire"].append((prop.key, False))
899 elif col is _RAISE_FOR_STATE:
900 cached_populators["new"].append(
901 (prop.key, prop._raise_column_loader)
902 )
903 else:
904 getter = None
905 if adapter:
906 # this logic had been removed for all 1.4 releases
907 # up until 1.4.18; the adapter here is particularly
908 # the compound eager adapter which isn't accommodated
909 # in the quick_populators right now. The "fallback"
910 # logic below instead took over in many more cases
911 # until issue #6596 was identified.
912
913 # note there is still an issue where this codepath
914 # produces no "getter" for cases where a joined-inh
915 # mapping includes a labeled column property, meaning
916 # KeyError is caught internally and we fall back to
917 # _getter(col), which works anyway. The adapter
918 # here for joined inh without any aliasing might not
919 # be useful. Tests which see this include
920 # test.orm.inheritance.test_basic ->
921 # EagerTargetingTest.test_adapt_stringency
922 # OptimizedLoadTest.test_column_expression_joined
923 # PolymorphicOnNotLocalTest.test_polymorphic_on_column_prop # noqa: E501
924 #
925
926 adapted_col = adapter.columns[col]
927 if adapted_col is not None:
928 getter = result._getter(adapted_col, False)
929 if not getter:
930 getter = result._getter(col, False)
931 if getter:
932 cached_populators["quick"].append((prop.key, getter))
933 else:
934 # fall back to the ColumnProperty itself, which
935 # will iterate through all of its columns
936 # to see if one fits
937 prop.create_row_processor(
938 context,
939 query_entity,
940 path,
941 mapper,
942 result,
943 adapter,
944 cached_populators,
945 )
946 else:
947 # loader strategies like subqueryload, selectinload,
948 # joinedload, basically relationships, these need to interact
949 # with the context each time to work correctly.
950 todo.append(prop)
951
952 path.set(compile_state.attributes, getter_key, getters)
953
954 cached_populators = getters["cached_populators"]
955
956 populators = {key: list(value) for key, value in cached_populators.items()}
957 for prop in getters["todo"]:
958 prop.create_row_processor(
959 context, query_entity, path, mapper, result, adapter, populators
960 )
961
962 propagated_loader_options = context.propagated_loader_options
963 load_path = (
964 context.compile_state.current_path + path
965 if context.compile_state.current_path.path
966 else path
967 )
968
969 session_identity_map = context.session.identity_map
970
971 populate_existing = context.populate_existing or mapper.always_refresh
972 load_evt = bool(mapper.class_manager.dispatch.load)
973 refresh_evt = bool(mapper.class_manager.dispatch.refresh)
974 persistent_evt = bool(context.session.dispatch.loaded_as_persistent)
975 if persistent_evt:
976 loaded_as_persistent = context.session.dispatch.loaded_as_persistent
977 instance_state = attributes.instance_state
978 instance_dict = attributes.instance_dict
979 session_id = context.session.hash_key
980 runid = context.runid
981 identity_token = context.identity_token
982
983 version_check = context.version_check
984 if version_check:
985 version_id_col = mapper.version_id_col
986 if version_id_col is not None:
987 if adapter:
988 version_id_col = adapter.columns[version_id_col]
989 version_id_getter = result._getter(version_id_col)
990 else:
991 version_id_getter = None
992
993 if not refresh_state and _polymorphic_from is not None:
994 key = ("loader", path.path)
995
996 if key in context.attributes and context.attributes[key].strategy == (
997 ("selectinload_polymorphic", True),
998 ):
999 option_entities = context.attributes[key].local_opts["entities"]
1000 else:
1001 option_entities = None
1002 selectin_load_via = mapper._should_selectin_load(
1003 option_entities,
1004 _polymorphic_from,
1005 )
1006
1007 if selectin_load_via and selectin_load_via is not _polymorphic_from:
1008 # only_load_props goes w/ refresh_state only, and in a refresh
1009 # we are a single row query for the exact entity; polymorphic
1010 # loading does not apply
1011 assert only_load_props is None
1012
1013 if selectin_load_via.is_mapper:
1014 _load_supers = []
1015 _endmost_mapper = selectin_load_via
1016 while (
1017 _endmost_mapper
1018 and _endmost_mapper is not _polymorphic_from
1019 ):
1020 _load_supers.append(_endmost_mapper)
1021 _endmost_mapper = _endmost_mapper.inherits
1022 else:
1023 _load_supers = [selectin_load_via]
1024
1025 for _selectinload_entity in _load_supers:
1026 if _PostLoad.path_exists(
1027 context, load_path, _selectinload_entity
1028 ):
1029 continue
1030 callable_ = _load_subclass_via_in(
1031 context,
1032 path,
1033 _selectinload_entity,
1034 _polymorphic_from,
1035 option_entities,
1036 )
1037 _PostLoad.callable_for_path(
1038 context,
1039 load_path,
1040 _selectinload_entity.mapper,
1041 _selectinload_entity,
1042 callable_,
1043 _selectinload_entity,
1044 )
1045
1046 post_load = _PostLoad.for_context(context, load_path, only_load_props)
1047
1048 if refresh_state:
1049 refresh_identity_key = refresh_state.key
1050 if refresh_identity_key is None:
1051 # super-rare condition; a refresh is being called
1052 # on a non-instance-key instance; this is meant to only
1053 # occur within a flush()
1054 refresh_identity_key = mapper._identity_key_from_state(
1055 refresh_state
1056 )
1057 else:
1058 refresh_identity_key = None
1059
1060 primary_key_getter = getters["primary_key_getter"]
1061
1062 if mapper.allow_partial_pks:
1063 is_not_primary_key = _none_set.issuperset
1064 else:
1065 is_not_primary_key = _none_set.intersection
1066
1067 def _instance(row):
1068 # determine the state that we'll be populating
1069 if refresh_identity_key:
1070 # fixed state that we're refreshing
1071 state = refresh_state
1072 instance = state.obj()
1073 dict_ = instance_dict(instance)
1074 isnew = state.runid != runid
1075 currentload = True
1076 loaded_instance = False
1077 else:
1078 # look at the row, see if that identity is in the
1079 # session, or we have to create a new one
1080 identitykey = (
1081 identity_class,
1082 primary_key_getter(row),
1083 identity_token,
1084 )
1085
1086 instance = session_identity_map.get(identitykey)
1087
1088 if instance is not None:
1089 # existing instance
1090 state = instance_state(instance)
1091 dict_ = instance_dict(instance)
1092
1093 isnew = state.runid != runid
1094 currentload = not isnew
1095 loaded_instance = False
1096
1097 if version_check and version_id_getter and not currentload:
1098 _validate_version_id(
1099 mapper, state, dict_, row, version_id_getter
1100 )
1101
1102 else:
1103 # create a new instance
1104
1105 # check for non-NULL values in the primary key columns,
1106 # else no entity is returned for the row
1107 if is_not_primary_key(identitykey[1]):
1108 return None
1109
1110 isnew = True
1111 currentload = True
1112 loaded_instance = True
1113
1114 instance = mapper.class_manager.new_instance()
1115
1116 dict_ = instance_dict(instance)
1117 state = instance_state(instance)
1118 state.key = identitykey
1119 state.identity_token = identity_token
1120
1121 # attach instance to session.
1122 state.session_id = session_id
1123 session_identity_map._add_unpresent(state, identitykey)
1124
1125 effective_populate_existing = populate_existing
1126 if refresh_state is state:
1127 effective_populate_existing = True
1128
1129 # populate. this looks at whether this state is new
1130 # for this load or was existing, and whether or not this
1131 # row is the first row with this identity.
1132 if currentload or effective_populate_existing:
1133 # full population routines. Objects here are either
1134 # just created, or we are doing a populate_existing
1135
1136 # be conservative about setting load_path when populate_existing
1137 # is in effect; want to maintain options from the original
1138 # load. see test_expire->test_refresh_maintains_deferred_options
1139 if isnew and (
1140 propagated_loader_options or not effective_populate_existing
1141 ):
1142 state.load_options = propagated_loader_options
1143 state.load_path = load_path
1144
1145 _populate_full(
1146 context,
1147 row,
1148 state,
1149 dict_,
1150 isnew,
1151 load_path,
1152 loaded_instance,
1153 effective_populate_existing,
1154 populators,
1155 )
1156
1157 if isnew:
1158 # state.runid should be equal to context.runid / runid
1159 # here, however for event checks we are being more conservative
1160 # and checking against existing run id
1161 # assert state.runid == runid
1162
1163 existing_runid = state.runid
1164
1165 if loaded_instance:
1166 if load_evt:
1167 state.manager.dispatch.load(state, context)
1168 if state.runid != existing_runid:
1169 _warn_for_runid_changed(state)
1170 if persistent_evt:
1171 loaded_as_persistent(context.session, state)
1172 if state.runid != existing_runid:
1173 _warn_for_runid_changed(state)
1174 elif refresh_evt:
1175 state.manager.dispatch.refresh(
1176 state, context, only_load_props
1177 )
1178 if state.runid != runid:
1179 _warn_for_runid_changed(state)
1180
1181 if effective_populate_existing or state.modified:
1182 if refresh_state and only_load_props:
1183 state._commit(dict_, only_load_props)
1184 else:
1185 state._commit_all(dict_, session_identity_map)
1186
1187 if post_load:
1188 post_load.add_state(state, True)
1189
1190 else:
1191 # partial population routines, for objects that were already
1192 # in the Session, but a row matches them; apply eager loaders
1193 # on existing objects, etc.
1194 unloaded = state.unloaded
1195 isnew = state not in context.partials
1196
1197 if not isnew or unloaded or populators["eager"]:
1198 # state is having a partial set of its attributes
1199 # refreshed. Populate those attributes,
1200 # and add to the "context.partials" collection.
1201
1202 to_load = _populate_partial(
1203 context,
1204 row,
1205 state,
1206 dict_,
1207 isnew,
1208 load_path,
1209 unloaded,
1210 populators,
1211 )
1212
1213 if isnew:
1214 if refresh_evt:
1215 existing_runid = state.runid
1216 state.manager.dispatch.refresh(state, context, to_load)
1217 if state.runid != existing_runid:
1218 _warn_for_runid_changed(state)
1219
1220 state._commit(dict_, to_load)
1221
1222 if post_load and context.invoke_all_eagers:
1223 post_load.add_state(state, False)
1224
1225 return instance
1226
1227 if mapper.polymorphic_map and not _polymorphic_from and not refresh_state:
1228 # if we are doing polymorphic, dispatch to a different _instance()
1229 # method specific to the subclass mapper
1230 def ensure_no_pk(row):
1231 identitykey = (
1232 identity_class,
1233 primary_key_getter(row),
1234 identity_token,
1235 )
1236 if not is_not_primary_key(identitykey[1]):
1237 return identitykey
1238 else:
1239 return None
1240
1241 _instance = _decorate_polymorphic_switch(
1242 _instance,
1243 context,
1244 query_entity,
1245 mapper,
1246 result,
1247 path,
1248 polymorphic_discriminator,
1249 adapter,
1250 ensure_no_pk,
1251 )
1252
1253 return _instance
1254
1255
def _load_subclass_via_in(
    context, path, entity, polymorphic_from, option_entities
):
    """Produce a "post load" callable for the "selectin" polymorphic load
    scheme, which emits an additional SELECT ... WHERE primary key IN (...)
    in terms of the given subclass entity.

    :param context: the ``QueryContext`` of the parent load
    :param path: path registry entry at which the load takes place
    :param entity: mapper or aliased class to be selectin-loaded
    :param polymorphic_from: the mapper (or mapper-bearing object) the
     polymorphic load dispatches from
    :param option_entities: entities given to an explicit
     selectin-polymorphic loader option, or None
    :return: a ``do_load(context, path, states, load_only,
     effective_entity)`` callable suitable for registration via
     ``_PostLoad.callable_for_path()``

    """
    mapper = entity.mapper

    # TODO: polymorphic_from seems to be a Mapper in all cases.
    # this is likely not needed, but as we dont have typing in loading.py
    # yet, err on the safe side
    polymorphic_from_mapper = polymorphic_from.mapper
    not_against_basemost = polymorphic_from_mapper.inherits is not None

    # single-column primary key in the base mapper means scalar values,
    # rather than tuples, are passed as the "primary_keys" parameter below
    zero_idx = len(mapper.base_mapper.primary_key) == 1

    if entity.is_aliased_class or not_against_basemost:
        # statement must be built in terms of this specific entity /
        # non-basemost base
        q, enable_opt, disable_opt = mapper._subclass_load_via_in(
            entity, polymorphic_from
        )
    else:
        # plain mapper against the basemost mapper; use the attribute
        # form (presumably memoized per-mapper - TODO confirm)
        q, enable_opt, disable_opt = mapper._subclass_load_via_in_mapper

    def do_load(context, path, states, load_only, effective_entity):
        # note: "load_only" (the _PostLoad.load_keys collection) is not
        # consulted by this loader
        if not option_entities:
            # filter out states for those that would have selectinloaded
            # from another loader
            # TODO: we are currently ignoring the case where the
            # "selectin_polymorphic" option is used, as this is much more
            # complex / specific / very uncommon API use
            states = [
                (s, v)
                for s, v in states
                if s.mapper._would_selectin_load_only_from_given_mapper(mapper)
            ]

        if not states:
            return

        orig_query = context.query

        # bracket the original query's options between the enabling and
        # disabling options returned above, scoped to this path when the
        # load occurs below the root
        if path.parent:
            enable_opt_lcl = enable_opt._prepend_path(path)
            disable_opt_lcl = disable_opt._prepend_path(path)
        else:
            enable_opt_lcl = enable_opt
            disable_opt_lcl = disable_opt
        options = (
            (enable_opt_lcl,) + orig_query._with_options + (disable_opt_lcl,)
        )

        q2 = q.options(*options)

        q2._compile_options = context.compile_state.default_compile_options
        q2._compile_options += {"_current_path": path.parent}

        if context.populate_existing:
            q2 = q2.execution_options(populate_existing=True)

        # execute for its side effects on the loaded states; the result
        # rows themselves are discarded
        context.session.execute(
            q2,
            dict(
                primary_keys=[
                    state.key[1][0] if zero_idx else state.key[1]
                    for state, load_attrs in states
                ]
            ),
        ).unique().scalars().all()

    return do_load
1323
1324
1325def _populate_full(
1326 context,
1327 row,
1328 state,
1329 dict_,
1330 isnew,
1331 load_path,
1332 loaded_instance,
1333 populate_existing,
1334 populators,
1335):
1336 if isnew:
1337 # first time we are seeing a row with this identity.
1338 state.runid = context.runid
1339
1340 for key, getter in populators["quick"]:
1341 dict_[key] = getter(row)
1342 if populate_existing:
1343 for key, set_callable in populators["expire"]:
1344 dict_.pop(key, None)
1345 if set_callable:
1346 state.expired_attributes.add(key)
1347 else:
1348 for key, set_callable in populators["expire"]:
1349 if set_callable:
1350 state.expired_attributes.add(key)
1351
1352 for key, populator in populators["new"]:
1353 populator(state, dict_, row)
1354
1355 elif load_path != state.load_path:
1356 # new load path, e.g. object is present in more than one
1357 # column position in a series of rows
1358 state.load_path = load_path
1359
1360 # if we have data, and the data isn't in the dict, OK, let's put
1361 # it in.
1362 for key, getter in populators["quick"]:
1363 if key not in dict_:
1364 dict_[key] = getter(row)
1365
1366 # otherwise treat like an "already seen" row
1367 for key, populator in populators["existing"]:
1368 populator(state, dict_, row)
1369 # TODO: allow "existing" populator to know this is
1370 # a new path for the state:
1371 # populator(state, dict_, row, new_path=True)
1372
1373 else:
1374 # have already seen rows with this identity in this same path.
1375 for key, populator in populators["existing"]:
1376 populator(state, dict_, row)
1377
1378 # TODO: same path
1379 # populator(state, dict_, row, new_path=False)
1380
1381
1382def _populate_partial(
1383 context, row, state, dict_, isnew, load_path, unloaded, populators
1384):
1385 if not isnew:
1386 if unloaded:
1387 # extra pass, see #8166
1388 for key, getter in populators["quick"]:
1389 if key in unloaded:
1390 dict_[key] = getter(row)
1391
1392 to_load = context.partials[state]
1393 for key, populator in populators["existing"]:
1394 if key in to_load:
1395 populator(state, dict_, row)
1396 else:
1397 to_load = unloaded
1398 context.partials[state] = to_load
1399
1400 for key, getter in populators["quick"]:
1401 if key in to_load:
1402 dict_[key] = getter(row)
1403 for key, set_callable in populators["expire"]:
1404 if key in to_load:
1405 dict_.pop(key, None)
1406 if set_callable:
1407 state.expired_attributes.add(key)
1408 for key, populator in populators["new"]:
1409 if key in to_load:
1410 populator(state, dict_, row)
1411
1412 for key, populator in populators["eager"]:
1413 if key not in unloaded:
1414 populator(state, dict_, row)
1415
1416 return to_load
1417
1418
1419def _validate_version_id(mapper, state, dict_, row, getter):
1420 if mapper._get_state_attr_by_column(
1421 state, dict_, mapper.version_id_col
1422 ) != getter(row):
1423 raise orm_exc.StaleDataError(
1424 "Instance '%s' has version id '%s' which "
1425 "does not match database-loaded version id '%s'."
1426 % (
1427 state_str(state),
1428 mapper._get_state_attr_by_column(
1429 state, dict_, mapper.version_id_col
1430 ),
1431 getter(row),
1432 )
1433 )
1434
1435
def _decorate_polymorphic_switch(
    instance_fn,
    context,
    query_entity,
    mapper,
    result,
    path,
    polymorphic_discriminator,
    adapter,
    ensure_no_pk,
):
    """Wrap an ``_instance()`` row-processing callable with a dispatch
    that routes each row to a mapper-specific processor based on the
    row's polymorphic discriminator value.

    :param instance_fn: the default ``_instance()`` callable, used when
     the discriminator names ``mapper`` itself
    :param polymorphic_discriminator: explicit discriminator column, or
     None to use ``mapper.polymorphic_on``
    :param ensure_no_pk: callable returning the row's identity key if
     the row has a non-NULL primary key, else None; used to decide
     whether a bad/NULL discriminator is an error or a skippable row
    :return: the wrapping ``polymorphic_instance(row)`` callable, or
     ``instance_fn`` unchanged if no discriminator column applies
    """
    if polymorphic_discriminator is not None:
        polymorphic_on = polymorphic_discriminator
    else:
        polymorphic_on = mapper.polymorphic_on
    if polymorphic_on is None:
        # no polymorphic dispatch possible; use the plain processor
        return instance_fn

    if adapter:
        polymorphic_on = adapter.columns[polymorphic_on]

    def configure_subclass_mapper(discriminator):
        # invoked once per distinct discriminator value (cached via
        # PopulateDict below).  Returns None for "use the default
        # instance_fn", False for "maps to a non-sub-mapper" (error
        # case), else a new processor for the subclass mapper.
        try:
            sub_mapper = mapper.polymorphic_map[discriminator]
        except KeyError:
            raise AssertionError(
                "No such polymorphic_identity %r is defined" % discriminator
            )
        else:
            if sub_mapper is mapper:
                return None
            elif not sub_mapper.isa(mapper):
                return False

            # recurse into a processor specific to the subclass mapper
            return _instance_processor(
                query_entity,
                sub_mapper,
                context,
                result,
                path,
                adapter,
                _polymorphic_from=mapper,
            )

    # dict that builds missing values via configure_subclass_mapper
    polymorphic_instances = util.PopulateDict(configure_subclass_mapper)

    getter = result._getter(polymorphic_on)

    def polymorphic_instance(row):
        discriminator = getter(row)
        if discriminator is not None:
            _instance = polymorphic_instances[discriminator]
            if _instance:
                return _instance(row)
            elif _instance is False:
                # discriminator maps to a mapper outside this hierarchy;
                # only an error if the row actually has an identity
                identitykey = ensure_no_pk(row)

                if identitykey:
                    raise sa_exc.InvalidRequestError(
                        "Row with identity key %s can't be loaded into an "
                        "object; the polymorphic discriminator column '%s' "
                        "refers to %s, which is not a sub-mapper of "
                        "the requested %s"
                        % (
                            identitykey,
                            polymorphic_on,
                            mapper.polymorphic_map[discriminator],
                            mapper,
                        )
                    )
                else:
                    return None
            else:
                # _instance is None: discriminator names ``mapper`` itself
                return instance_fn(row)
        else:
            # NULL discriminator; only an error if the row has an identity
            identitykey = ensure_no_pk(row)

            if identitykey:
                raise sa_exc.InvalidRequestError(
                    "Row with identity key %s can't be loaded into an "
                    "object; the polymorphic discriminator column '%s' is "
                    "NULL" % (identitykey, polymorphic_on)
                )
            else:
                return None

    return polymorphic_instance
1523
1524
1525class _PostLoad:
1526 """Track loaders and states for "post load" operations."""
1527
1528 __slots__ = "loaders", "states", "load_keys"
1529
1530 def __init__(self):
1531 self.loaders = {}
1532 self.states = util.OrderedDict()
1533 self.load_keys = None
1534
1535 def add_state(self, state, overwrite):
1536 # the states for a polymorphic load here are all shared
1537 # within a single PostLoad object among multiple subtypes.
1538 # Filtering of callables on a per-subclass basis needs to be done at
1539 # the invocation level
1540 self.states[state] = overwrite
1541
1542 def invoke(self, context, path):
1543 if not self.states:
1544 return
1545 path = path_registry.PathRegistry.coerce(path)
1546 for (
1547 effective_context,
1548 token,
1549 limit_to_mapper,
1550 loader,
1551 arg,
1552 kw,
1553 ) in self.loaders.values():
1554 states = [
1555 (state, overwrite)
1556 for state, overwrite in self.states.items()
1557 if state.manager.mapper.isa(limit_to_mapper)
1558 ]
1559 if states:
1560 loader(
1561 effective_context, path, states, self.load_keys, *arg, **kw
1562 )
1563 self.states.clear()
1564
1565 @classmethod
1566 def for_context(cls, context, path, only_load_props):
1567 pl = context.post_load_paths.get(path.path)
1568 if pl is not None and only_load_props:
1569 pl.load_keys = only_load_props
1570 return pl
1571
1572 @classmethod
1573 def path_exists(self, context, path, key):
1574 return (
1575 path.path in context.post_load_paths
1576 and key in context.post_load_paths[path.path].loaders
1577 )
1578
1579 @classmethod
1580 def callable_for_path(
1581 cls, context, path, limit_to_mapper, token, loader_callable, *arg, **kw
1582 ):
1583 if path.path in context.post_load_paths:
1584 pl = context.post_load_paths[path.path]
1585 else:
1586 pl = context.post_load_paths[path.path] = _PostLoad()
1587 pl.loaders[token] = (
1588 context,
1589 token,
1590 limit_to_mapper,
1591 loader_callable,
1592 arg,
1593 kw,
1594 )
1595
1596
def _load_scalar_attributes(mapper, state, attribute_names, passive):
    """initiate a column-based attribute refresh operation.

    Emits a SELECT (via ``_load_on_ident``) to refresh the given
    ``attribute_names`` for the object represented by ``state``.

    :param mapper: the :class:`.Mapper` for the state's class
    :param state: ``InstanceState`` to be refreshed
    :param attribute_names: collection of attribute keys to refresh;
     intersected against the mapper's actual attributes
    :param passive: ``PassiveFlag`` bitflags; only ``NO_AUTOFLUSH``
     is consulted here
    :raises DetachedInstanceError: if the state has no Session
    :raises ObjectDeletedError: if the object had an identity key but
     no row could be loaded for it
    """

    # assert mapper is _state_mapper(state)
    session = state.session
    if not session:
        raise orm_exc.DetachedInstanceError(
            "Instance %s is not bound to a Session; "
            "attribute refresh operation cannot proceed" % (state_str(state))
        )

    no_autoflush = bool(passive & attributes.NO_AUTOFLUSH)

    # in the case of inheritance, particularly concrete and abstract
    # concrete inheritance, the class manager might have some keys
    # of attributes on the superclass that we didn't actually map.
    # These could be mapped as "concrete, don't load" or could be completely
    # excluded from the mapping and we know nothing about them. Filter them
    # here to prevent them from coming through.
    if attribute_names:
        attribute_names = attribute_names.intersection(mapper.attrs.keys())

    if mapper.inherits and not mapper.concrete:
        # load based on committed attributes in the object, formed into
        # a truncated SELECT that only includes relevant tables. does not
        # currently use state.key
        statement = mapper._optimized_get_statement(state, attribute_names)
        if statement is not None:
            # undefer() isn't needed here because statement has the
            # columns needed already, this implicitly undefers that column
            stmt = FromStatement(mapper, statement)

            return _load_on_ident(
                session,
                stmt,
                None,
                only_load_props=attribute_names,
                refresh_state=state,
                no_autoflush=no_autoflush,
            )

    # normal load, use state.key as the identity to SELECT
    has_key = bool(state.key)

    if has_key:
        identity_key = state.key
    else:
        # this codepath is rare - only valid when inside a flush, and the
        # object is becoming persistent but hasn't yet been assigned
        # an identity_key.
        # check here to ensure we have the attrs we need.
        pk_attrs = [
            mapper._columntoproperty[col].key for col in mapper.primary_key
        ]
        if state.expired_attributes.intersection(pk_attrs):
            raise sa_exc.InvalidRequestError(
                "Instance %s cannot be refreshed - it's not "
                " persistent and does not "
                "contain a full primary key." % state_str(state)
            )
        identity_key = mapper._identity_key_from_state(state)

    # warn and bail if the identity key contains NULLs that make it
    # unusable for a SELECT (all-NULL, or any-NULL when partial primary
    # keys are not allowed)
    if (
        _none_set.issubset(identity_key) and not mapper.allow_partial_pks
    ) or _none_set.issuperset(identity_key):
        util.warn_limited(
            "Instance %s to be refreshed doesn't "
            "contain a full primary key - can't be refreshed "
            "(and shouldn't be expired, either).",
            state_str(state),
        )
        return

    result = _load_on_ident(
        session,
        select(mapper).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL),
        identity_key,
        refresh_state=state,
        only_load_props=attribute_names,
        no_autoflush=no_autoflush,
    )

    # if instance is pending, a refresh operation
    # may not complete (even if PK attributes are assigned)
    if has_key and result is None:
        raise orm_exc.ObjectDeletedError(state)