1# orm/loading.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: ignore-errors
8
9
10"""private module containing functions used to convert database
11rows into object instances and associated state.
12
13the functions here are called primarily by Query, Mapper,
14as well as some of the attribute loading strategies.
15
16"""
17
18from __future__ import annotations
19
20from typing import Any
21from typing import Dict
22from typing import Iterable
23from typing import List
24from typing import Mapping
25from typing import Optional
26from typing import Sequence
27from typing import Tuple
28from typing import TYPE_CHECKING
29from typing import TypeVar
30from typing import Union
31
32from . import attributes
33from . import exc as orm_exc
34from . import path_registry
35from .base import _DEFER_FOR_STATE
36from .base import _RAISE_FOR_STATE
37from .base import _SET_DEFERRED_EXPIRED
38from .base import PassiveFlag
39from .context import _ORMCompileState
40from .context import FromStatement
41from .context import QueryContext
42from .strategies import _SelectInLoader
43from .util import _none_set
44from .util import state_str
45from .. import exc as sa_exc
46from .. import util
47from ..engine import result_tuple
48from ..engine.result import ChunkedIteratorResult
49from ..engine.result import FrozenResult
50from ..engine.result import SimpleResultMetaData
51from ..sql import select
52from ..sql import util as sql_util
53from ..sql.selectable import ForUpdateArg
54from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL
55from ..sql.selectable import SelectState
56from ..util import EMPTY_DICT
57from ..util.typing import TupleAny
58from ..util.typing import Unpack
59
60if TYPE_CHECKING:
61 from ._typing import _IdentityKeyType
62 from .base import LoaderCallableStatus
63 from .interfaces import ORMOption
64 from .mapper import Mapper
65 from .query import Query
66 from .session import Session
67 from .state import InstanceState
68 from ..engine.cursor import CursorResult
69 from ..engine.interfaces import _ExecuteOptions
70 from ..engine.result import Result
71 from ..sql import Select
72
73_T = TypeVar("_T", bound=Any)
74_O = TypeVar("_O", bound=object)
75_new_runid = util.counter()
76
77
78_PopulatorDict = Dict[str, List[Tuple[str, Any]]]
79
80
def instances(
    cursor: CursorResult[Unpack[TupleAny]], context: QueryContext
) -> Result[Unpack[TupleAny]]:
    """Return a :class:`.Result` given an ORM query context.

    :param cursor: a :class:`.CursorResult`, generated by a statement
     which came from :class:`.ORMCompileState`

    :param context: a :class:`.QueryContext` object

    :return: a :class:`.Result` object representing ORM results

    .. versionchanged:: 1.4 The instances() function now uses
       :class:`.Result` objects and has an all new interface.

    """

    context.runid = _new_runid()

    if context.top_level_context:
        # nested "post load" invocation; share the post-load path
        # collection with the originating (top level) context
        is_top_level = False
        context.post_load_paths = context.top_level_context.post_load_paths
    else:
        is_top_level = True
        context.post_load_paths = {}

    compile_state = context.compile_state
    filtered = compile_state._has_mapper_entities
    single_entity = (
        not context.load_options._only_return_tuples
        and len(compile_state._entities) == 1
        and compile_state._entities[0].supports_single_entity
    )

    try:
        # per-entity row processors, labels and extra metadata, one
        # triplet per SELECT-level entity
        (process, labels, extra) = list(
            zip(
                *[
                    query_entity.row_processor(context, cursor)
                    for query_entity in context.compile_state._entities
                ]
            )
        )

        if context.yield_per and (
            context.loaders_require_buffering
            or context.loaders_require_uniquing
        ):
            raise sa_exc.InvalidRequestError(
                "Can't use yield_per with eager loaders that require uniquing "
                "or row buffering, e.g. joinedload() against collections "
                "or subqueryload(). Consider the selectinload() strategy "
                "for better flexibility in loading objects."
            )

    except Exception:
        # ensure the DBAPI cursor is closed before re-raising the
        # original exception
        with util.safe_reraise():
            cursor.close()

    def _no_unique(entry):
        # uniquing filter installed when yield_per is in effect;
        # invoking unique() in that case is an error
        raise sa_exc.InvalidRequestError(
            "Can't use the ORM yield_per feature in conjunction with unique()"
        )

    def _not_hashable(datatype, *, legacy=False, uncertain=False):
        # uniquing filter for entities whose values may not be hashable.
        # "uncertain" means the column type is NULL-typed so hashability
        # is only known at runtime; "legacy" retains 1.x Query behavior
        # of falling back to id() rather than raising.
        if not legacy:

            def go(obj):
                if uncertain:
                    try:
                        return hash(obj)
                    # note: was a bare "except:"; narrowed so that
                    # BaseException (e.g. KeyboardInterrupt) propagates
                    except Exception:
                        pass

                raise sa_exc.InvalidRequestError(
                    "Can't apply uniqueness to row tuple containing value of "
                    f"""type {datatype!r}; {
                        'the values returned appear to be'
                        if uncertain
                        else 'this datatype produces'
                    } non-hashable values"""
                )

            return go
        elif not uncertain:
            return id
        else:
            _use_id = False

            def go(obj):
                nonlocal _use_id

                if not _use_id:
                    try:
                        return hash(obj)
                    # note: was a bare "except:"; narrowed so that
                    # BaseException (e.g. KeyboardInterrupt) propagates
                    except Exception:
                        pass

                    # in #10459, we considered using a warning here, however
                    # as legacy query uses result.unique() in all cases, this
                    # would lead to too many warning cases.
                    _use_id = True

                return id(obj)

            return go

    # one uniquing filter per entity, applied by Result.unique()
    unique_filters = [
        (
            _no_unique
            if context.yield_per
            else (
                _not_hashable(
                    ent.column.type,  # type: ignore
                    legacy=context.load_options._legacy_uniquing,
                    uncertain=ent._null_column_type,
                )
                if (
                    not ent.use_id_for_hash
                    and (ent._non_hashable_value or ent._null_column_type)
                )
                else id if ent.use_id_for_hash else None
            )
        )
        for ent in context.compile_state._entities
    ]

    row_metadata = SimpleResultMetaData(
        labels, extra, _unique_filters=unique_filters
    )

    def chunks(size):  # type: ignore
        # generator of lists of processed rows; one chunk per
        # yield_per batch, or a single chunk for the full result
        while True:
            yield_per = size

            context.partials = {}

            if yield_per:
                fetch = cursor.fetchmany(yield_per)

                if not fetch:
                    break
            else:
                fetch = cursor._raw_all_rows()

            if single_entity:
                proc = process[0]
                rows = [proc(row) for row in fetch]
            else:
                rows = [
                    tuple([proc(row) for proc in process]) for row in fetch
                ]

            # if we are the originating load from a query, meaning we
            # aren't being called as a result of a nested "post load",
            # iterate through all the collected post loaders and fire them
            # off.  Previously this used to work recursively, however that
            # prevented deeply nested structures from being loadable
            if is_top_level:
                if yield_per:
                    # if using yield per, memoize the state of the
                    # collection so that it can be restored
                    top_level_post_loads = list(
                        context.post_load_paths.items()
                    )

                while context.post_load_paths:
                    post_loads = list(context.post_load_paths.items())
                    context.post_load_paths.clear()
                    for path, post_load in post_loads:
                        post_load.invoke(context, path)

                if yield_per:
                    context.post_load_paths.clear()
                    context.post_load_paths.update(top_level_post_loads)

            yield rows

            if not yield_per:
                break

    if context.execution_options.get("prebuffer_rows", False):
        # this is a bit of a hack at the moment.
        # I would rather have some option in the result to pre-buffer
        # internally.
        _prebuffered = list(chunks(None))

        def chunks(size):
            return iter(_prebuffered)

    result = ChunkedIteratorResult(
        row_metadata,
        chunks,
        source_supports_scalars=single_entity,
        raw=cursor,
        dynamic_yield_per=cursor.context._is_server_side,
    )

    # filtered and single_entity are used to indicate to legacy Query that the
    # query has ORM entities, so legacy deduping and scalars should be called
    # on the result.
    result._attributes = result._attributes.union(
        dict(filtered=filtered, is_single_entity=single_entity)
    )

    # multi_row_eager_loaders OTOH is specific to joinedload.
    if context.compile_state.multi_row_eager_loaders:

        def require_unique(obj):
            raise sa_exc.InvalidRequestError(
                "The unique() method must be invoked on this Result, "
                "as it contains results that include joined eager loads "
                "against collections"
            )

        result._unique_filter_state = (None, require_unique)

    if context.yield_per:
        result.yield_per(context.yield_per)

    return result
