1# orm/loading.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: ignore-errors
8
9
10"""private module containing functions used to convert database
11rows into object instances and associated state.
12
the functions here are called primarily by Query and Mapper, as well as
by some of the attribute loading strategies.
15
16"""
17
18from __future__ import annotations
19
20from typing import Any
21from typing import Dict
22from typing import Iterable
23from typing import List
24from typing import Mapping
25from typing import Optional
26from typing import Sequence
27from typing import Tuple
28from typing import TYPE_CHECKING
29from typing import TypeVar
30from typing import Union
31
32from . import attributes
33from . import exc as orm_exc
34from . import path_registry
35from .base import _DEFER_FOR_STATE
36from .base import _RAISE_FOR_STATE
37from .base import _SET_DEFERRED_EXPIRED
38from .base import PassiveFlag
39from .context import _ORMCompileState
40from .context import FromStatement
41from .context import QueryContext
42from .strategies import _SelectInLoader
43from .util import _none_set
44from .util import state_str
45from .. import exc as sa_exc
46from .. import util
47from ..engine import result_tuple
48from ..engine.result import ChunkedIteratorResult
49from ..engine.result import FrozenResult
50from ..engine.result import SimpleResultMetaData
51from ..sql import select
52from ..sql import util as sql_util
53from ..sql.selectable import ForUpdateArg
54from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL
55from ..sql.selectable import SelectState
56from ..util import EMPTY_DICT
57from ..util.typing import TupleAny
58from ..util.typing import Unpack
59
60if TYPE_CHECKING:
61 from ._typing import _IdentityKeyType
62 from .base import LoaderCallableStatus
63 from .interfaces import ORMOption
64 from .mapper import Mapper
65 from .query import Query
66 from .session import Session
67 from .state import InstanceState
68 from ..engine.cursor import CursorResult
69 from ..engine.interfaces import _ExecuteOptions
70 from ..engine.result import Result
71 from ..sql import Select
72
73_T = TypeVar("_T", bound=Any)
74_O = TypeVar("_O", bound=object)
75_new_runid = util.counter()
76
77
78_PopulatorDict = Dict[str, List[Tuple[str, Any]]]
79
80
81def instances(
82 cursor: CursorResult[Unpack[TupleAny]], context: QueryContext
83) -> Result[Unpack[TupleAny]]:
84 """Return a :class:`.Result` given an ORM query context.
85
    :param cursor: a :class:`.CursorResult`, generated by a statement
     which came from :class:`._ORMCompileState`
88
89 :param context: a :class:`.QueryContext` object
90
91 :return: a :class:`.Result` object representing ORM results
92
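    E.g., end-user code reaches this function only indirectly, through ORM
    execution; a minimal sketch (``User``, ``session`` and the eager load
    option are assumptions for illustration, not part of this module)::

        stmt = select(User).options(selectinload(User.addresses))
        result = session.execute(stmt)  # ORM execution ultimately
                                        # calls instances()
        users = result.scalars().all()
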
    .. versionchanged:: 1.4 The instances() function now uses
       :class:`.Result` objects and has an all-new interface.
95
96 """
97
98 context.runid = _new_runid()
99
100 if context.top_level_context:
101 is_top_level = False
102 context.post_load_paths = context.top_level_context.post_load_paths
103 else:
104 is_top_level = True
105 context.post_load_paths = {}
106
107 compile_state = context.compile_state
108 filtered = compile_state._has_mapper_entities
109 single_entity = (
110 not context.load_options._only_return_tuples
111 and len(compile_state._entities) == 1
112 and compile_state._entities[0].supports_single_entity
113 )
114
115 try:
116 (process, labels, extra) = list(
117 zip(
118 *[
119 query_entity.row_processor(context, cursor)
120 for query_entity in context.compile_state._entities
121 ]
122 )
123 )
124
125 if context.yield_per and (
126 context.loaders_require_buffering
127 or context.loaders_require_uniquing
128 ):
129 raise sa_exc.InvalidRequestError(
130 "Can't use yield_per with eager loaders that require uniquing "
131 "or row buffering, e.g. joinedload() against collections "
132 "or subqueryload(). Consider the selectinload() strategy "
133 "for better flexibility in loading objects."
134 )
135
136 except Exception:
137 with util.safe_reraise():
138 cursor.close()
139
140 def _no_unique(entry):
141 raise sa_exc.InvalidRequestError(
142 "Can't use the ORM yield_per feature in conjunction with unique()"
143 )
144
145 def _not_hashable(datatype, *, legacy=False, uncertain=False):
146 if not legacy:
147
148 def go(obj):
149 if uncertain:
150 try:
151 return hash(obj)
                    except TypeError:
                        pass
154
155 raise sa_exc.InvalidRequestError(
156 "Can't apply uniqueness to row tuple containing value of "
157 f"""type {datatype!r}; {
158 'the values returned appear to be'
159 if uncertain
160 else 'this datatype produces'
161 } non-hashable values"""
162 )
163
164 return go
165 elif not uncertain:
166 return id
167 else:
168 _use_id = False
169
170 def go(obj):
171 nonlocal _use_id
172
173 if not _use_id:
174 try:
175 return hash(obj)
                    except TypeError:
                        pass
178
                    # in #10459, we considered using a warning here; however,
                    # as legacy Query uses result.unique() in all cases, this
                    # would lead to too many warnings.
182 _use_id = True
183
184 return id(obj)
185
186 return go
187
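    # per-entity uniqueness strategy (summarized): yield_per disallows
    # unique() entirely; entities flagged use_id_for_hash hash on id();
    # NULL-typed or known non-hashable column types get the _not_hashable()
    # handlers above; plain hashable columns use the default of None.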
188 unique_filters = [
189 (
190 _no_unique
191 if context.yield_per
192 else (
193 _not_hashable(
194 ent.column.type, # type: ignore
195 legacy=context.load_options._legacy_uniquing,
196 uncertain=ent._null_column_type,
197 )
198 if (
199 not ent.use_id_for_hash
200 and (ent._non_hashable_value or ent._null_column_type)
201 )
202 else id if ent.use_id_for_hash else None
203 )
204 )
205 for ent in context.compile_state._entities
206 ]
207
208 row_metadata = SimpleResultMetaData(
209 labels, extra, _unique_filters=unique_filters
210 )
211
212 def chunks(size): # type: ignore
213 while True:
214 yield_per = size
215
216 context.partials = {}
217
218 if yield_per:
219 fetch = cursor.fetchmany(yield_per)
220
221 if not fetch:
222 break
223 else:
224 fetch = cursor._raw_all_rows()
225
226 if single_entity:
227 proc = process[0]
228 rows = [proc(row) for row in fetch]
229 else:
230 rows = [
231 tuple([proc(row) for proc in process]) for row in fetch
232 ]
233
            # if we are the originating load from a query, meaning we
            # aren't being called as a result of a nested "post load",
            # iterate through all the collected post loaders and fire them
            # off. This previously worked recursively; however, that
            # prevented deeply nested structures from being loadable.
239 if is_top_level:
240 if yield_per:
241 # if using yield per, memoize the state of the
242 # collection so that it can be restored
243 top_level_post_loads = list(
244 context.post_load_paths.items()
245 )
246
247 while context.post_load_paths:
248 post_loads = list(context.post_load_paths.items())
249 context.post_load_paths.clear()
250 for path, post_load in post_loads:
251 post_load.invoke(context, path)
252
253 if yield_per:
254 context.post_load_paths.clear()
255 context.post_load_paths.update(top_level_post_loads)
256
257 yield rows
258
259 if not yield_per:
260 break
261
262 if context.execution_options.get("prebuffer_rows", False):
263 # this is a bit of a hack at the moment.
264 # I would rather have some option in the result to pre-buffer
265 # internally.
266 _prebuffered = list(chunks(None))
267
268 def chunks(size):
269 return iter(_prebuffered)
270
271 result = ChunkedIteratorResult(
272 row_metadata,
273 chunks,
274 source_supports_scalars=single_entity,
275 raw=cursor,
276 dynamic_yield_per=cursor.context._is_server_side,
277 )
278
279 # filtered and single_entity are used to indicate to legacy Query that the
280 # query has ORM entities, so legacy deduping and scalars should be called
281 # on the result.
282 result._attributes = result._attributes.union(
283 dict(filtered=filtered, is_single_entity=single_entity)
284 )
285
286 # multi_row_eager_loaders OTOH is specific to joinedload.
287 if context.compile_state.multi_row_eager_loaders:
288
289 def require_unique(obj):
290 raise sa_exc.InvalidRequestError(
291 "The unique() method must be invoked on this Result, "
292 "as it contains results that include joined eager loads "
293 "against collections"
294 )
295
296 result._unique_filter_state = (None, require_unique)
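
        # e.g. (illustrative; a ``User``/``Address`` mapping with a
        # joined-eager-loaded collection is assumed for this sketch):
        #
        #   result = session.execute(
        #       select(User).options(joinedload(User.addresses))
        #   )
        #   result.scalars().all()           # raises via require_unique()
        #   result.unique().scalars().all()  # de-duplicates parent entities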
297
298 if context.yield_per:
299 result.yield_per(context.yield_per)
300
301 return result
302
303
304@util.preload_module("sqlalchemy.orm.context")
305def merge_frozen_result(session, statement, frozen_result, load=True):
    """Merge a :class:`_engine.FrozenResult` back into a
    :class:`_orm.Session`, returning a new :class:`_engine.FrozenResult`
    with :term:`persistent` objects.
309
310 See the section :ref:`do_orm_execute_re_executing` for an example.
311
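    A condensed sketch (assuming a :meth:`_orm.SessionEvents.do_orm_execute`
    handler that has previously captured a :class:`_engine.FrozenResult`,
    e.g. via :meth:`_engine.Result.freeze`; the ``cache`` dict and its key
    are purely illustrative)::

        frozen = cache[key]  # FrozenResult captured on a prior execution
        merged_frozen = merge_frozen_result(
            session, statement, frozen, load=False
        )
        result = merged_frozen()  # call the FrozenResult to get a Result
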
312 .. seealso::
313
314 :ref:`do_orm_execute_re_executing`
315
316 :meth:`_engine.Result.freeze`
317
318 :class:`_engine.FrozenResult`
319
320 """
321 querycontext = util.preloaded.orm_context
322
323 if load:
324 # flush current contents if we expect to load data
325 session._autoflush()
326
327 ctx = querycontext._ORMSelectCompileState._create_entities_collection(
328 statement, legacy=False
329 )
330
331 with session.no_autoflush:
332 mapped_entities = [
333 i
334 for i, e in enumerate(ctx._entities)
335 if isinstance(e, querycontext._MapperEntity)
336 ]
337 keys = [ent._label_name for ent in ctx._entities]
338
339 keyed_tuple = result_tuple(
340 keys, [ent._extra_entities for ent in ctx._entities]
341 )
342
343 result = []
344 for newrow in frozen_result._rewrite_rows():
345 for i in mapped_entities:
346 if newrow[i] is not None:
347 newrow[i] = session._merge(
348 attributes.instance_state(newrow[i]),
349 attributes.instance_dict(newrow[i]),
350 load=load,
351 _recursive={},
352 _resolve_conflict_map={},
353 )
354
355 result.append(keyed_tuple(newrow))
356
357 return frozen_result.with_new_rows(result)
358
359
360@util.became_legacy_20(
361 ":func:`_orm.merge_result`",
362 alternative="The function as well as the method on :class:`_orm.Query` "
363 "is superseded by the :func:`_orm.merge_frozen_result` function.",
364)
365@util.preload_module("sqlalchemy.orm.context")
366def merge_result(
367 query: Query[Any],
368 iterator: Union[FrozenResult, Iterable[Sequence[Any]], Iterable[object]],
369 load: bool = True,
370) -> Union[FrozenResult, Iterable[Any]]:
371 """Merge a result into the given :class:`.Query` object's Session.
372
373 See :meth:`_orm.Query.merge_result` for top-level documentation on this
374 function.
375
376 """
377
378 querycontext = util.preloaded.orm_context
379
380 session = query.session
381 if load:
382 # flush current contents if we expect to load data
383 session._autoflush()
384
385 # TODO: need test coverage and documentation for the FrozenResult
386 # use case.
387 if isinstance(iterator, FrozenResult):
388 frozen_result = iterator
389 iterator = iter(frozen_result.data)
390 else:
391 frozen_result = None
392
393 ctx = querycontext._ORMSelectCompileState._create_entities_collection(
394 query, legacy=True
395 )
396
397 autoflush = session.autoflush
398 try:
399 session.autoflush = False
400 single_entity = not frozen_result and len(ctx._entities) == 1
401
402 if single_entity:
403 if isinstance(ctx._entities[0], querycontext._MapperEntity):
404 result = [
405 session._merge(
406 attributes.instance_state(instance),
407 attributes.instance_dict(instance),
408 load=load,
409 _recursive={},
410 _resolve_conflict_map={},
411 )
412 for instance in iterator
413 ]
414 else:
415 result = list(iterator)
416 else:
417 mapped_entities = [
418 i
419 for i, e in enumerate(ctx._entities)
420 if isinstance(e, querycontext._MapperEntity)
421 ]
422 result = []
423 keys = [ent._label_name for ent in ctx._entities]
424
425 keyed_tuple = result_tuple(
426 keys, [ent._extra_entities for ent in ctx._entities]
427 )
428
429 for row in iterator:
430 newrow = list(row)
431 for i in mapped_entities:
432 if newrow[i] is not None:
433 newrow[i] = session._merge(
434 attributes.instance_state(newrow[i]),
435 attributes.instance_dict(newrow[i]),
436 load=load,
437 _recursive={},
438 _resolve_conflict_map={},
439 )
440 result.append(keyed_tuple(newrow))
441
442 if frozen_result:
443 return frozen_result.with_new_rows(result)
444 else:
445 return iter(result)
446 finally:
447 session.autoflush = autoflush
448
449
450def get_from_identity(
451 session: Session,
452 mapper: Mapper[_O],
453 key: _IdentityKeyType[_O],
454 passive: PassiveFlag,
455) -> Union[LoaderCallableStatus, Optional[_O]]:
456 """Look up the given key in the given session's identity map,
457 check the object for expired state if found.
458
459 """
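    # e.g. (illustrative; ``User`` is an assumed mapped class):
    #
    #   key = inspect(User).identity_key_from_primary_key((5,))
    #   obj = get_from_identity(
    #       session, inspect(User), key, PassiveFlag.PASSIVE_OFF
    #   )
    #
    # returning the in-session object, a LoaderCallableStatus symbol such
    # as PASSIVE_CLASS_MISMATCH / PASSIVE_NO_RESULT, or None.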
460 instance = session.identity_map.get(key)
461 if instance is not None:
462 state = attributes.instance_state(instance)
463
464 if mapper.inherits and not state.mapper.isa(mapper):
465 return attributes.PASSIVE_CLASS_MISMATCH
466
467 # expired - ensure it still exists
468 if state.expired:
469 if not passive & attributes.SQL_OK:
470 # TODO: no coverage here
471 return attributes.PASSIVE_NO_RESULT
472 elif not passive & attributes.RELATED_OBJECT_OK:
473 # this mode is used within a flush and the instance's
474 # expired state will be checked soon enough, if necessary.
475 # also used by immediateloader for a mutually-dependent
476 # o2m->m2m load, :ticket:`6301`
477 return instance
478 try:
479 state._load_expired(state, passive)
480 except orm_exc.ObjectDeletedError:
481 session._remove_newly_deleted([state])
482 return None
483 return instance
484 else:
485 return None
486
487
488def _load_on_ident(
489 session: Session,
490 statement: Union[Select, FromStatement],
491 key: Optional[_IdentityKeyType],
492 *,
493 load_options: Optional[Sequence[ORMOption]] = None,
494 refresh_state: Optional[InstanceState[Any]] = None,
495 with_for_update: Optional[ForUpdateArg] = None,
496 only_load_props: Optional[Iterable[str]] = None,
497 no_autoflush: bool = False,
498 bind_arguments: Mapping[str, Any] = util.EMPTY_DICT,
499 execution_options: _ExecuteOptions = util.EMPTY_DICT,
500 require_pk_cols: bool = False,
501 is_user_refresh: bool = False,
502):
503 """Load the given identity key from the database."""
504 if key is not None:
505 ident = key[1]
506 identity_token = key[2]
507 else:
508 ident = identity_token = None
509
510 return _load_on_pk_identity(
511 session,
512 statement,
513 ident,
514 load_options=load_options,
515 refresh_state=refresh_state,
516 with_for_update=with_for_update,
517 only_load_props=only_load_props,
518 identity_token=identity_token,
519 no_autoflush=no_autoflush,
520 bind_arguments=bind_arguments,
521 execution_options=execution_options,
522 require_pk_cols=require_pk_cols,
523 is_user_refresh=is_user_refresh,
524 )
525
526
527def _load_on_pk_identity(
528 session: Session,
529 statement: Union[Select, FromStatement],
530 primary_key_identity: Optional[Tuple[Any, ...]],
531 *,
532 load_options: Optional[Sequence[ORMOption]] = None,
533 refresh_state: Optional[InstanceState[Any]] = None,
534 with_for_update: Optional[ForUpdateArg] = None,
535 only_load_props: Optional[Iterable[str]] = None,
536 identity_token: Optional[Any] = None,
537 no_autoflush: bool = False,
538 bind_arguments: Mapping[str, Any] = util.EMPTY_DICT,
539 execution_options: _ExecuteOptions = util.EMPTY_DICT,
540 require_pk_cols: bool = False,
541 is_user_refresh: bool = False,
542):
543 """Load the given primary key identity from the database."""
544
545 query = statement
546 q = query._clone()
547
548 assert not q._is_lambda_element
549
550 if load_options is None:
551 load_options = QueryContext.default_load_options
552
553 if (
554 statement._compile_options
555 is SelectState.default_select_compile_options
556 ):
557 compile_options = _ORMCompileState.default_compile_options
558 else:
559 compile_options = statement._compile_options
560
561 if primary_key_identity is not None:
562 mapper = query._propagate_attrs["plugin_subject"]
563
564 (_get_clause, _get_params) = mapper._get_clause
565
566 # None present in ident - turn those comparisons
567 # into "IS NULL"
568 if None in primary_key_identity:
569 nones = {
570 _get_params[col].key
571 for col, value in zip(mapper.primary_key, primary_key_identity)
572 if value is None
573 }
574
575 _get_clause = sql_util.adapt_criterion_to_null(_get_clause, nones)
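            # e.g. (illustrative): for a composite primary key (a, b) and an
            # identity of (5, None), the cached criterion
            # "a = :pk_1 AND b = :pk_2" is rewritten here as
            # "a = :pk_1 AND b IS NULL"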
576
577 if len(nones) == len(primary_key_identity):
578 util.warn(
579 "fully NULL primary key identity cannot load any "
580 "object. This condition may raise an error in a future "
581 "release."
582 )
583
584 q._where_criteria = (_get_clause,)
585
586 params = {
587 _get_params[primary_key].key: id_val
588 for id_val, primary_key in zip(
589 primary_key_identity, mapper.primary_key
590 )
591 }
592 else:
593 params = None
594
595 if with_for_update is not None:
596 version_check = True
597 q._for_update_arg = with_for_update
598 elif query._for_update_arg is not None:
599 version_check = True
600 q._for_update_arg = query._for_update_arg
601 else:
602 version_check = False
603
604 if require_pk_cols and only_load_props:
605 if not refresh_state:
606 raise sa_exc.ArgumentError(
607 "refresh_state is required when require_pk_cols is present"
608 )
609
610 refresh_state_prokeys = refresh_state.mapper._primary_key_propkeys
611 has_changes = {
612 key
613 for key in refresh_state_prokeys.difference(only_load_props)
614 if refresh_state.attrs[key].history.has_changes()
615 }
616 if has_changes:
617 # raise if pending pk changes are present.
618 # technically, this could be limited to the case where we have
619 # relationships in the only_load_props collection to be refreshed
620 # also (and only ones that have a secondary eager loader, at that).
621 # however, the error is in place across the board so that behavior
622 # here is easier to predict. The use case it prevents is one
623 # of mutating PK attrs, leaving them unflushed,
624 # calling session.refresh(), and expecting those attrs to remain
625 # still unflushed. It seems likely someone doing all those
626 # things would be better off having the PK attributes flushed
627 # to the database before tinkering like that (session.refresh() is
628 # tinkering).
            raise sa_exc.InvalidRequestError(
                "Please flush pending primary key changes on attributes "
                f"{has_changes} for mapper {refresh_state.mapper} before "
                "proceeding with a refresh"
            )
635
636 # overall, the ORM has no internal flow right now for "dont load the
637 # primary row of an object at all, but fire off
638 # selectinload/subqueryload/immediateload for some relationships".
639 # It would probably be a pretty big effort to add such a flow. So
640 # here, the case for #8703 is introduced; user asks to refresh some
641 # relationship attributes only which are
642 # selectinload/subqueryload/immediateload/ etc. (not joinedload).
643 # ORM complains there's no columns in the primary row to load.
644 # So here, we just add the PK cols if that
645 # case is detected, so that there is a SELECT emitted for the primary
646 # row.
647 #
        # Let's just state right up front, for this one little case,
        # the ORM here is adding a whole extra SELECT just to satisfy
        # limitations in the internal flow. This is really not something
        # SQLAlchemy otherwise ever does; we are constantly working to
        # *remove* SELECTs we don't need. We rationalize this for now on
        # the basis that: 1. session.refresh() is not commonly used,
        # 2. session.refresh() with only relationship attrs is even less
        # commonly used, and 3. the SELECT in question is very low latency.
657 #
658 # to add the flow to not include the SELECT, the quickest way
659 # might be to just manufacture a single-row result set to send off to
660 # instances(), but we'd have to weave that into context.py and all
661 # that. For 2.0.0, we have enough big changes to navigate for now.
662 #
663 mp = refresh_state.mapper._props
664 for p in only_load_props:
665 if mp[p]._is_relationship:
666 only_load_props = refresh_state_prokeys.union(only_load_props)
667 break
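
        # e.g. (illustrative): session.refresh(some_user, ["addresses"])
        # where User.addresses is configured lazy="selectin"; the PK
        # columns are added above so the primary SELECT has columns to
        # load before the selectin load for "addresses" fires.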
668
669 if refresh_state and refresh_state.load_options:
670 compile_options += {"_current_path": refresh_state.load_path.parent}
671 q = q.options(*refresh_state.load_options)
672
673 new_compile_options, load_options = _set_get_options(
674 compile_options,
675 load_options,
676 version_check=version_check,
677 only_load_props=only_load_props,
678 refresh_state=refresh_state,
679 identity_token=identity_token,
680 is_user_refresh=is_user_refresh,
681 )
682
683 q._compile_options = new_compile_options
684 q._order_by = None
685
686 if no_autoflush:
687 load_options += {"_autoflush": False}
688
689 execution_options = util.EMPTY_DICT.merge_with(
690 execution_options, {"_sa_orm_load_options": load_options}
691 )
692 result = (
693 session.execute(
694 q,
695 params=params,
696 execution_options=execution_options,
697 bind_arguments=bind_arguments,
698 )
699 .unique()
700 .scalars()
701 )
702
703 try:
704 return result.one()
705 except orm_exc.NoResultFound:
706 return None
707
708
709def _set_get_options(
710 compile_opt,
711 load_opt,
712 populate_existing=None,
713 version_check=None,
714 only_load_props=None,
715 refresh_state=None,
716 identity_token=None,
717 is_user_refresh=None,
718):
719 compile_options = {}
720 load_options = {}
721 if version_check:
722 load_options["_version_check"] = version_check
723 if populate_existing:
724 load_options["_populate_existing"] = populate_existing
725 if refresh_state:
726 load_options["_refresh_state"] = refresh_state
727 compile_options["_for_refresh_state"] = True
728 if only_load_props:
729 compile_options["_only_load_props"] = frozenset(only_load_props)
730 if identity_token:
731 load_options["_identity_token"] = identity_token
732
733 if is_user_refresh:
734 load_options["_is_user_refresh"] = is_user_refresh
735 if load_options:
736 load_opt += load_options
737 if compile_options:
738 compile_opt += compile_options
739
740 return compile_opt, load_opt
741
742
743def _setup_entity_query(
744 compile_state,
745 mapper,
746 query_entity,
747 path,
748 adapter,
749 column_collection,
750 with_polymorphic=None,
751 only_load_props=None,
752 polymorphic_discriminator=None,
753 **kw,
754):
755 if with_polymorphic:
756 poly_properties = mapper._iterate_polymorphic_properties(
757 with_polymorphic
758 )
759 else:
760 poly_properties = mapper._polymorphic_properties
761
762 quick_populators = {}
763
764 path.set(compile_state.attributes, "memoized_setups", quick_populators)
765
766 # for the lead entities in the path, e.g. not eager loads, and
767 # assuming a user-passed aliased class, e.g. not a from_self() or any
768 # implicit aliasing, don't add columns to the SELECT that aren't
769 # in the thing that's aliased.
770 check_for_adapt = adapter and len(path) == 1 and path[-1].is_aliased_class
771
772 for value in poly_properties:
773 if only_load_props and value.key not in only_load_props:
774 continue
775 value.setup(
776 compile_state,
777 query_entity,
778 path,
779 adapter,
780 only_load_props=only_load_props,
781 column_collection=column_collection,
782 memoized_populators=quick_populators,
783 check_for_adapt=check_for_adapt,
784 **kw,
785 )
786
787 if (
788 polymorphic_discriminator is not None
789 and polymorphic_discriminator is not mapper.polymorphic_on
790 ):
791 if adapter:
792 pd = adapter.columns[polymorphic_discriminator]
793 else:
794 pd = polymorphic_discriminator
795 column_collection.append(pd)
796
797
798def _warn_for_runid_changed(state):
799 util.warn(
800 "Loading context for %s has changed within a load/refresh "
801 "handler, suggesting a row refresh operation took place. If this "
802 "event handler is expected to be "
803 "emitting row refresh operations within an existing load or refresh "
804 "operation, set restore_load_context=True when establishing the "
805 "listener to ensure the context remains unchanged when the event "
806 "handler completes." % (state_str(state),)
807 )
808
809
810def _instance_processor(
811 query_entity,
812 mapper,
813 context,
814 result,
815 path,
816 adapter,
817 only_load_props=None,
818 refresh_state=None,
819 polymorphic_discriminator=None,
820 _polymorphic_from=None,
821):
822 """Produce a mapper level row processor callable
823 which processes rows into mapped instances."""
824
825 # note that this method, most of which exists in a closure
826 # called _instance(), resists being broken out, as
827 # attempts to do so tend to add significant function
828 # call overhead. _instance() is the most
829 # performance-critical section in the whole ORM.
830
831 identity_class = mapper._identity_class
832 compile_state = context.compile_state
833
834 # look for "row getter" functions that have been assigned along
835 # with the compile state that were cached from a previous load.
836 # these are operator.itemgetter() objects that each will extract a
837 # particular column from each row.
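    # e.g. (illustrative): primary_key_getter = result._tuple_getter(pk_cols)
    # further below returns a callable that pulls the primary key value(s)
    # out of each raw row in a single call.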
838
839 getter_key = ("getters", mapper)
840 getters = path.get(compile_state.attributes, getter_key, None)
841
842 if getters is None:
        # no getters, so go through the list of attributes we are loading
        # for; the column-based ones will already have put information for
        # us in another collection, "memoized_setups", which represents the
        # output of the LoaderStrategy.setup_query() method. We could just
        # as easily call LoaderStrategy.create_row_processor for each, but
        # by getting it all at once from setup_query we save another method
        # call per attribute.
850 props = mapper._prop_set
851 if only_load_props is not None:
852 props = props.intersection(
853 mapper._props[k] for k in only_load_props
854 )
855
856 quick_populators = path.get(
857 context.attributes, "memoized_setups", EMPTY_DICT
858 )
859
860 todo = []
861 cached_populators = {
862 "new": [],
863 "quick": [],
864 "deferred": [],
865 "expire": [],
866 "existing": [],
867 "eager": [],
868 }
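        # bucket semantics, summarized: "quick" entries are (key, getter)
        # pairs copied straight off the row; "new" populators run only for
        # the first row seen for a given identity; "existing" populators run
        # for subsequent rows with the same identity; "expire" marks
        # attributes as expired.  The remaining buckets are filled in by
        # individual loader strategies via create_row_processor().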
869
870 if refresh_state is None:
871 # we can also get the "primary key" tuple getter function
872 pk_cols = mapper.primary_key
873
874 if adapter:
875 pk_cols = [adapter.columns[c] for c in pk_cols]
876 primary_key_getter = result._tuple_getter(pk_cols)
877 else:
878 primary_key_getter = None
879
880 getters = {
881 "cached_populators": cached_populators,
882 "todo": todo,
883 "primary_key_getter": primary_key_getter,
884 }
885 for prop in props:
886 if prop in quick_populators:
887 # this is an inlined path just for column-based attributes.
888 col = quick_populators[prop]
889 if col is _DEFER_FOR_STATE:
890 cached_populators["new"].append(
891 (prop.key, prop._deferred_column_loader)
892 )
893 elif col is _SET_DEFERRED_EXPIRED:
894 # note that in this path, we are no longer
895 # searching in the result to see if the column might
896 # be present in some unexpected way.
897 cached_populators["expire"].append((prop.key, False))
898 elif col is _RAISE_FOR_STATE:
899 cached_populators["new"].append(
900 (prop.key, prop._raise_column_loader)
901 )
902 else:
903 getter = None
904 if adapter:
905 # this logic had been removed for all 1.4 releases
906 # up until 1.4.18; the adapter here is particularly
907 # the compound eager adapter which isn't accommodated
908 # in the quick_populators right now. The "fallback"
909 # logic below instead took over in many more cases
910 # until issue #6596 was identified.
911
912 # note there is still an issue where this codepath
913 # produces no "getter" for cases where a joined-inh
914 # mapping includes a labeled column property, meaning
915 # KeyError is caught internally and we fall back to
916 # _getter(col), which works anyway. The adapter
917 # here for joined inh without any aliasing might not
918 # be useful. Tests which see this include
919 # test.orm.inheritance.test_basic ->
920 # EagerTargetingTest.test_adapt_stringency
921 # OptimizedLoadTest.test_column_expression_joined
922 # PolymorphicOnNotLocalTest.test_polymorphic_on_column_prop # noqa: E501
923 #
924
925 adapted_col = adapter.columns[col]
926 if adapted_col is not None:
927 getter = result._getter(adapted_col, False)
928 if not getter:
929 getter = result._getter(col, False)
930 if getter:
931 cached_populators["quick"].append((prop.key, getter))
932 else:
933 # fall back to the ColumnProperty itself, which
934 # will iterate through all of its columns
935 # to see if one fits
936 prop.create_row_processor(
937 context,
938 query_entity,
939 path,
940 mapper,
941 result,
942 adapter,
943 cached_populators,
944 )
945 else:
946 # loader strategies like subqueryload, selectinload,
947 # joinedload, basically relationships, these need to interact
948 # with the context each time to work correctly.
949 todo.append(prop)
950
951 path.set(compile_state.attributes, getter_key, getters)
952
953 cached_populators = getters["cached_populators"]
954
955 populators = {key: list(value) for key, value in cached_populators.items()}
956 for prop in getters["todo"]:
957 prop.create_row_processor(
958 context, query_entity, path, mapper, result, adapter, populators
959 )
960
961 propagated_loader_options = context.propagated_loader_options
962 load_path = (
963 context.compile_state.current_path + path
964 if context.compile_state.current_path.path
965 else path
966 )
967
968 session_identity_map = context.session.identity_map
969
970 populate_existing = context.populate_existing or mapper.always_refresh
971 load_evt = bool(mapper.class_manager.dispatch.load)
972 refresh_evt = bool(mapper.class_manager.dispatch.refresh)
973 persistent_evt = bool(context.session.dispatch.loaded_as_persistent)
974 if persistent_evt:
975 loaded_as_persistent = context.session.dispatch.loaded_as_persistent
976 instance_state = attributes.instance_state
977 instance_dict = attributes.instance_dict
978 session_id = context.session.hash_key
979 runid = context.runid
980 identity_token = context.identity_token
981
982 version_check = context.version_check
983 if version_check:
984 version_id_col = mapper.version_id_col
985 if version_id_col is not None:
986 if adapter:
987 version_id_col = adapter.columns[version_id_col]
988 version_id_getter = result._getter(version_id_col)
989 else:
990 version_id_getter = None
991
992 if not refresh_state and _polymorphic_from is not None:
993 key = ("loader", path.path)
994
995 if key in context.attributes and context.attributes[key].strategy == (
996 ("selectinload_polymorphic", True),
997 ):
998 option_entities = context.attributes[key].local_opts["entities"]
999 else:
1000 option_entities = None
1001 selectin_load_via = mapper._should_selectin_load(
1002 option_entities,
1003 _polymorphic_from,
1004 )
1005
1006 if selectin_load_via and selectin_load_via is not _polymorphic_from:
1007 # only_load_props goes w/ refresh_state only, and in a refresh
1008 # we are a single row query for the exact entity; polymorphic
1009 # loading does not apply
1010 assert only_load_props is None
1011
1012 if selectin_load_via.is_mapper:
1013 _load_supers = []
1014 _endmost_mapper = selectin_load_via
1015 while (
1016 _endmost_mapper
1017 and _endmost_mapper is not _polymorphic_from
1018 ):
1019 _load_supers.append(_endmost_mapper)
1020 _endmost_mapper = _endmost_mapper.inherits
1021 else:
1022 _load_supers = [selectin_load_via]
1023
1024 for _selectinload_entity in _load_supers:
1025 if _PostLoad.path_exists(
1026 context, load_path, _selectinload_entity
1027 ):
1028 continue
1029 callable_ = _load_subclass_via_in(
1030 context,
1031 path,
1032 _selectinload_entity,
1033 _polymorphic_from,
1034 option_entities,
1035 )
1036 _PostLoad.callable_for_path(
1037 context,
1038 load_path,
1039 _selectinload_entity.mapper,
1040 _selectinload_entity,
1041 callable_,
1042 _selectinload_entity,
1043 )
1044
1045 post_load = _PostLoad.for_context(context, load_path, only_load_props)
1046
1047 if refresh_state:
1048 refresh_identity_key = refresh_state.key
1049 if refresh_identity_key is None:
1050 # super-rare condition; a refresh is being called
1051 # on a non-instance-key instance; this is meant to only
1052 # occur within a flush()
1053 refresh_identity_key = mapper._identity_key_from_state(
1054 refresh_state
1055 )
1056 else:
1057 refresh_identity_key = None
1058
1059 primary_key_getter = getters["primary_key_getter"]
1060
1061 if mapper.allow_partial_pks:
1062 is_not_primary_key = _none_set.issuperset
1063 else:
1064 is_not_primary_key = _none_set.intersection
1065
1066 def _instance(row):
1067 # determine the state that we'll be populating
1068 if refresh_identity_key:
1069 # fixed state that we're refreshing
1070 state = refresh_state
1071 instance = state.obj()
1072 dict_ = instance_dict(instance)
1073 isnew = state.runid != runid
1074 currentload = True
1075 loaded_instance = False
1076 else:
1077 # look at the row, see if that identity is in the
1078 # session, or we have to create a new one
1079 identitykey = (
1080 identity_class,
1081 primary_key_getter(row),
1082 identity_token,
1083 )
1084
1085 instance = session_identity_map.get(identitykey)
1086
1087 if instance is not None:
1088 # existing instance
1089 state = instance_state(instance)
1090 dict_ = instance_dict(instance)
1091
1092 isnew = state.runid != runid
1093 currentload = not isnew
1094 loaded_instance = False
1095
1096 if version_check and version_id_getter and not currentload:
1097 _validate_version_id(
1098 mapper, state, dict_, row, version_id_getter
1099 )
1100
1101 else:
1102 # create a new instance
1103
1104 # check for non-NULL values in the primary key columns,
1105 # else no entity is returned for the row
1106 if is_not_primary_key(identitykey[1]):
1107 return None
1108
1109 isnew = True
1110 currentload = True
1111 loaded_instance = True
1112
1113 instance = mapper.class_manager.new_instance()
1114
1115 dict_ = instance_dict(instance)
1116 state = instance_state(instance)
1117 state.key = identitykey
1118 state.identity_token = identity_token
1119
1120 # attach instance to session.
1121 state.session_id = session_id
1122 session_identity_map._add_unpresent(state, identitykey)
1123
1124 effective_populate_existing = populate_existing
1125 if refresh_state is state:
1126 effective_populate_existing = True
1127
1128 # populate. this looks at whether this state is new
1129 # for this load or was existing, and whether or not this
1130 # row is the first row with this identity.
1131 if currentload or effective_populate_existing:
1132 # full population routines. Objects here are either
1133 # just created, or we are doing a populate_existing
1134
1135 # be conservative about setting load_path when populate_existing
1136 # is in effect; want to maintain options from the original
1137 # load. see test_expire->test_refresh_maintains_deferred_options
1138 if isnew and (
1139 propagated_loader_options or not effective_populate_existing
1140 ):
1141 state.load_options = propagated_loader_options
1142 state.load_path = load_path
1143
1144 _populate_full(
1145 context,
1146 row,
1147 state,
1148 dict_,
1149 isnew,
1150 load_path,
1151 loaded_instance,
1152 effective_populate_existing,
1153 populators,
1154 )
1155
1156 if isnew:
                # state.runid should be equal to context.runid / runid here;
                # however, for event checks we are being more conservative
                # and checking against the existing run id
1160 # assert state.runid == runid
1161
1162 existing_runid = state.runid
1163
1164 if loaded_instance:
1165 if load_evt:
1166 state.manager.dispatch.load(state, context)
1167 if state.runid != existing_runid:
1168 _warn_for_runid_changed(state)
1169 if persistent_evt:
1170 loaded_as_persistent(context.session, state)
1171 if state.runid != existing_runid:
1172 _warn_for_runid_changed(state)
1173 elif refresh_evt:
1174 state.manager.dispatch.refresh(
1175 state, context, only_load_props
1176 )
1177 if state.runid != runid:
1178 _warn_for_runid_changed(state)
1179
1180 if effective_populate_existing or state.modified:
1181 if refresh_state and only_load_props:
1182 state._commit(dict_, only_load_props)
1183 else:
1184 state._commit_all(dict_, session_identity_map)
1185
1186 if post_load:
1187 post_load.add_state(state, True)
1188
1189 else:
1190 # partial population routines, for objects that were already
1191 # in the Session, but a row matches them; apply eager loaders
1192 # on existing objects, etc.
1193 unloaded = state.unloaded
1194 isnew = state not in context.partials
1195
1196 if not isnew or unloaded or populators["eager"]:
1197 # state is having a partial set of its attributes
1198 # refreshed. Populate those attributes,
1199 # and add to the "context.partials" collection.
1200
1201 to_load = _populate_partial(
1202 context,
1203 row,
1204 state,
1205 dict_,
1206 isnew,
1207 load_path,
1208 unloaded,
1209 populators,
1210 )
1211
1212 if isnew:
1213 if refresh_evt:
1214 existing_runid = state.runid
1215 state.manager.dispatch.refresh(state, context, to_load)
1216 if state.runid != existing_runid:
1217 _warn_for_runid_changed(state)
1218
1219 state._commit(dict_, to_load)
1220
1221 if post_load and context.invoke_all_eagers:
1222 post_load.add_state(state, False)
1223
1224 return instance
1225
1226 if mapper.polymorphic_map and not _polymorphic_from and not refresh_state:
1227 # if we are doing polymorphic, dispatch to a different _instance()
1228 # method specific to the subclass mapper
1229 def ensure_no_pk(row):
1230 identitykey = (
1231 identity_class,
1232 primary_key_getter(row),
1233 identity_token,
1234 )
1235 if not is_not_primary_key(identitykey[1]):
1236 return identitykey
1237 else:
1238 return None
1239
1240 _instance = _decorate_polymorphic_switch(
1241 _instance,
1242 context,
1243 query_entity,
1244 mapper,
1245 result,
1246 path,
1247 polymorphic_discriminator,
1248 adapter,
1249 ensure_no_pk,
1250 )
1251
1252 return _instance
1253
1254
1255def _load_subclass_via_in(
1256 context, path, entity, polymorphic_from, option_entities
1257):
1258 mapper = entity.mapper
1259
1260 # TODO: polymorphic_from seems to be a Mapper in all cases.
    # this is likely not needed, but as we don't have typing in loading.py
1262 # yet, err on the safe side
1263 polymorphic_from_mapper = polymorphic_from.mapper
1264 not_against_basemost = polymorphic_from_mapper.inherits is not None
1265
1266 zero_idx = len(mapper.base_mapper.primary_key) == 1
1267
1268 if entity.is_aliased_class or not_against_basemost:
1269 q, enable_opt, disable_opt = mapper._subclass_load_via_in(
1270 entity, polymorphic_from
1271 )
1272 else:
1273 q, enable_opt, disable_opt = mapper._subclass_load_via_in_mapper
1274
1275 def do_load(context, path, states, load_only, effective_entity):
1276 if not option_entities:
1277 # filter out states for those that would have selectinloaded
1278 # from another loader
1279 # TODO: we are currently ignoring the case where the
1280 # "selectin_polymorphic" option is used, as this is much more
1281 # complex / specific / very uncommon API use
1282 states = [
1283 (s, v)
1284 for s, v in states
1285 if s.mapper._would_selectin_load_only_from_given_mapper(mapper)
1286 ]
1287
1288 if not states:
1289 return
1290
1291 orig_query = context.query
1292
1293 if path.parent:
1294 enable_opt_lcl = enable_opt._prepend_path(path)
1295 disable_opt_lcl = disable_opt._prepend_path(path)
1296 else:
1297 enable_opt_lcl = enable_opt
1298 disable_opt_lcl = disable_opt
1299 options = (
1300 (enable_opt_lcl,) + orig_query._with_options + (disable_opt_lcl,)
1301 )
1302
1303 q2 = q.options(*options)
1304
1305 q2._compile_options = context.compile_state.default_compile_options
1306 q2._compile_options += {"_current_path": path.parent}
1307
1308 if context.populate_existing:
1309 q2 = q2.execution_options(populate_existing=True)
1310
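        # emit the load in chunks of _SelectInLoader._chunksize primary
        # keys per IN-based SELECT; e.g. (illustrative) with a chunk size
        # of 500, 1200 matched parent states produce three SELECTs of
        # 500 / 500 / 200 keys.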
1311 while states:
1312 chunk = states[0 : _SelectInLoader._chunksize]
1313 states = states[_SelectInLoader._chunksize :]
1314 context.session.execute(
1315 q2,
1316 dict(
1317 primary_keys=[
1318 state.key[1][0] if zero_idx else state.key[1]
1319 for state, load_attrs in chunk
1320 ]
1321 ),
1322 ).unique().scalars().all()
1323
1324 return do_load
1325
1326
1327def _populate_full(
1328 context,
1329 row,
1330 state,
1331 dict_,
1332 isnew,
1333 load_path,
1334 loaded_instance,
1335 populate_existing,
1336 populators,
1337):
1338 if isnew:
1339 # first time we are seeing a row with this identity.
1340 state.runid = context.runid
1341
1342 for key, getter in populators["quick"]:
1343 dict_[key] = getter(row)
1344 if populate_existing:
1345 for key, set_callable in populators["expire"]:
1346 dict_.pop(key, None)
1347 if set_callable:
1348 state.expired_attributes.add(key)
1349 else:
1350 for key, set_callable in populators["expire"]:
1351 if set_callable:
1352 state.expired_attributes.add(key)
1353
1354 for key, populator in populators["new"]:
1355 populator(state, dict_, row)
1356
1357 elif load_path != state.load_path:
1358 # new load path, e.g. object is present in more than one
1359 # column position in a series of rows
1360 state.load_path = load_path
1361
1362 # if we have data, and the data isn't in the dict, OK, let's put
1363 # it in.
1364 for key, getter in populators["quick"]:
1365 if key not in dict_:
1366 dict_[key] = getter(row)
1367
1368 # otherwise treat like an "already seen" row
1369 for key, populator in populators["existing"]:
1370 populator(state, dict_, row)
1371 # TODO: allow "existing" populator to know this is
1372 # a new path for the state:
1373 # populator(state, dict_, row, new_path=True)
1374
1375 else:
1376 # have already seen rows with this identity in this same path.
1377 for key, populator in populators["existing"]:
1378 populator(state, dict_, row)
1379
1380 # TODO: same path
1381 # populator(state, dict_, row, new_path=False)
1382
1383
1384def _populate_partial(
1385 context, row, state, dict_, isnew, load_path, unloaded, populators
1386):
1387 if not isnew:
1388 if unloaded:
1389 # extra pass, see #8166
1390 for key, getter in populators["quick"]:
1391 if key in unloaded:
1392 dict_[key] = getter(row)
1393
1394 to_load = context.partials[state]
1395 for key, populator in populators["existing"]:
1396 if key in to_load:
1397 populator(state, dict_, row)
1398 else:
1399 to_load = unloaded
1400 context.partials[state] = to_load
1401
1402 for key, getter in populators["quick"]:
1403 if key in to_load:
1404 dict_[key] = getter(row)
1405 for key, set_callable in populators["expire"]:
1406 if key in to_load:
1407 dict_.pop(key, None)
1408 if set_callable:
1409 state.expired_attributes.add(key)
1410 for key, populator in populators["new"]:
1411 if key in to_load:
1412 populator(state, dict_, row)
1413
1414 for key, populator in populators["eager"]:
1415 if key not in unloaded:
1416 populator(state, dict_, row)
1417
1418 return to_load
1419
1420
1421def _validate_version_id(mapper, state, dict_, row, getter):
1422 if mapper._get_state_attr_by_column(
1423 state, dict_, mapper.version_id_col
1424 ) != getter(row):
1425 raise orm_exc.StaleDataError(
1426 "Instance '%s' has version id '%s' which "
1427 "does not match database-loaded version id '%s'."
1428 % (
1429 state_str(state),
1430 mapper._get_state_attr_by_column(
1431 state, dict_, mapper.version_id_col
1432 ),
1433 getter(row),
1434 )
1435 )
1436
1437
1438def _decorate_polymorphic_switch(
1439 instance_fn,
1440 context,
1441 query_entity,
1442 mapper,
1443 result,
1444 path,
1445 polymorphic_discriminator,
1446 adapter,
1447 ensure_no_pk,
1448):
1449 if polymorphic_discriminator is not None:
1450 polymorphic_on = polymorphic_discriminator
1451 else:
1452 polymorphic_on = mapper.polymorphic_on
1453 if polymorphic_on is None:
1454 return instance_fn
1455
1456 if adapter:
1457 polymorphic_on = adapter.columns[polymorphic_on]
1458
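    # e.g. (illustrative): for a hierarchy polymorphic on a "type" column
    # with polymorphic_identity values "manager" and "engineer", a row whose
    # discriminator reads "engineer" is routed to an _instance_processor()
    # built for the Engineer mapper; a row naming the requested mapper
    # itself falls through to the original instance_fn.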
1459 def configure_subclass_mapper(discriminator):
1460 try:
1461 sub_mapper = mapper.polymorphic_map[discriminator]
1462 except KeyError:
1463 raise AssertionError(
1464 "No such polymorphic_identity %r is defined" % discriminator
1465 )
1466 else:
1467 if sub_mapper is mapper:
1468 return None
1469 elif not sub_mapper.isa(mapper):
1470 return False
1471
1472 return _instance_processor(
1473 query_entity,
1474 sub_mapper,
1475 context,
1476 result,
1477 path,
1478 adapter,
1479 _polymorphic_from=mapper,
1480 )
1481
1482 polymorphic_instances = util.PopulateDict(configure_subclass_mapper)
1483
1484 getter = result._getter(polymorphic_on)
1485
1486 def polymorphic_instance(row):
1487 discriminator = getter(row)
1488 if discriminator is not None:
1489 _instance = polymorphic_instances[discriminator]
1490 if _instance:
1491 return _instance(row)
1492 elif _instance is False:
1493 identitykey = ensure_no_pk(row)
1494
1495 if identitykey:
1496 raise sa_exc.InvalidRequestError(
1497 "Row with identity key %s can't be loaded into an "
1498 "object; the polymorphic discriminator column '%s' "
1499 "refers to %s, which is not a sub-mapper of "
1500 "the requested %s"
1501 % (
1502 identitykey,
1503 polymorphic_on,
1504 mapper.polymorphic_map[discriminator],
1505 mapper,
1506 )
1507 )
1508 else:
1509 return None
1510 else:
1511 return instance_fn(row)
1512 else:
1513 identitykey = ensure_no_pk(row)
1514
1515 if identitykey:
1516 raise sa_exc.InvalidRequestError(
1517 "Row with identity key %s can't be loaded into an "
1518 "object; the polymorphic discriminator column '%s' is "
1519 "NULL" % (identitykey, polymorphic_on)
1520 )
1521 else:
1522 return None
1523
1524 return polymorphic_instance
1525
1526
1527class _PostLoad:
1528 """Track loaders and states for "post load" operations."""
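
    # lifecycle, summarized: loader strategies register callables via
    # callable_for_path() while rows are being processed; _instance() adds
    # each loaded state via add_state(); when the top-level instances()
    # loop finishes a batch of rows, invoke() fires each registered loader
    # with the accumulated states and then clears them.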
1529
1530 __slots__ = "loaders", "states", "load_keys"
1531
1532 def __init__(self):
1533 self.loaders = {}
1534 self.states = util.OrderedDict()
1535 self.load_keys = None
1536
1537 def add_state(self, state, overwrite):
1538 # the states for a polymorphic load here are all shared
1539 # within a single PostLoad object among multiple subtypes.
1540 # Filtering of callables on a per-subclass basis needs to be done at
1541 # the invocation level
1542 self.states[state] = overwrite
1543
1544 def invoke(self, context, path):
1545 if not self.states:
1546 return
1547 path = path_registry.PathRegistry.coerce(path)
1548 for (
1549 effective_context,
1550 token,
1551 limit_to_mapper,
1552 loader,
1553 arg,
1554 kw,
1555 ) in self.loaders.values():
1556 states = [
1557 (state, overwrite)
1558 for state, overwrite in self.states.items()
1559 if state.manager.mapper.isa(limit_to_mapper)
1560 ]
1561 if states:
1562 loader(
1563 effective_context, path, states, self.load_keys, *arg, **kw
1564 )
1565 self.states.clear()
1566
1567 @classmethod
1568 def for_context(cls, context, path, only_load_props):
1569 pl = context.post_load_paths.get(path.path)
1570 if pl is not None and only_load_props:
1571 pl.load_keys = only_load_props
1572 return pl
1573
1574 @classmethod
    def path_exists(cls, context, path, key):
1576 return (
1577 path.path in context.post_load_paths
1578 and key in context.post_load_paths[path.path].loaders
1579 )
1580
1581 @classmethod
1582 def callable_for_path(
1583 cls, context, path, limit_to_mapper, token, loader_callable, *arg, **kw
1584 ):
1585 if path.path in context.post_load_paths:
1586 pl = context.post_load_paths[path.path]
1587 else:
1588 pl = context.post_load_paths[path.path] = _PostLoad()
1589 pl.loaders[token] = (
1590 context,
1591 token,
1592 limit_to_mapper,
1593 loader_callable,
1594 arg,
1595 kw,
1596 )
1597
1598
1599def _load_scalar_attributes(mapper, state, attribute_names, passive):
1600 """initiate a column-based attribute refresh operation."""
1601
1602 # assert mapper is _state_mapper(state)
1603 session = state.session
1604 if not session:
1605 raise orm_exc.DetachedInstanceError(
1606 "Instance %s is not bound to a Session; "
1607 "attribute refresh operation cannot proceed" % (state_str(state))
1608 )
1609
1610 no_autoflush = bool(passive & attributes.NO_AUTOFLUSH)
1611
1612 # in the case of inheritance, particularly concrete and abstract
1613 # concrete inheritance, the class manager might have some keys
1614 # of attributes on the superclass that we didn't actually map.
1615 # These could be mapped as "concrete, don't load" or could be completely
1616 # excluded from the mapping and we know nothing about them. Filter them
1617 # here to prevent them from coming through.
1618 if attribute_names:
1619 attribute_names = attribute_names.intersection(mapper.attrs.keys())
1620
1621 if mapper.inherits and not mapper.concrete:
1622 # load based on committed attributes in the object, formed into
1623 # a truncated SELECT that only includes relevant tables. does not
1624 # currently use state.key
1625 statement = mapper._optimized_get_statement(state, attribute_names)
1626 if statement is not None:
1627 # undefer() isn't needed here because statement has the
1628 # columns needed already, this implicitly undefers that column
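            # (illustrative: for joined inheritance Employee -> Engineer
            # where only Engineer-local columns are expired, the truncated
            # statement SELECTs just the engineer table, using the
            # already-present primary key values of the instance)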
1629 stmt = FromStatement(mapper, statement)
1630
1631 return _load_on_ident(
1632 session,
1633 stmt,
1634 None,
1635 only_load_props=attribute_names,
1636 refresh_state=state,
1637 no_autoflush=no_autoflush,
1638 )
1639
1640 # normal load, use state.key as the identity to SELECT
1641 has_key = bool(state.key)
1642
1643 if has_key:
1644 identity_key = state.key
1645 else:
1646 # this codepath is rare - only valid when inside a flush, and the
1647 # object is becoming persistent but hasn't yet been assigned
1648 # an identity_key.
1649 # check here to ensure we have the attrs we need.
1650 pk_attrs = [
1651 mapper._columntoproperty[col].key for col in mapper.primary_key
1652 ]
1653 if state.expired_attributes.intersection(pk_attrs):
1654 raise sa_exc.InvalidRequestError(
1655 "Instance %s cannot be refreshed - it's not "
                "persistent and does not "
1657 "contain a full primary key." % state_str(state)
1658 )
1659 identity_key = mapper._identity_key_from_state(state)
1660
1661 if (
1662 _none_set.issubset(identity_key) and not mapper.allow_partial_pks
1663 ) or _none_set.issuperset(identity_key):
1664 util.warn_limited(
1665 "Instance %s to be refreshed doesn't "
1666 "contain a full primary key - can't be refreshed "
1667 "(and shouldn't be expired, either).",
1668 state_str(state),
1669 )
1670 return
1671
1672 result = _load_on_ident(
1673 session,
1674 select(mapper).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL),
1675 identity_key,
1676 refresh_state=state,
1677 only_load_props=attribute_names,
1678 no_autoflush=no_autoflush,
1679 )
1680
1681 # if instance is pending, a refresh operation
1682 # may not complete (even if PK attributes are assigned)
1683 if has_key and result is None:
1684 raise orm_exc.ObjectDeletedError(state)