302
303
@util.preload_module("sqlalchemy.orm.context")
def merge_frozen_result(session, statement, frozen_result, load=True):
    """Merge a :class:`_engine.FrozenResult` back into a :class:`_orm.Session`,
    returning a new :class:`_engine.Result` object with :term:`persistent`
    objects.

    See the section :ref:`do_orm_execute_re_executing` for an example.

    .. seealso::

        :ref:`do_orm_execute_re_executing`

        :meth:`_engine.Result.freeze`

        :class:`_engine.FrozenResult`

    """
    querycontext = util.preloaded.orm_context

    if load:
        # flush current contents if we expect to load data
        session._autoflush()

    # build the entity collection for the statement so we know which
    # result columns correspond to mapped entities
    ctx = querycontext._ORMSelectCompileState._create_entities_collection(
        statement, legacy=False
    )

    with session.no_autoflush:
        # indexes of entities that are mapper-based; non-mapper columns
        # are passed through to the new rows unchanged
        mapped_entities = [
            i
            for i, e in enumerate(ctx._entities)
            if isinstance(e, querycontext._MapperEntity)
        ]
        keys = [ent._label_name for ent in ctx._entities]

        # row constructor producing named tuples matching the
        # statement's entity labels
        keyed_tuple = result_tuple(
            keys, [ent._extra_entities for ent in ctx._entities]
        )

        result = []
        for newrow in frozen_result._rewrite_rows():
            for i in mapped_entities:
                if newrow[i] is not None:
                    # merge each mapped instance into the session,
                    # replacing the row element with the merged
                    # (persistent) object
                    newrow[i] = session._merge(
                        attributes.instance_state(newrow[i]),
                        attributes.instance_dict(newrow[i]),
                        load=load,
                        _recursive={},
                        _resolve_conflict_map={},
                    )

            result.append(keyed_tuple(newrow))

    return frozen_result.with_new_rows(result)
358
359
@util.became_legacy_20(
    ":func:`_orm.merge_result`",
    alternative="The function as well as the method on :class:`_orm.Query` "
    "is superseded by the :func:`_orm.merge_frozen_result` function.",
)
@util.preload_module("sqlalchemy.orm.context")
def merge_result(
    query: Query[Any],
    iterator: Union[FrozenResult, Iterable[Sequence[Any]], Iterable[object]],
    load: bool = True,
) -> Union[FrozenResult, Iterable[Any]]:
    """Merge a result into the given :class:`.Query` object's Session.

    See :meth:`_orm.Query.merge_result` for top-level documentation on this
    function.

    """

    querycontext = util.preloaded.orm_context

    session = query.session
    if load:
        # flush current contents if we expect to load data
        session._autoflush()

    # TODO: need test coverage and documentation for the FrozenResult
    # use case.
    if isinstance(iterator, FrozenResult):
        frozen_result = iterator
        iterator = iter(frozen_result.data)
    else:
        frozen_result = None

    # build the legacy entity collection for the Query so we know which
    # result columns correspond to mapped entities
    ctx = querycontext._ORMSelectCompileState._create_entities_collection(
        query, legacy=True
    )

    # disable autoflush for the duration of the merge, restoring the
    # prior setting afterwards
    autoflush = session.autoflush
    try:
        session.autoflush = False
        single_entity = not frozen_result and len(ctx._entities) == 1

        if single_entity:
            if isinstance(ctx._entities[0], querycontext._MapperEntity):
                # single mapped entity; merge each scalar instance
                result = [
                    session._merge(
                        attributes.instance_state(instance),
                        attributes.instance_dict(instance),
                        load=load,
                        _recursive={},
                        _resolve_conflict_map={},
                    )
                    for instance in iterator
                ]
            else:
                # single non-mapper column; nothing to merge
                result = list(iterator)
        else:
            # tuple rows; merge only the mapper-based elements and
            # rebuild each row as a keyed tuple
            mapped_entities = [
                i
                for i, e in enumerate(ctx._entities)
                if isinstance(e, querycontext._MapperEntity)
            ]
            result = []
            keys = [ent._label_name for ent in ctx._entities]

            keyed_tuple = result_tuple(
                keys, [ent._extra_entities for ent in ctx._entities]
            )

            for row in iterator:
                newrow = list(row)
                for i in mapped_entities:
                    if newrow[i] is not None:
                        newrow[i] = session._merge(
                            attributes.instance_state(newrow[i]),
                            attributes.instance_dict(newrow[i]),
                            load=load,
                            _recursive={},
                            _resolve_conflict_map={},
                        )
                result.append(keyed_tuple(newrow))

        if frozen_result:
            return frozen_result.with_new_rows(result)
        else:
            return iter(result)
    finally:
        session.autoflush = autoflush
448
449
def get_from_identity(
    session: Session,
    mapper: Mapper[_O],
    key: _IdentityKeyType[_O],
    passive: PassiveFlag,
) -> Union[LoaderCallableStatus, Optional[_O]]:
    """Look up the given key in the given session's identity map,
    check the object for expired state if found.

    """
    instance = session.identity_map.get(key)
    if instance is None:
        # not present in the identity map
        return None

    state = attributes.instance_state(instance)

    # the identity map entry must be an instance of the requested
    # mapper's class hierarchy; otherwise it's a class mismatch
    if mapper.inherits and not state.mapper.isa(mapper):
        return attributes.PASSIVE_CLASS_MISMATCH

    if not state.expired:
        return instance

    # expired - ensure it still exists
    if not passive & attributes.SQL_OK:
        # TODO: no coverage here
        return attributes.PASSIVE_NO_RESULT

    if not passive & attributes.RELATED_OBJECT_OK:
        # this mode is used within a flush and the instance's
        # expired state will be checked soon enough, if necessary.
        # also used by immediateloader for a mutually-dependent
        # o2m->m2m load, :ticket:`6301`
        return instance

    try:
        state._load_expired(state, passive)
    except orm_exc.ObjectDeletedError:
        # the row is gone from the database; evict the state
        session._remove_newly_deleted([state])
        return None

    return instance
486
487
def _load_on_ident(
    session: Session,
    statement: Union[Select, FromStatement],
    key: Optional[_IdentityKeyType],
    *,
    load_options: Optional[Sequence[ORMOption]] = None,
    refresh_state: Optional[InstanceState[Any]] = None,
    with_for_update: Optional[ForUpdateArg] = None,
    only_load_props: Optional[Iterable[str]] = None,
    no_autoflush: bool = False,
    bind_arguments: Mapping[str, Any] = util.EMPTY_DICT,
    execution_options: _ExecuteOptions = util.EMPTY_DICT,
    require_pk_cols: bool = False,
    is_user_refresh: bool = False,
):
    """Load the given identity key from the database.

    Thin wrapper around :func:`._load_on_pk_identity` that unpacks the
    primary key values (``key[1]``) and identity token (``key[2]``)
    from the identity key tuple, when one is given.
    """
    if key is None:
        ident = identity_token = None
    else:
        ident, identity_token = key[1], key[2]

    return _load_on_pk_identity(
        session,
        statement,
        ident,
        load_options=load_options,
        refresh_state=refresh_state,
        with_for_update=with_for_update,
        only_load_props=only_load_props,
        identity_token=identity_token,
        no_autoflush=no_autoflush,
        bind_arguments=bind_arguments,
        execution_options=execution_options,
        require_pk_cols=require_pk_cols,
        is_user_refresh=is_user_refresh,
    )
525
526
def _load_on_pk_identity(
    session: Session,
    statement: Union[Select, FromStatement],
    primary_key_identity: Optional[Tuple[Any, ...]],
    *,
    load_options: Optional[Sequence[ORMOption]] = None,
    refresh_state: Optional[InstanceState[Any]] = None,
    with_for_update: Optional[ForUpdateArg] = None,
    only_load_props: Optional[Iterable[str]] = None,
    identity_token: Optional[Any] = None,
    no_autoflush: bool = False,
    bind_arguments: Mapping[str, Any] = util.EMPTY_DICT,
    execution_options: _ExecuteOptions = util.EMPTY_DICT,
    require_pk_cols: bool = False,
    is_user_refresh: bool = False,
):
    """Load the given primary key identity from the database."""

    # work on a clone so the caller's statement is not mutated
    query = statement
    q = query._clone()

    assert not q._is_lambda_element

    if load_options is None:
        load_options = QueryContext.default_load_options

    # use ORM compile options unless the statement already carries
    # non-default compile options of its own
    if (
        statement._compile_options
        is SelectState.default_select_compile_options
    ):
        compile_options = _ORMCompileState.default_compile_options
    else:
        compile_options = statement._compile_options

    if primary_key_identity is not None:
        mapper = query._propagate_attrs["plugin_subject"]

        # mapper._get_clause is a pre-built "WHERE pk = :param" clause
        # plus the mapping of pk columns to bound parameters
        (_get_clause, _get_params) = mapper._get_clause

        # None present in ident - turn those comparisons
        # into "IS NULL"
        if None in primary_key_identity:
            nones = {
                _get_params[col].key
                for col, value in zip(mapper.primary_key, primary_key_identity)
                if value is None
            }

            _get_clause = sql_util.adapt_criterion_to_null(_get_clause, nones)

            if len(nones) == len(primary_key_identity):
                util.warn(
                    "fully NULL primary key identity cannot load any "
                    "object.  This condition may raise an error in a future "
                    "release."
                )

        q._where_criteria = (
            sql_util._deep_annotate(_get_clause, {"_orm_adapt": True}),
        )

        # bound parameter values keyed to the pk parameter names
        params = {
            _get_params[primary_key].key: id_val
            for id_val, primary_key in zip(
                primary_key_identity, mapper.primary_key
            )
        }
    else:
        params = None

    # an explicit with_for_update, or one already on the statement,
    # also implies a version check on load
    if with_for_update is not None:
        version_check = True
        q._for_update_arg = with_for_update
    elif query._for_update_arg is not None:
        version_check = True
        q._for_update_arg = query._for_update_arg
    else:
        version_check = False

    if require_pk_cols and only_load_props:
        if not refresh_state:
            raise sa_exc.ArgumentError(
                "refresh_state is required when require_pk_cols is present"
            )

        # detect pending (unflushed) changes on pk attributes that are
        # not themselves part of the refresh
        refresh_state_prokeys = refresh_state.mapper._primary_key_propkeys
        has_changes = {
            key
            for key in refresh_state_prokeys.difference(only_load_props)
            if refresh_state.attrs[key].history.has_changes()
        }
        if has_changes:
            # raise if pending pk changes are present.
            # technically, this could be limited to the case where we have
            # relationships in the only_load_props collection to be refreshed
            # also (and only ones that have a secondary eager loader, at that).
            # however, the error is in place across the board so that behavior
            # here is easier to predict.  The use case it prevents is one
            # of mutating PK attrs, leaving them unflushed,
            # calling session.refresh(), and expecting those attrs to remain
            # still unflushed.  It seems likely someone doing all those
            # things would be better off having the PK attributes flushed
            # to the database before tinkering like that (session.refresh() is
            # tinkering).
            raise sa_exc.InvalidRequestError(
                f"Please flush pending primary key changes on "
                "attributes "
                f"{has_changes} for mapper {refresh_state.mapper} before "
                "proceeding with a refresh"
            )

        # overall, the ORM has no internal flow right now for "dont load the
        # primary row of an object at all, but fire off
        # selectinload/subqueryload/immediateload for some relationships".
        # It would probably be a pretty big effort to add such a flow.  So
        # here, the case for #8703 is introduced; user asks to refresh some
        # relationship attributes only which are
        # selectinload/subqueryload/immediateload/ etc. (not joinedload).
        # ORM complains there's no columns in the primary row to load.
        # So here, we just add the PK cols if that
        # case is detected, so that there is a SELECT emitted for the primary
        # row.
        #
        # Let's just state right up front, for this one little case,
        # the ORM here is adding a whole extra SELECT just to satisfy
        # limitations in the internal flow.  This is really not a thing
        # SQLAlchemy finds itself doing like, ever, obviously, we are
        # constantly working to *remove* SELECTs we don't need.  We
        # rationalize this for now based on 1. session.refresh() is not
        # commonly used 2. session.refresh() with only relationship attrs is
        # even less commonly used 3. the SELECT in question is very low
        # latency.
        #
        # to add the flow to not include the SELECT, the quickest way
        # might be to just manufacture a single-row result set to send off to
        # instances(), but we'd have to weave that into context.py and all
        # that.  For 2.0.0, we have enough big changes to navigate for now.
        #
        mp = refresh_state.mapper._props
        for p in only_load_props:
            if mp[p]._is_relationship:
                only_load_props = refresh_state_prokeys.union(only_load_props)
                break

    # carry the refreshed state's loader options and path onto the query
    if refresh_state and refresh_state.load_options:
        compile_options += {"_current_path": refresh_state.load_path.parent}
        q = q.options(*refresh_state.load_options)

    new_compile_options, load_options = _set_get_options(
        compile_options,
        load_options,
        version_check=version_check,
        only_load_props=only_load_props,
        refresh_state=refresh_state,
        identity_token=identity_token,
        is_user_refresh=is_user_refresh,
    )

    q._compile_options = new_compile_options
    # ORDER BY is meaningless for a single-identity load
    q._order_by = None

    if no_autoflush:
        load_options += {"_autoflush": False}

    execution_options = util.EMPTY_DICT.merge_with(
        execution_options, {"_sa_orm_load_options": load_options}
    )
    result = (
        session.execute(
            q,
            params=params,
            execution_options=execution_options,
            bind_arguments=bind_arguments,
        )
        .unique()
        .scalars()
    )

    try:
        return result.one()
    except orm_exc.NoResultFound:
        # no row for this identity; callers treat None as "not found"
        return None
709
710
def _set_get_options(
    compile_opt,
    load_opt,
    populate_existing=None,
    version_check=None,
    only_load_props=None,
    refresh_state=None,
    identity_token=None,
    is_user_refresh=None,
):
    """Apply identity-load related flags onto compile options and load
    options objects, returning the (possibly updated) pair.

    Each object is only merged with new values (via ``+=``) when at
    least one corresponding flag is set; otherwise it is returned
    unchanged.
    """
    # load-option flags, keeping only the ones that were actually set
    new_load = {
        opt_key: opt_value
        for opt_key, opt_value in (
            ("_version_check", version_check),
            ("_populate_existing", populate_existing),
            ("_refresh_state", refresh_state),
            ("_identity_token", identity_token),
            ("_is_user_refresh", is_user_refresh),
        )
        if opt_value
    }

    # compile-option flags
    new_compile = {}
    if refresh_state:
        new_compile["_for_refresh_state"] = True
    if only_load_props:
        new_compile["_only_load_props"] = frozenset(only_load_props)

    if new_load:
        load_opt += new_load
    if new_compile:
        compile_opt += new_compile

    return compile_opt, load_opt
743
744
def _setup_entity_query(
    compile_state,
    mapper,
    query_entity,
    path,
    adapter,
    column_collection,
    with_polymorphic=None,
    only_load_props=None,
    polymorphic_discriminator=None,
    **kw,
):
    """Invoke ``setup()`` on each eligible property of a mapped entity,
    populating columns for the SELECT being compiled, and append the
    polymorphic discriminator column when one is configured separately
    from ``mapper.polymorphic_on``.
    """
    if with_polymorphic:
        candidate_props = mapper._iterate_polymorphic_properties(
            with_polymorphic
        )
    else:
        candidate_props = mapper._polymorphic_properties

    # memoization dict stored on the path; column strategies record
    # "quick populator" entries here during setup()
    memoized_setups = {}
    path.set(compile_state.attributes, "memoized_setups", memoized_setups)

    # for the lead entities in the path, e.g. not eager loads, and
    # assuming a user-passed aliased class, e.g. not a from_self() or any
    # implicit aliasing, don't add columns to the SELECT that aren't
    # in the thing that's aliased.
    check_for_adapt = adapter and len(path) == 1 and path[-1].is_aliased_class

    for prop in candidate_props:
        if only_load_props and prop.key not in only_load_props:
            continue
        prop.setup(
            compile_state,
            query_entity,
            path,
            adapter,
            only_load_props=only_load_props,
            column_collection=column_collection,
            memoized_populators=memoized_setups,
            check_for_adapt=check_for_adapt,
            **kw,
        )

    # a separately-configured discriminator column also goes into the
    # SELECT, adapted if an adapter is in play
    if (
        polymorphic_discriminator is None
        or polymorphic_discriminator is mapper.polymorphic_on
    ):
        return

    if adapter:
        disc_col = adapter.columns[polymorphic_discriminator]
    else:
        disc_col = polymorphic_discriminator
    column_collection.append(disc_col)
798
799
def _warn_for_runid_changed(state):
    """Emit a warning indicating that the loading context for ``state``
    changed while a load/refresh event handler was running."""
    message = (
        "Loading context for %s has changed within a load/refresh "
        "handler, suggesting a row refresh operation took place. If this "
        "event handler is expected to be "
        "emitting row refresh operations within an existing load or refresh "
        "operation, set restore_load_context=True when establishing the "
        "listener to ensure the context remains unchanged when the event "
        "handler completes." % (state_str(state),)
    )
    util.warn(message)
810
811
812def _instance_processor(
813 query_entity,
814 mapper,
815 context,
816 result,
817 path,
818 adapter,
819 only_load_props=None,
820 refresh_state=None,
821 polymorphic_discriminator=None,
822 _polymorphic_from=None,
823):
824 """Produce a mapper level row processor callable
825 which processes rows into mapped instances."""
826
827 # note that this method, most of which exists in a closure
828 # called _instance(), resists being broken out, as
829 # attempts to do so tend to add significant function
830 # call overhead. _instance() is the most
831 # performance-critical section in the whole ORM.
832
833 identity_class = mapper._identity_class
834 compile_state = context.compile_state
835
836 # look for "row getter" functions that have been assigned along
837 # with the compile state that were cached from a previous load.
838 # these are operator.itemgetter() objects that each will extract a
839 # particular column from each row.
840
841 getter_key = ("getters", mapper)
842 getters = path.get(compile_state.attributes, getter_key, None)
843
844 if getters is None:
845 # no getters, so go through a list of attributes we are loading for,
846 # and the ones that are column based will have already put information
847 # for us in another collection "memoized_setups", which represents the
848 # output of the LoaderStrategy.setup_query() method. We can just as
849 # easily call LoaderStrategy.create_row_processor for each, but by
850 # getting it all at once from setup_query we save another method call
851 # per attribute.
852 props = mapper._prop_set
853 if only_load_props is not None:
854 props = props.intersection(
855 mapper._props[k] for k in only_load_props
856 )
857
858 quick_populators = path.get(
859 context.attributes, "memoized_setups", EMPTY_DICT
860 )
861
862 todo = []
863 cached_populators = {
864 "new": [],
865 "quick": [],
866 "deferred": [],
867 "expire": [],
868 "existing": [],
869 "eager": [],
870 }
871
872 if refresh_state is None:
873 # we can also get the "primary key" tuple getter function
874 pk_cols = mapper.primary_key
875
876 if adapter:
877 pk_cols = [adapter.columns[c] for c in pk_cols]
878 primary_key_getter = result._tuple_getter(pk_cols)
879 else:
880 primary_key_getter = None
881
882 getters = {
883 "cached_populators": cached_populators,
884 "todo": todo,
885 "primary_key_getter": primary_key_getter,
886 }
887 for prop in props:
888 if prop in quick_populators:
889 # this is an inlined path just for column-based attributes.
890 col = quick_populators[prop]
891 if col is _DEFER_FOR_STATE:
892 cached_populators["new"].append(
893 (prop.key, prop._deferred_column_loader)
894 )
895 elif col is _SET_DEFERRED_EXPIRED:
896 # note that in this path, we are no longer
897 # searching in the result to see if the column might
898 # be present in some unexpected way.
899 cached_populators["expire"].append((prop.key, False))
900 elif col is _RAISE_FOR_STATE:
901 cached_populators["new"].append(
902 (prop.key, prop._raise_column_loader)
903 )
904 else:
905 getter = None
906 if adapter:
907 # this logic had been removed for all 1.4 releases
908 # up until 1.4.18; the adapter here is particularly
909 # the compound eager adapter which isn't accommodated
910 # in the quick_populators right now. The "fallback"
911 # logic below instead took over in many more cases
912 # until issue #6596 was identified.
913
914 # note there is still an issue where this codepath
915 # produces no "getter" for cases where a joined-inh
916 # mapping includes a labeled column property, meaning
917 # KeyError is caught internally and we fall back to
918 # _getter(col), which works anyway. The adapter
919 # here for joined inh without any aliasing might not
920 # be useful. Tests which see this include
921 # test.orm.inheritance.test_basic ->
922 # EagerTargetingTest.test_adapt_stringency
923 # OptimizedLoadTest.test_column_expression_joined
924 # PolymorphicOnNotLocalTest.test_polymorphic_on_column_prop # noqa: E501
925 #
926
927 adapted_col = adapter.columns[col]
928 if adapted_col is not None:
929 getter = result._getter(adapted_col, False)
930 if not getter:
931 getter = result._getter(col, False)
932 if getter:
933 cached_populators["quick"].append((prop.key, getter))
934 else:
935 # fall back to the ColumnProperty itself, which
936 # will iterate through all of its columns
937 # to see if one fits
938 prop.create_row_processor(
939 context,
940 query_entity,
941 path,
942 mapper,
943 result,
944 adapter,
945 cached_populators,
946 )
947 else:
948 # loader strategies like subqueryload, selectinload,
949 # joinedload, basically relationships, these need to interact
950 # with the context each time to work correctly.
951 todo.append(prop)
952
953 path.set(compile_state.attributes, getter_key, getters)
954
955 cached_populators = getters["cached_populators"]
956
957 populators = {key: list(value) for key, value in cached_populators.items()}
958 for prop in getters["todo"]:
959 prop.create_row_processor(
960 context, query_entity, path, mapper, result, adapter, populators
961 )
962
963 propagated_loader_options = context.propagated_loader_options
964 load_path = (
965 context.compile_state.current_path + path
966 if context.compile_state.current_path.path
967 else path
968 )
969
970 session_identity_map = context.session.identity_map
971
972 populate_existing = context.populate_existing or mapper.always_refresh
973 load_evt = bool(mapper.class_manager.dispatch.load)
974 refresh_evt = bool(mapper.class_manager.dispatch.refresh)
975 persistent_evt = bool(context.session.dispatch.loaded_as_persistent)
976 if persistent_evt:
977 loaded_as_persistent = context.session.dispatch.loaded_as_persistent
978 instance_state = attributes.instance_state
979 instance_dict = attributes.instance_dict
980 session_id = context.session.hash_key
981 runid = context.runid
982 identity_token = context.identity_token
983
984 version_check = context.version_check
985 if version_check:
986 version_id_col = mapper.version_id_col
987 if version_id_col is not None:
988 if adapter:
989 version_id_col = adapter.columns[version_id_col]
990 version_id_getter = result._getter(version_id_col)
991 else:
992 version_id_getter = None
993
994 if not refresh_state and _polymorphic_from is not None:
995 key = ("loader", path.path)
996
997 if key in context.attributes and context.attributes[key].strategy == (
998 ("selectinload_polymorphic", True),
999 ):
1000 option_entities = context.attributes[key].local_opts["entities"]
1001 else:
1002 option_entities = None
1003 selectin_load_via = mapper._should_selectin_load(
1004 option_entities,
1005 _polymorphic_from,
1006 )
1007
1008 if selectin_load_via and selectin_load_via is not _polymorphic_from:
1009 # only_load_props goes w/ refresh_state only, and in a refresh
1010 # we are a single row query for the exact entity; polymorphic
1011 # loading does not apply
1012 assert only_load_props is None
1013
1014 if selectin_load_via.is_mapper:
1015 _load_supers = []
1016 _endmost_mapper = selectin_load_via
1017 while (
1018 _endmost_mapper
1019 and _endmost_mapper is not _polymorphic_from
1020 ):
1021 _load_supers.append(_endmost_mapper)
1022 _endmost_mapper = _endmost_mapper.inherits
1023 else:
1024 _load_supers = [selectin_load_via]
1025
1026 for _selectinload_entity in _load_supers:
1027 if _PostLoad.path_exists(
1028 context, load_path, _selectinload_entity
1029 ):
1030 continue
1031 callable_ = _load_subclass_via_in(
1032 context,
1033 path,
1034 _selectinload_entity,
1035 _polymorphic_from,
1036 option_entities,
1037 )
1038 _PostLoad.callable_for_path(
1039 context,
1040 load_path,
1041 _selectinload_entity.mapper,
1042 _selectinload_entity,
1043 callable_,
1044 _selectinload_entity,
1045 )
1046
1047 post_load = _PostLoad.for_context(context, load_path, only_load_props)
1048
1049 if refresh_state:
1050 refresh_identity_key = refresh_state.key
1051 if refresh_identity_key is None:
1052 # super-rare condition; a refresh is being called
1053 # on a non-instance-key instance; this is meant to only
1054 # occur within a flush()
1055 refresh_identity_key = mapper._identity_key_from_state(
1056 refresh_state
1057 )
1058 else:
1059 refresh_identity_key = None
1060
1061 primary_key_getter = getters["primary_key_getter"]
1062
1063 if mapper.allow_partial_pks:
1064 is_not_primary_key = _none_set.issuperset
1065 else:
1066 is_not_primary_key = _none_set.intersection
1067
    def _instance(row):
        """Produce the mapped object instance for ``row``, or None if the
        row's primary key columns are all NULL.

        This closure is the per-row entry point built by
        ``_instance_processor()``; it resolves identity-map state for the
        row's identity key, performs full or partial attribute population,
        and fires load / refresh events for new runs.
        """
        # determine the state that we'll be populating
        if refresh_identity_key:
            # fixed state that we're refreshing
            state = refresh_state
            instance = state.obj()
            dict_ = instance_dict(instance)
            isnew = state.runid != runid
            currentload = True
            loaded_instance = False
        else:
            # look at the row, see if that identity is in the
            # session, or we have to create a new one
            identitykey = (
                identity_class,
                primary_key_getter(row),
                identity_token,
            )

            instance = session_identity_map.get(identitykey)

            if instance is not None:
                # existing instance
                state = instance_state(instance)
                dict_ = instance_dict(instance)

                isnew = state.runid != runid
                currentload = not isnew
                loaded_instance = False

                # version check applies only when the object was loaded in
                # a previous run, i.e. isn't being loaded fresh right now
                if version_check and version_id_getter and not currentload:
                    _validate_version_id(
                        mapper, state, dict_, row, version_id_getter
                    )

            else:
                # create a new instance

                # check for non-NULL values in the primary key columns,
                # else no entity is returned for the row
                if is_not_primary_key(identitykey[1]):
                    return None

                isnew = True
                currentload = True
                loaded_instance = True

                instance = mapper.class_manager.new_instance()

                dict_ = instance_dict(instance)
                state = instance_state(instance)
                state.key = identitykey
                state.identity_token = identity_token

                # attach instance to session.
                state.session_id = session_id
                session_identity_map._add_unpresent(state, identitykey)

        # a refresh of this exact state always repopulates, even if
        # populate_existing wasn't requested for the load overall
        effective_populate_existing = populate_existing
        if refresh_state is state:
            effective_populate_existing = True

        # populate.  this looks at whether this state is new
        # for this load or was existing, and whether or not this
        # row is the first row with this identity.
        if currentload or effective_populate_existing:
            # full population routines.  Objects here are either
            # just created, or we are doing a populate_existing

            # be conservative about setting load_path when populate_existing
            # is in effect; want to maintain options from the original
            # load.  see test_expire->test_refresh_maintains_deferred_options
            if isnew and (
                propagated_loader_options or not effective_populate_existing
            ):
                state.load_options = propagated_loader_options
                state.load_path = load_path

            _populate_full(
                context,
                row,
                state,
                dict_,
                isnew,
                load_path,
                loaded_instance,
                effective_populate_existing,
                populators,
            )

            if isnew:
                # state.runid should be equal to context.runid / runid
                # here, however for event checks we are being more conservative
                # and checking against existing run id
                # assert state.runid == runid

                existing_runid = state.runid

                if loaded_instance:
                    if load_evt:
                        state.manager.dispatch.load(state, context)
                        # event handlers may have expired/reloaded the
                        # state; warn if its runid moved underneath us
                        if state.runid != existing_runid:
                            _warn_for_runid_changed(state)
                    if persistent_evt:
                        loaded_as_persistent(context.session, state)
                        if state.runid != existing_runid:
                            _warn_for_runid_changed(state)
                elif refresh_evt:
                    state.manager.dispatch.refresh(
                        state, context, only_load_props
                    )
                    if state.runid != runid:
                        _warn_for_runid_changed(state)

                if effective_populate_existing or state.modified:
                    if refresh_state and only_load_props:
                        state._commit(dict_, only_load_props)
                    else:
                        state._commit_all(dict_, session_identity_map)

            if post_load:
                post_load.add_state(state, True)

        else:
            # partial population routines, for objects that were already
            # in the Session, but a row matches them; apply eager loaders
            # on existing objects, etc.
            unloaded = state.unloaded
            isnew = state not in context.partials

            if not isnew or unloaded or populators["eager"]:
                # state is having a partial set of its attributes
                # refreshed.  Populate those attributes,
                # and add to the "context.partials" collection.

                to_load = _populate_partial(
                    context,
                    row,
                    state,
                    dict_,
                    isnew,
                    load_path,
                    unloaded,
                    populators,
                )

                if isnew:
                    if refresh_evt:
                        existing_runid = state.runid
                        state.manager.dispatch.refresh(state, context, to_load)
                        if state.runid != existing_runid:
                            _warn_for_runid_changed(state)

                    state._commit(dict_, to_load)

            if post_load and context.invoke_all_eagers:
                post_load.add_state(state, False)

        return instance
1227
1228 if mapper.polymorphic_map and not _polymorphic_from and not refresh_state:
1229 # if we are doing polymorphic, dispatch to a different _instance()
1230 # method specific to the subclass mapper
        def ensure_no_pk(row):
            """Return the identity key for ``row`` if its primary key
            columns contain non-NULL values, else None.

            Used by the polymorphic switch to distinguish "no entity in
            this row at all" from "row has an entity but the
            discriminator is unusable".
            """
            identitykey = (
                identity_class,
                primary_key_getter(row),
                identity_token,
            )
            if not is_not_primary_key(identitykey[1]):
                return identitykey
            else:
                return None
1241
1242 _instance = _decorate_polymorphic_switch(
1243 _instance,
1244 context,
1245 query_entity,
1246 mapper,
1247 result,
1248 path,
1249 polymorphic_discriminator,
1250 adapter,
1251 ensure_no_pk,
1252 )
1253
1254 return _instance
1255
1256
def _load_subclass_via_in(
    context, path, entity, polymorphic_from, option_entities
):
    """Return a ``do_load`` callable that emits a SELECT .. IN statement
    loading subclass-local attributes for states of the given subclass
    ``entity``, invoked as a post-load step after the base rows are
    consumed.
    """
    target_mapper = entity.mapper

    # TODO: polymorphic_from appears to be a Mapper in all cases; as this
    # module has no typing yet, err on the safe side and go through .mapper
    base_is_not_root = polymorphic_from.mapper.inherits is not None

    # whether the base mapper's primary key is a single column, which
    # controls the shape of the "primary_keys" bind parameter below
    single_col_pk = len(target_mapper.base_mapper.primary_key) == 1

    if entity.is_aliased_class or base_is_not_root:
        stmt, opt_enable, opt_disable = target_mapper._subclass_load_via_in(
            entity, polymorphic_from
        )
    else:
        # memoized form for the common non-aliased, basemost case
        stmt, opt_enable, opt_disable = (
            target_mapper._subclass_load_via_in_mapper
        )

    def do_load(context, path, states, load_only, effective_entity):
        if not option_entities:
            # filter out states for those that would have selectinloaded
            # from another loader
            # TODO: we are currently ignoring the case where the
            # "selectin_polymorphic" option is used, as this is much more
            # complex / specific / very uncommon API use
            states = [
                (s, v)
                for s, v in states
                if s.mapper._would_selectin_load_only_from_given_mapper(
                    target_mapper
                )
            ]

        if not states:
            return

        orig_query = context.query

        if path.parent:
            local_enable = opt_enable._prepend_path(path)
            local_disable = opt_disable._prepend_path(path)
        else:
            local_enable, local_disable = opt_enable, opt_disable

        options = (
            (local_enable,) + orig_query._with_options + (local_disable,)
        )

        loading_query = stmt.options(*options)

        loading_query._compile_options = (
            context.compile_state.default_compile_options
        )
        loading_query._compile_options += {"_current_path": path.parent}

        if context.populate_existing:
            loading_query = loading_query.execution_options(
                populate_existing=True
            )

        # emit one SELECT per chunk of states
        chunksize = _SelectInLoader._chunksize
        for start in range(0, len(states), chunksize):
            chunk = states[start : start + chunksize]
            context.session.execute(
                loading_query,
                dict(
                    primary_keys=[
                        state.key[1][0] if single_col_pk else state.key[1]
                        for state, load_attrs in chunk
                    ]
                ),
            ).unique().scalars().all()

    return do_load
1327
1328
1329def _populate_full(
1330 context,
1331 row,
1332 state,
1333 dict_,
1334 isnew,
1335 load_path,
1336 loaded_instance,
1337 populate_existing,
1338 populators,
1339):
1340 if isnew:
1341 # first time we are seeing a row with this identity.
1342 state.runid = context.runid
1343
1344 for key, getter in populators["quick"]:
1345 dict_[key] = getter(row)
1346 if populate_existing:
1347 for key, set_callable in populators["expire"]:
1348 dict_.pop(key, None)
1349 if set_callable:
1350 state.expired_attributes.add(key)
1351 else:
1352 for key, set_callable in populators["expire"]:
1353 if set_callable:
1354 state.expired_attributes.add(key)
1355
1356 for key, populator in populators["new"]:
1357 populator(state, dict_, row)
1358
1359 elif load_path != state.load_path:
1360 # new load path, e.g. object is present in more than one
1361 # column position in a series of rows
1362 state.load_path = load_path
1363
1364 # if we have data, and the data isn't in the dict, OK, let's put
1365 # it in.
1366 for key, getter in populators["quick"]:
1367 if key not in dict_:
1368 dict_[key] = getter(row)
1369
1370 # otherwise treat like an "already seen" row
1371 for key, populator in populators["existing"]:
1372 populator(state, dict_, row)
1373 # TODO: allow "existing" populator to know this is
1374 # a new path for the state:
1375 # populator(state, dict_, row, new_path=True)
1376
1377 else:
1378 # have already seen rows with this identity in this same path.
1379 for key, populator in populators["existing"]:
1380 populator(state, dict_, row)
1381
1382 # TODO: same path
1383 # populator(state, dict_, row, new_path=False)
1384
1385
1386def _populate_partial(
1387 context, row, state, dict_, isnew, load_path, unloaded, populators
1388):
1389 if not isnew:
1390 if unloaded:
1391 # extra pass, see #8166
1392 for key, getter in populators["quick"]:
1393 if key in unloaded:
1394 dict_[key] = getter(row)
1395
1396 to_load = context.partials[state]
1397 for key, populator in populators["existing"]:
1398 if key in to_load:
1399 populator(state, dict_, row)
1400 else:
1401 to_load = unloaded
1402 context.partials[state] = to_load
1403
1404 for key, getter in populators["quick"]:
1405 if key in to_load:
1406 dict_[key] = getter(row)
1407 for key, set_callable in populators["expire"]:
1408 if key in to_load:
1409 dict_.pop(key, None)
1410 if set_callable:
1411 state.expired_attributes.add(key)
1412 for key, populator in populators["new"]:
1413 if key in to_load:
1414 populator(state, dict_, row)
1415
1416 for key, populator in populators["eager"]:
1417 if key not in unloaded:
1418 populator(state, dict_, row)
1419
1420 return to_load
1421
1422
1423def _validate_version_id(mapper, state, dict_, row, getter):
1424 if mapper._get_state_attr_by_column(
1425 state, dict_, mapper.version_id_col
1426 ) != getter(row):
1427 raise orm_exc.StaleDataError(
1428 "Instance '%s' has version id '%s' which "
1429 "does not match database-loaded version id '%s'."
1430 % (
1431 state_str(state),
1432 mapper._get_state_attr_by_column(
1433 state, dict_, mapper.version_id_col
1434 ),
1435 getter(row),
1436 )
1437 )
1438
1439
def _decorate_polymorphic_switch(
    instance_fn,
    context,
    query_entity,
    mapper,
    result,
    path,
    polymorphic_discriminator,
    adapter,
    ensure_no_pk,
):
    """Wrap ``instance_fn`` so that each row is dispatched to the
    ``_instance()`` callable appropriate to the mapper named by the
    row's polymorphic discriminator column.

    Returns ``instance_fn`` unchanged if no discriminator column is
    configured.
    """
    polymorphic_on = (
        polymorphic_discriminator
        if polymorphic_discriminator is not None
        else mapper.polymorphic_on
    )
    if polymorphic_on is None:
        return instance_fn

    if adapter:
        polymorphic_on = adapter.columns[polymorphic_on]

    def configure_subclass_mapper(discriminator):
        # resolve the discriminator to a per-submapper row handler;
        # None -> use the base instance_fn, False -> error case
        try:
            sub_mapper = mapper.polymorphic_map[discriminator]
        except KeyError:
            raise AssertionError(
                "No such polymorphic_identity %r is defined" % discriminator
            )
        if sub_mapper is mapper:
            return None
        if not sub_mapper.isa(mapper):
            return False
        return _instance_processor(
            query_entity,
            sub_mapper,
            context,
            result,
            path,
            adapter,
            _polymorphic_from=mapper,
        )

    # memoizes one handler per distinct discriminator value
    polymorphic_instances = util.PopulateDict(configure_subclass_mapper)

    getter = result._getter(polymorphic_on)

    def polymorphic_instance(row):
        discriminator = getter(row)
        if discriminator is None:
            identitykey = ensure_no_pk(row)
            if not identitykey:
                return None
            raise sa_exc.InvalidRequestError(
                "Row with identity key %s can't be loaded into an "
                "object; the polymorphic discriminator column '%s' is "
                "NULL" % (identitykey, polymorphic_on)
            )

        handler = polymorphic_instances[discriminator]
        if handler:
            return handler(row)
        if handler is False:
            identitykey = ensure_no_pk(row)
            if not identitykey:
                return None
            raise sa_exc.InvalidRequestError(
                "Row with identity key %s can't be loaded into an "
                "object; the polymorphic discriminator column '%s' "
                "refers to %s, which is not a sub-mapper of "
                "the requested %s"
                % (
                    identitykey,
                    polymorphic_on,
                    mapper.polymorphic_map[discriminator],
                    mapper,
                )
            )
        # handler is None: same mapper as the base; use base routine
        return instance_fn(row)

    return polymorphic_instance
1527
1528
1529class _PostLoad:
1530 """Track loaders and states for "post load" operations."""
1531
1532 __slots__ = "loaders", "states", "load_keys"
1533
1534 def __init__(self):
1535 self.loaders = {}
1536 self.states = util.OrderedDict()
1537 self.load_keys = None
1538
1539 def add_state(self, state, overwrite):
1540 # the states for a polymorphic load here are all shared
1541 # within a single PostLoad object among multiple subtypes.
1542 # Filtering of callables on a per-subclass basis needs to be done at
1543 # the invocation level
1544 self.states[state] = overwrite
1545
1546 def invoke(self, context, path):
1547 if not self.states:
1548 return
1549 path = path_registry.PathRegistry.coerce(path)
1550 for (
1551 effective_context,
1552 token,
1553 limit_to_mapper,
1554 loader,
1555 arg,
1556 kw,
1557 ) in self.loaders.values():
1558 states = [
1559 (state, overwrite)
1560 for state, overwrite in self.states.items()
1561 if state.manager.mapper.isa(limit_to_mapper)
1562 ]
1563 if states:
1564 loader(
1565 effective_context, path, states, self.load_keys, *arg, **kw
1566 )
1567 self.states.clear()
1568
1569 @classmethod
1570 def for_context(cls, context, path, only_load_props):
1571 pl = context.post_load_paths.get(path.path)
1572 if pl is not None and only_load_props:
1573 pl.load_keys = only_load_props
1574 return pl
1575
1576 @classmethod
1577 def path_exists(self, context, path, key):
1578 return (
1579 path.path in context.post_load_paths
1580 and key in context.post_load_paths[path.path].loaders
1581 )
1582
1583 @classmethod
1584 def callable_for_path(
1585 cls, context, path, limit_to_mapper, token, loader_callable, *arg, **kw
1586 ):
1587 if path.path in context.post_load_paths:
1588 pl = context.post_load_paths[path.path]
1589 else:
1590 pl = context.post_load_paths[path.path] = _PostLoad()
1591 pl.loaders[token] = (
1592 context,
1593 token,
1594 limit_to_mapper,
1595 loader_callable,
1596 arg,
1597 kw,
1598 )
1599
1600
def _load_scalar_attributes(mapper, state, attribute_names, passive):
    """initiate a column-based attribute refresh operation.

    Emits a SELECT via ``_load_on_ident()`` so that the column attributes
    named in ``attribute_names`` are re-loaded onto ``state``, raising
    informative errors when the state is detached or lacks a usable
    primary key.

    :param mapper: the Mapper corresponding to ``state``.
    :param state: the InstanceState to be refreshed.
    :param attribute_names: set of attribute key names to load; filtered
     below against the mapper's own attributes.
    :param passive: PassiveFlag bits; only NO_AUTOFLUSH is consulted here.
    :raises: DetachedInstanceError if no Session is associated;
     InvalidRequestError if a pending state lacks primary key attributes;
     ObjectDeletedError if a keyed state's row no longer exists.
    """

    # assert mapper is _state_mapper(state)
    session = state.session
    if not session:
        raise orm_exc.DetachedInstanceError(
            "Instance %s is not bound to a Session; "
            "attribute refresh operation cannot proceed" % (state_str(state))
        )

    no_autoflush = bool(passive & attributes.NO_AUTOFLUSH)

    # in the case of inheritance, particularly concrete and abstract
    # concrete inheritance, the class manager might have some keys
    # of attributes on the superclass that we didn't actually map.
    # These could be mapped as "concrete, don't load" or could be completely
    # excluded from the mapping and we know nothing about them.  Filter them
    # here to prevent them from coming through.
    if attribute_names:
        attribute_names = attribute_names.intersection(mapper.attrs.keys())

    if mapper.inherits and not mapper.concrete:
        # load based on committed attributes in the object, formed into
        # a truncated SELECT that only includes relevant tables.  does not
        # currently use state.key
        statement = mapper._optimized_get_statement(state, attribute_names)
        if statement is not None:
            # undefer() isn't needed here because statement has the
            # columns needed already, this implicitly undefers that column
            stmt = FromStatement(mapper, statement)

            return _load_on_ident(
                session,
                stmt,
                None,
                only_load_props=attribute_names,
                refresh_state=state,
                no_autoflush=no_autoflush,
            )

    # normal load, use state.key as the identity to SELECT
    has_key = bool(state.key)

    if has_key:
        identity_key = state.key
    else:
        # this codepath is rare - only valid when inside a flush, and the
        # object is becoming persistent but hasn't yet been assigned
        # an identity_key.
        # check here to ensure we have the attrs we need.
        pk_attrs = [
            mapper._columntoproperty[col].key for col in mapper.primary_key
        ]
        if state.expired_attributes.intersection(pk_attrs):
            raise sa_exc.InvalidRequestError(
                "Instance %s cannot be refreshed - it's not "
                " persistent and does not "
                "contain a full primary key." % state_str(state)
            )
        identity_key = mapper._identity_key_from_state(state)

    # bail with a warning (rather than raising) when the primary key is
    # partially or fully NULL and therefore cannot identify a row
    if (
        _none_set.issubset(identity_key) and not mapper.allow_partial_pks
    ) or _none_set.issuperset(identity_key):
        util.warn_limited(
            "Instance %s to be refreshed doesn't "
            "contain a full primary key - can't be refreshed "
            "(and shouldn't be expired, either).",
            state_str(state),
        )
        return

    result = _load_on_ident(
        session,
        select(mapper).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL),
        identity_key,
        refresh_state=state,
        only_load_props=attribute_names,
        no_autoflush=no_autoflush,
    )

    # if instance is pending, a refresh operation
    # may not complete (even if PK attributes are assigned)
    if has_key and result is None:
        raise orm_exc.ObjectDeletedError(state)