1# orm/loading.py
2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: ignore-errors
8
9
10"""private module containing functions used to convert database
11rows into object instances and associated state.
12
13the functions here are called primarily by Query, Mapper,
14as well as some of the attribute loading strategies.
15
16"""
17
18from __future__ import annotations
19
20from typing import Any
21from typing import Dict
22from typing import Iterable
23from typing import List
24from typing import Mapping
25from typing import Optional
26from typing import Sequence
27from typing import Tuple
28from typing import TYPE_CHECKING
29from typing import TypeVar
30from typing import Union
31
32from . import attributes
33from . import exc as orm_exc
34from . import path_registry
35from .base import _DEFER_FOR_STATE
36from .base import _RAISE_FOR_STATE
37from .base import _SET_DEFERRED_EXPIRED
38from .base import PassiveFlag
39from .context import FromStatement
40from .context import ORMCompileState
41from .context import QueryContext
42from .util import _none_set
43from .util import state_str
44from .. import exc as sa_exc
45from .. import util
46from ..engine import result_tuple
47from ..engine.result import ChunkedIteratorResult
48from ..engine.result import FrozenResult
49from ..engine.result import SimpleResultMetaData
50from ..sql import select
51from ..sql import util as sql_util
52from ..sql.selectable import ForUpdateArg
53from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL
54from ..sql.selectable import SelectState
55from ..util import EMPTY_DICT
56from ..util.typing import TupleAny
57from ..util.typing import Unpack
58
59if TYPE_CHECKING:
60 from ._typing import _IdentityKeyType
61 from .base import LoaderCallableStatus
62 from .interfaces import ORMOption
63 from .mapper import Mapper
64 from .query import Query
65 from .session import Session
66 from .state import InstanceState
67 from ..engine.cursor import CursorResult
68 from ..engine.interfaces import _ExecuteOptions
69 from ..engine.result import Result
70 from ..sql import Select
71
# generic type variable, bound to ``Any``
_T = TypeVar("_T", bound=Any)
# type variable bound to ``object``; appears in the annotations of
# get_from_identity() as the parameter of the mapper and identity key
_O = TypeVar("_O", bound=object)
# callable used by instances() to assign a fresh ``runid`` to each
# QueryContext (presumably an incrementing counter; see util.counter)
_new_runid = util.counter()


# alias for a dictionary mapping string keys to lists of
# ``(string, object)`` tuples
_PopulatorDict = Dict[str, List[Tuple[str, Any]]]
78
79
def instances(
    cursor: CursorResult[Unpack[TupleAny]], context: QueryContext
) -> Result[Unpack[TupleAny]]:
    """Return a :class:`.Result` given an ORM query context.

    A row processor is obtained from every entity in the context's compile
    state; the returned result fetches raw rows from ``cursor`` in chunks,
    runs each row through those processors and, for a top-level load,
    invokes the post loaders collected in ``context.post_load_paths``.

    :param cursor: a :class:`.CursorResult`, generated by a statement
     which came from :class:`.ORMCompileState`

    :param context: a :class:`.QueryContext` object

    :return: a :class:`.Result` object representing ORM results

    :raises sqlalchemy.exc.InvalidRequestError: if ``context.yield_per`` is
     set while the context indicates loaders that require row buffering or
     uniquing.  The cursor is closed before the error is re-raised.

    .. versionchanged:: 1.4 The instances() function now uses
       :class:`.Result` objects and has an all new interface.

    """

    # each call establishes a new run id on the context
    context.runid = _new_runid()

    if context.top_level_context:
        # nested load; share the post load collection of the top level
        # context rather than establishing a new one
        is_top_level = False
        context.post_load_paths = context.top_level_context.post_load_paths
    else:
        is_top_level = True
        context.post_load_paths = {}

    compile_state = context.compile_state
    filtered = compile_state._has_mapper_entities
    single_entity = (
        not context.load_options._only_return_tuples
        and len(compile_state._entities) == 1
        and compile_state._entities[0].supports_single_entity
    )

    try:
        # each entity returns a 3-tuple; transpose them into three
        # parallel sequences
        (process, labels, extra) = list(
            zip(
                *[
                    query_entity.row_processor(context, cursor)
                    for query_entity in context.compile_state._entities
                ]
            )
        )

        if context.yield_per and (
            context.loaders_require_buffering
            or context.loaders_require_uniquing
        ):
            raise sa_exc.InvalidRequestError(
                "Can't use yield_per with eager loaders that require uniquing "
                "or row buffering, e.g. joinedload() against collections "
                "or subqueryload().  Consider the selectinload() strategy "
                "for better flexibility in loading objects."
            )

    except Exception:
        # don't leave the cursor open if setup fails
        with util.safe_reraise():
            cursor.close()

    def _no_unique(entry):
        raise sa_exc.InvalidRequestError(
            "Can't use the ORM yield_per feature in conjunction with unique()"
        )

    def _not_hashable(datatype, *, legacy=False, uncertain=False):
        """Return a uniquing function for values that may not be hashable.

        Non-legacy mode raises for unhashable values (after first trying
        ``hash()`` when ``uncertain``); legacy mode falls back to ``id()``.
        """
        if not legacy:

            def go(obj):
                if uncertain:
                    try:
                        return hash(obj)
                    except Exception:
                        # the value isn't hashable; report it below.
                        # BaseException subclasses such as
                        # KeyboardInterrupt are deliberately not
                        # intercepted here.
                        pass

                raise sa_exc.InvalidRequestError(
                    "Can't apply uniqueness to row tuple containing value of "
                    f"""type {datatype!r}; {
                        'the values returned appear to be'
                        if uncertain
                        else 'this datatype produces'
                    } non-hashable values"""
                )

            return go
        elif not uncertain:
            return id
        else:
            _use_id = False

            def go(obj):
                nonlocal _use_id

                if not _use_id:
                    try:
                        return hash(obj)
                    except Exception:
                        # not hashable; fall through to id() below.
                        # BaseException subclasses such as
                        # KeyboardInterrupt are deliberately not
                        # intercepted here.
                        pass

                    # in #10459, we considered using a warning here, however
                    # as legacy query uses result.unique() in all cases, this
                    # would lead to too many warning cases.
                    _use_id = True

                return id(obj)

            return go

    # one uniquing filter per entity: always an error under yield_per,
    # otherwise a hashability-aware function, id(), or None
    unique_filters = [
        (
            _no_unique
            if context.yield_per
            else (
                _not_hashable(
                    ent.column.type,  # type: ignore
                    legacy=context.load_options._legacy_uniquing,
                    uncertain=ent._null_column_type,
                )
                if (
                    not ent.use_id_for_hash
                    and (ent._non_hashable_value or ent._null_column_type)
                )
                else id if ent.use_id_for_hash else None
            )
        )
        for ent in context.compile_state._entities
    ]

    row_metadata = SimpleResultMetaData(
        labels, extra, _unique_filters=unique_filters
    )

    def chunks(size):  # type: ignore
        """Yield lists of processed rows, ``size`` raw rows at a time,
        or every row in a single list when ``size`` is not set."""
        while True:
            yield_per = size

            context.partials = {}

            if yield_per:
                fetch = cursor.fetchmany(yield_per)

                if not fetch:
                    break
            else:
                fetch = cursor._raw_all_rows()

            if single_entity:
                proc = process[0]
                rows = [proc(row) for row in fetch]
            else:
                rows = [
                    tuple([proc(row) for proc in process]) for row in fetch
                ]

            # if we are the originating load from a query, meaning we
            # aren't being called as a result of a nested "post load",
            # iterate through all the collected post loaders and fire them
            # off.  Previously this used to work recursively, however that
            # prevented deeply nested structures from being loadable
            if is_top_level:
                if yield_per:
                    # if using yield per, memoize the state of the
                    # collection so that it can be restored
                    top_level_post_loads = list(
                        context.post_load_paths.items()
                    )

                # the collection is re-checked on every pass since it is
                # cleared before the loaders run and may be non-empty
                # again afterwards
                while context.post_load_paths:
                    post_loads = list(context.post_load_paths.items())
                    context.post_load_paths.clear()
                    for path, post_load in post_loads:
                        post_load.invoke(context, path)

                if yield_per:
                    context.post_load_paths.clear()
                    context.post_load_paths.update(top_level_post_loads)

            yield rows

            if not yield_per:
                break

    if context.execution_options.get("prebuffer_rows", False):
        # this is a bit of a hack at the moment.
        # I would rather have some option in the result to pre-buffer
        # internally.
        _prebuffered = list(chunks(None))

        def chunks(size):
            return iter(_prebuffered)

    result = ChunkedIteratorResult(
        row_metadata,
        chunks,
        source_supports_scalars=single_entity,
        raw=cursor,
        dynamic_yield_per=cursor.context._is_server_side,
    )

    # filtered and single_entity are used to indicate to legacy Query that the
    # query has ORM entities, so legacy deduping and scalars should be called
    # on the result.
    result._attributes = result._attributes.union(
        dict(filtered=filtered, is_single_entity=single_entity)
    )

    # multi_row_eager_loaders OTOH is specific to joinedload.
    if context.compile_state.multi_row_eager_loaders:

        def require_unique(obj):
            raise sa_exc.InvalidRequestError(
                "The unique() method must be invoked on this Result, "
                "as it contains results that include joined eager loads "
                "against collections"
            )

        result._unique_filter_state = (None, require_unique)

    if context.yield_per:
        result.yield_per(context.yield_per)

    return result
301
302
@util.preload_module("sqlalchemy.orm.context")
def merge_frozen_result(session, statement, frozen_result, load=True):
    """Merge a :class:`_engine.FrozenResult` back into a :class:`_orm.Session`,
    returning a new :class:`_engine.Result` object with :term:`persistent`
    objects.

    Each non-``None`` value in a row position that corresponds to a mapper
    entity of ``statement`` is replaced by the object the session returns
    from merging it.  Session autoflush is switched off while the rows are
    processed and restored afterwards.

    See the section :ref:`do_orm_execute_re_executing` for an example.

    .. seealso::

        :ref:`do_orm_execute_re_executing`

        :meth:`_engine.Result.freeze`

        :class:`_engine.FrozenResult`

    """
    orm_context = util.preloaded.orm_context

    if load:
        # flush current contents if we expect to load data
        session._autoflush()

    entity_collection = (
        orm_context.ORMSelectCompileState._create_entities_collection(
            statement, legacy=False
        )
    )

    def _merge_instance(obj):
        # each object is merged with its own fresh bookkeeping
        # dictionaries
        return session._merge(
            attributes.instance_state(obj),
            attributes.instance_dict(obj),
            load=load,
            _recursive={},
            _resolve_conflict_map={},
        )

    previous_autoflush = session.autoflush
    try:
        session.autoflush = False

        entities = entity_collection._entities

        # row positions that hold mapped instances
        mapper_positions = [
            position
            for position, entity in enumerate(entities)
            if isinstance(entity, orm_context._MapperEntity)
        ]

        make_row = result_tuple(
            [entity._label_name for entity in entities],
            [entity._extra_entities for entity in entities],
        )

        merged_rows = []
        for row in frozen_result.rewrite_rows():
            for position in mapper_positions:
                if row[position] is not None:
                    row[position] = _merge_instance(row[position])

            merged_rows.append(make_row(row))

        return frozen_result.with_new_rows(merged_rows)
    finally:
        session.autoflush = previous_autoflush
361
362
@util.became_legacy_20(
    ":func:`_orm.merge_result`",
    alternative="The function as well as the method on :class:`_orm.Query` "
    "is superseded by the :func:`_orm.merge_frozen_result` function.",
)
@util.preload_module("sqlalchemy.orm.context")
def merge_result(
    query: Query[Any],
    iterator: Union[FrozenResult, Iterable[Sequence[Any]], Iterable[object]],
    load: bool = True,
) -> Union[FrozenResult, Iterable[Any]]:
    """Merge a result into the given :class:`.Query` object's Session.

    Returns a frozen result with new rows when ``iterator`` is a
    :class:`.FrozenResult`; otherwise returns an iterator over the merged
    objects or rows.

    See :meth:`_orm.Query.merge_result` for top-level documentation on this
    function.

    """

    orm_context = util.preloaded.orm_context

    session = query.session
    if load:
        # flush current contents if we expect to load data
        session._autoflush()

    # TODO: need test coverage and documentation for the FrozenResult
    # use case.
    frozen_result = iterator if isinstance(iterator, FrozenResult) else None
    if frozen_result is not None:
        iterator = iter(frozen_result.data)

    entity_collection = (
        orm_context.ORMSelectCompileState._create_entities_collection(
            query, legacy=True
        )
    )

    def _merge_instance(obj):
        # each object is merged with its own fresh bookkeeping
        # dictionaries
        return session._merge(
            attributes.instance_state(obj),
            attributes.instance_dict(obj),
            load=load,
            _recursive={},
            _resolve_conflict_map={},
        )

    previous_autoflush = session.autoflush
    try:
        session.autoflush = False

        entities = entity_collection._entities

        # a frozen result is always handled row-wise, even with one entity
        single_entity = not frozen_result and len(entities) == 1

        if single_entity:
            # the iterator delivers bare objects rather than rows
            if isinstance(entities[0], orm_context._MapperEntity):
                result = [_merge_instance(instance) for instance in iterator]
            else:
                result = list(iterator)
        else:
            # row positions that hold mapped instances
            mapper_positions = [
                position
                for position, entity in enumerate(entities)
                if isinstance(entity, orm_context._MapperEntity)
            ]
            result = []

            make_row = result_tuple(
                [entity._label_name for entity in entities],
                [entity._extra_entities for entity in entities],
            )

            for row in iterator:
                merged_row = list(row)
                for position in mapper_positions:
                    if merged_row[position] is not None:
                        merged_row[position] = _merge_instance(
                            merged_row[position]
                        )
                result.append(make_row(merged_row))

        if frozen_result:
            return frozen_result.with_new_rows(result)
        return iter(result)
    finally:
        session.autoflush = previous_autoflush
451
452
def get_from_identity(
    session: Session,
    mapper: Mapper[_O],
    key: _IdentityKeyType[_O],
    passive: PassiveFlag,
) -> Union[LoaderCallableStatus, Optional[_O]]:
    """Look up the given key in the given session's identity map,
    check the object for expired state if found.

    Returns ``None`` when the key is not present or the expired object
    turns out to have been deleted, ``PASSIVE_CLASS_MISMATCH`` when the
    object found is not of the requested mapper's hierarchy,
    ``PASSIVE_NO_RESULT`` when the object is expired but ``passive`` does
    not allow SQL, and the instance itself otherwise.

    """
    instance = session.identity_map.get(key)
    if instance is None:
        return None

    state = attributes.instance_state(instance)

    if mapper.inherits and not state.mapper.isa(mapper):
        return attributes.PASSIVE_CLASS_MISMATCH

    if not state.expired:
        return instance

    # expired - ensure it still exists
    if not passive & attributes.SQL_OK:
        # TODO: no coverage here
        return attributes.PASSIVE_NO_RESULT

    if not passive & attributes.RELATED_OBJECT_OK:
        # this mode is used within a flush and the instance's
        # expired state will be checked soon enough, if necessary.
        # also used by immediateloader for a mutually-dependent
        # o2m->m2m load, :ticket:`6301`
        return instance

    try:
        state._load_expired(state, passive)
    except orm_exc.ObjectDeletedError:
        session._remove_newly_deleted([state])
        return None

    return instance
489
490
def load_on_ident(
    session: Session,
    statement: Union[Select, FromStatement],
    key: Optional[_IdentityKeyType],
    *,
    load_options: Optional[Sequence[ORMOption]] = None,
    refresh_state: Optional[InstanceState[Any]] = None,
    with_for_update: Optional[ForUpdateArg] = None,
    only_load_props: Optional[Iterable[str]] = None,
    no_autoflush: bool = False,
    bind_arguments: Mapping[str, Any] = util.EMPTY_DICT,
    execution_options: _ExecuteOptions = util.EMPTY_DICT,
    require_pk_cols: bool = False,
    is_user_refresh: bool = False,
):
    """Load the given identity key from the database.

    The second and third elements of ``key`` are passed to
    :func:`.load_on_pk_identity` as the primary key identity and the
    identity token; both are ``None`` when no key is given.  All other
    arguments are passed through unchanged.

    """
    if key is None:
        ident = identity_token = None
    else:
        ident, identity_token = key[1], key[2]

    return load_on_pk_identity(
        session,
        statement,
        ident,
        identity_token=identity_token,
        load_options=load_options,
        refresh_state=refresh_state,
        with_for_update=with_for_update,
        only_load_props=only_load_props,
        no_autoflush=no_autoflush,
        bind_arguments=bind_arguments,
        execution_options=execution_options,
        require_pk_cols=require_pk_cols,
        is_user_refresh=is_user_refresh,
    )
528
529
def load_on_pk_identity(
    session: Session,
    statement: Union[Select, FromStatement],
    primary_key_identity: Optional[Tuple[Any, ...]],
    *,
    load_options: Optional[Sequence[ORMOption]] = None,
    refresh_state: Optional[InstanceState[Any]] = None,
    with_for_update: Optional[ForUpdateArg] = None,
    only_load_props: Optional[Iterable[str]] = None,
    identity_token: Optional[Any] = None,
    no_autoflush: bool = False,
    bind_arguments: Mapping[str, Any] = util.EMPTY_DICT,
    execution_options: _ExecuteOptions = util.EMPTY_DICT,
    require_pk_cols: bool = False,
    is_user_refresh: bool = False,
):
    """Load the given primary key identity from the database.

    A clone of ``statement`` receives WHERE criteria built from the
    mapper's "get" clause (when an identity is given), has its ORDER BY
    removed, and is executed on ``session``.  Returns the single resulting
    object, or ``None`` if no row was found.

    """

    q = statement._clone()

    assert not q._is_lambda_element

    if load_options is None:
        load_options = QueryContext.default_load_options

    # a statement still carrying the plain select defaults gets the ORM
    # defaults instead
    compile_options = (
        ORMCompileState.default_compile_options
        if statement._compile_options
        is SelectState.default_select_compile_options
        else statement._compile_options
    )

    params = None
    if primary_key_identity is not None:
        mapper = statement._propagate_attrs["plugin_subject"]

        get_clause, get_params = mapper._get_clause

        # None present in ident - turn those comparisons
        # into "IS NULL"
        if None in primary_key_identity:
            null_param_keys = {
                get_params[column].key
                for column, pk_value in zip(
                    mapper.primary_key, primary_key_identity
                )
                if pk_value is None
            }

            get_clause = sql_util.adapt_criterion_to_null(
                get_clause, null_param_keys
            )

            if len(null_param_keys) == len(primary_key_identity):
                util.warn(
                    "fully NULL primary key identity cannot load any "
                    "object.  This condition may raise an error in a future "
                    "release."
                )

        q._where_criteria = (
            sql_util._deep_annotate(get_clause, {"_orm_adapt": True}),
        )

        params = {
            get_params[column].key: pk_value
            for pk_value, column in zip(
                primary_key_identity, mapper.primary_key
            )
        }

    # an explicit with_for_update wins over one already present on the
    # statement; either one turns on the version check
    for_update_arg = (
        with_for_update
        if with_for_update is not None
        else statement._for_update_arg
    )
    version_check = for_update_arg is not None
    if version_check:
        q._for_update_arg = for_update_arg

    if require_pk_cols and only_load_props:
        if not refresh_state:
            raise sa_exc.ArgumentError(
                "refresh_state is required when require_pk_cols is present"
            )

        pk_propkeys = refresh_state.mapper._primary_key_propkeys
        has_changes = {
            key
            for key in pk_propkeys.difference(only_load_props)
            if refresh_state.attrs[key].history.has_changes()
        }
        if has_changes:
            # raise if pending pk changes are present.
            # technically, this could be limited to the case where we have
            # relationships in the only_load_props collection to be refreshed
            # also (and only ones that have a secondary eager loader, at that).
            # however, the error is in place across the board so that behavior
            # here is easier to predict.   The use case it prevents is one
            # of mutating PK attrs, leaving them unflushed,
            # calling session.refresh(), and expecting those attrs to remain
            # still unflushed.   It seems likely someone doing all those
            # things would be better off having the PK attributes flushed
            # to the database before tinkering like that (session.refresh() is
            # tinkering).
            raise sa_exc.InvalidRequestError(
                f"Please flush pending primary key changes on "
                "attributes "
                f"{has_changes} for mapper {refresh_state.mapper} before "
                "proceeding with a refresh"
            )

        # overall, the ORM has no internal flow right now for "dont load the
        # primary row of an object at all, but fire off
        # selectinload/subqueryload/immediateload for some relationships".
        # It would probably be a pretty big effort to add such a flow.  So
        # here, the case for #8703 is introduced; user asks to refresh some
        # relationship attributes only which are
        # selectinload/subqueryload/immediateload/ etc. (not joinedload).
        # ORM complains there's no columns in the primary row to load.
        # So here, we just add the PK cols if that
        # case is detected, so that there is a SELECT emitted for the primary
        # row.
        #
        # Let's just state right up front, for this one little case,
        # the ORM here is adding a whole extra SELECT just to satisfy
        # limitations in the internal flow.  This is really not a thing
        # SQLAlchemy finds itself doing like, ever, obviously, we are
        # constantly working to *remove* SELECTs we don't need.   We
        # rationalize this for now based on 1. session.refresh() is not
        # commonly used 2. session.refresh() with only relationship attrs is
        # even less commonly used 3. the SELECT in question is very low
        # latency.
        #
        # to add the flow to not include the SELECT, the quickest way
        # might be to just manufacture a single-row result set to send off to
        # instances(), but we'd have to weave that into context.py and all
        # that.  For 2.0.0, we have enough big changes to navigate for now.
        #
        mapper_props = refresh_state.mapper._props
        if any(
            mapper_props[propkey]._is_relationship
            for propkey in only_load_props
        ):
            only_load_props = pk_propkeys.union(only_load_props)

    if refresh_state and refresh_state.load_options:
        compile_options += {"_current_path": refresh_state.load_path.parent}
        q = q.options(*refresh_state.load_options)

    new_compile_options, load_options = _set_get_options(
        compile_options,
        load_options,
        version_check=version_check,
        only_load_props=only_load_props,
        refresh_state=refresh_state,
        identity_token=identity_token,
        is_user_refresh=is_user_refresh,
    )

    q._compile_options = new_compile_options
    q._order_by = None

    if no_autoflush:
        load_options += {"_autoflush": False}

    execution_options = util.EMPTY_DICT.merge_with(
        execution_options, {"_sa_orm_load_options": load_options}
    )

    orm_result = session.execute(
        q,
        params=params,
        execution_options=execution_options,
        bind_arguments=bind_arguments,
    )
    scalar_result = orm_result.unique().scalars()

    try:
        return scalar_result.one()
    except orm_exc.NoResultFound:
        return None
712
713
def _set_get_options(
    compile_opt,
    load_opt,
    populate_existing=None,
    version_check=None,
    only_load_props=None,
    refresh_state=None,
    identity_token=None,
    is_user_refresh=None,
):
    """Add the given settings to a pair of option objects.

    Only truthy arguments are applied.  Load-level settings are added to
    ``load_opt`` and compile-level settings to ``compile_opt``, each via
    ``+=`` with a dictionary; an option object that receives no settings
    is returned as given.

    :return: a ``(compile_opt, load_opt)`` tuple

    """
    load_updates = {
        option_name: value
        for option_name, value in (
            ("_version_check", version_check),
            ("_populate_existing", populate_existing),
            ("_refresh_state", refresh_state),
            ("_identity_token", identity_token),
            ("_is_user_refresh", is_user_refresh),
        )
        if value
    }

    compile_updates = {}
    if refresh_state:
        compile_updates["_for_refresh_state"] = True
    if only_load_props:
        compile_updates["_only_load_props"] = frozenset(only_load_props)

    if load_updates:
        load_opt += load_updates
    if compile_updates:
        compile_opt += compile_updates

    return compile_opt, load_opt
746
747
def _setup_entity_query(
    compile_state,
    mapper,
    query_entity,
    path,
    adapter,
    column_collection,
    with_polymorphic=None,
    only_load_props=None,
    polymorphic_discriminator=None,
    **kw,
):
    """Invoke ``setup()`` on the mapper's polymorphic properties.

    A new dictionary is stored under ``"memoized_setups"`` for ``path`` in
    ``compile_state.attributes`` and handed to each property as
    ``memoized_populators``.  Properties whose key is absent from a
    non-empty ``only_load_props`` are skipped.  A
    ``polymorphic_discriminator`` that is not the mapper's own
    ``polymorphic_on`` is appended to ``column_collection``, looked up
    through ``adapter.columns`` first when an adapter is present.

    """
    properties = (
        mapper._iterate_polymorphic_properties(with_polymorphic)
        if with_polymorphic
        else mapper._polymorphic_properties
    )

    memoized_setups = {}
    path.set(compile_state.attributes, "memoized_setups", memoized_setups)

    # for the lead entities in the path, e.g. not eager loads, and
    # assuming a user-passed aliased class, e.g. not a from_self() or any
    # implicit aliasing, don't add columns to the SELECT that aren't
    # in the thing that's aliased.
    check_for_adapt = adapter and len(path) == 1 and path[-1].is_aliased_class

    for prop in properties:
        if not only_load_props or prop.key in only_load_props:
            prop.setup(
                compile_state,
                query_entity,
                path,
                adapter,
                only_load_props=only_load_props,
                column_collection=column_collection,
                memoized_populators=memoized_setups,
                check_for_adapt=check_for_adapt,
                **kw,
            )

    if (
        polymorphic_discriminator is None
        or polymorphic_discriminator is mapper.polymorphic_on
    ):
        return

    column_collection.append(
        adapter.columns[polymorphic_discriminator]
        if adapter
        else polymorphic_discriminator
    )
801
802
def _warn_for_runid_changed(state):
    """Warn that the loading context for ``state`` changed inside a
    load/refresh event handler, and point to the
    ``restore_load_context`` listener flag."""
    message = (
        "Loading context for %s has changed within a load/refresh "
        "handler, suggesting a row refresh operation took place. If this "
        "event handler is expected to be "
        "emitting row refresh operations within an existing load or refresh "
        "operation, set restore_load_context=True when establishing the "
        "listener to ensure the context remains unchanged when the event "
        "handler completes."
    ) % (state_str(state),)
    util.warn(message)
813
814
815def _instance_processor(
816 query_entity,
817 mapper,
818 context,
819 result,
820 path,
821 adapter,
822 only_load_props=None,
823 refresh_state=None,
824 polymorphic_discriminator=None,
825 _polymorphic_from=None,
826):
827 """Produce a mapper level row processor callable
828 which processes rows into mapped instances."""
829
830 # note that this method, most of which exists in a closure
831 # called _instance(), resists being broken out, as
832 # attempts to do so tend to add significant function
833 # call overhead. _instance() is the most
834 # performance-critical section in the whole ORM.
835
836 identity_class = mapper._identity_class
837 compile_state = context.compile_state
838
839 # look for "row getter" functions that have been assigned along
840 # with the compile state that were cached from a previous load.
841 # these are operator.itemgetter() objects that each will extract a
842 # particular column from each row.
843
844 getter_key = ("getters", mapper)
845 getters = path.get(compile_state.attributes, getter_key, None)
846
847 if getters is None:
848 # no getters, so go through a list of attributes we are loading for,
849 # and the ones that are column based will have already put information
850 # for us in another collection "memoized_setups", which represents the
851 # output of the LoaderStrategy.setup_query() method. We can just as
852 # easily call LoaderStrategy.create_row_processor for each, but by
853 # getting it all at once from setup_query we save another method call
854 # per attribute.
855 props = mapper._prop_set
856 if only_load_props is not None:
857 props = props.intersection(
858 mapper._props[k] for k in only_load_props
859 )
860
861 quick_populators = path.get(
862 context.attributes, "memoized_setups", EMPTY_DICT
863 )
864
865 todo = []
866 cached_populators = {
867 "new": [],
868 "quick": [],
869 "deferred": [],
870 "expire": [],
871 "existing": [],
872 "eager": [],
873 }
874
875 if refresh_state is None:
876 # we can also get the "primary key" tuple getter function
877 pk_cols = mapper.primary_key
878
879 if adapter:
880 pk_cols = [adapter.columns[c] for c in pk_cols]
881 primary_key_getter = result._tuple_getter(pk_cols)
882 else:
883 primary_key_getter = None
884
885 getters = {
886 "cached_populators": cached_populators,
887 "todo": todo,
888 "primary_key_getter": primary_key_getter,
889 }
890 for prop in props:
891 if prop in quick_populators:
892 # this is an inlined path just for column-based attributes.
893 col = quick_populators[prop]
894 if col is _DEFER_FOR_STATE:
895 cached_populators["new"].append(
896 (prop.key, prop._deferred_column_loader)
897 )
898 elif col is _SET_DEFERRED_EXPIRED:
899 # note that in this path, we are no longer
900 # searching in the result to see if the column might
901 # be present in some unexpected way.
902 cached_populators["expire"].append((prop.key, False))
903 elif col is _RAISE_FOR_STATE:
904 cached_populators["new"].append(
905 (prop.key, prop._raise_column_loader)
906 )
907 else:
908 getter = None
909 if adapter:
910 # this logic had been removed for all 1.4 releases
911 # up until 1.4.18; the adapter here is particularly
912 # the compound eager adapter which isn't accommodated
913 # in the quick_populators right now. The "fallback"
914 # logic below instead took over in many more cases
915 # until issue #6596 was identified.
916
917 # note there is still an issue where this codepath
918 # produces no "getter" for cases where a joined-inh
919 # mapping includes a labeled column property, meaning
920 # KeyError is caught internally and we fall back to
921 # _getter(col), which works anyway. The adapter
922 # here for joined inh without any aliasing might not
923 # be useful. Tests which see this include
924 # test.orm.inheritance.test_basic ->
925 # EagerTargetingTest.test_adapt_stringency
926 # OptimizedLoadTest.test_column_expression_joined
927 # PolymorphicOnNotLocalTest.test_polymorphic_on_column_prop # noqa: E501
928 #
929
930 adapted_col = adapter.columns[col]
931 if adapted_col is not None:
932 getter = result._getter(adapted_col, False)
933 if not getter:
934 getter = result._getter(col, False)
935 if getter:
936 cached_populators["quick"].append((prop.key, getter))
937 else:
938 # fall back to the ColumnProperty itself, which
939 # will iterate through all of its columns
940 # to see if one fits
941 prop.create_row_processor(
942 context,
943 query_entity,
944 path,
945 mapper,
946 result,
947 adapter,
948 cached_populators,
949 )
950 else:
951 # loader strategies like subqueryload, selectinload,
952 # joinedload, basically relationships, these need to interact
953 # with the context each time to work correctly.
954 todo.append(prop)
955
956 path.set(compile_state.attributes, getter_key, getters)
957
958 cached_populators = getters["cached_populators"]
959
960 populators = {key: list(value) for key, value in cached_populators.items()}
961 for prop in getters["todo"]:
962 prop.create_row_processor(
963 context, query_entity, path, mapper, result, adapter, populators
964 )
965
966 propagated_loader_options = context.propagated_loader_options
967 load_path = (
968 context.compile_state.current_path + path
969 if context.compile_state.current_path.path
970 else path
971 )
972
973 session_identity_map = context.session.identity_map
974
975 populate_existing = context.populate_existing or mapper.always_refresh
976 load_evt = bool(mapper.class_manager.dispatch.load)
977 refresh_evt = bool(mapper.class_manager.dispatch.refresh)
978 persistent_evt = bool(context.session.dispatch.loaded_as_persistent)
979 if persistent_evt:
980 loaded_as_persistent = context.session.dispatch.loaded_as_persistent
981 instance_state = attributes.instance_state
982 instance_dict = attributes.instance_dict
983 session_id = context.session.hash_key
984 runid = context.runid
985 identity_token = context.identity_token
986
987 version_check = context.version_check
988 if version_check:
989 version_id_col = mapper.version_id_col
990 if version_id_col is not None:
991 if adapter:
992 version_id_col = adapter.columns[version_id_col]
993 version_id_getter = result._getter(version_id_col)
994 else:
995 version_id_getter = None
996
997 if not refresh_state and _polymorphic_from is not None:
998 key = ("loader", path.path)
999
1000 if key in context.attributes and context.attributes[key].strategy == (
1001 ("selectinload_polymorphic", True),
1002 ):
1003 option_entities = context.attributes[key].local_opts["entities"]
1004 else:
1005 option_entities = None
1006 selectin_load_via = mapper._should_selectin_load(
1007 option_entities,
1008 _polymorphic_from,
1009 )
1010
1011 if selectin_load_via and selectin_load_via is not _polymorphic_from:
1012 # only_load_props goes w/ refresh_state only, and in a refresh
1013 # we are a single row query for the exact entity; polymorphic
1014 # loading does not apply
1015 assert only_load_props is None
1016
1017 if selectin_load_via.is_mapper:
1018 _load_supers = []
1019 _endmost_mapper = selectin_load_via
1020 while (
1021 _endmost_mapper
1022 and _endmost_mapper is not _polymorphic_from
1023 ):
1024 _load_supers.append(_endmost_mapper)
1025 _endmost_mapper = _endmost_mapper.inherits
1026 else:
1027 _load_supers = [selectin_load_via]
1028
1029 for _selectinload_entity in _load_supers:
1030 if PostLoad.path_exists(
1031 context, load_path, _selectinload_entity
1032 ):
1033 continue
1034 callable_ = _load_subclass_via_in(
1035 context,
1036 path,
1037 _selectinload_entity,
1038 _polymorphic_from,
1039 option_entities,
1040 )
1041 PostLoad.callable_for_path(
1042 context,
1043 load_path,
1044 _selectinload_entity.mapper,
1045 _selectinload_entity,
1046 callable_,
1047 _selectinload_entity,
1048 )
1049
1050 post_load = PostLoad.for_context(context, load_path, only_load_props)
1051
1052 if refresh_state:
1053 refresh_identity_key = refresh_state.key
1054 if refresh_identity_key is None:
1055 # super-rare condition; a refresh is being called
1056 # on a non-instance-key instance; this is meant to only
1057 # occur within a flush()
1058 refresh_identity_key = mapper._identity_key_from_state(
1059 refresh_state
1060 )
1061 else:
1062 refresh_identity_key = None
1063
1064 primary_key_getter = getters["primary_key_getter"]
1065
1066 if mapper.allow_partial_pks:
1067 is_not_primary_key = _none_set.issuperset
1068 else:
1069 is_not_primary_key = _none_set.intersection
1070
1071 def _instance(row):
1072 # determine the state that we'll be populating
1073 if refresh_identity_key:
1074 # fixed state that we're refreshing
1075 state = refresh_state
1076 instance = state.obj()
1077 dict_ = instance_dict(instance)
1078 isnew = state.runid != runid
1079 currentload = True
1080 loaded_instance = False
1081 else:
1082 # look at the row, see if that identity is in the
1083 # session, or we have to create a new one
1084 identitykey = (
1085 identity_class,
1086 primary_key_getter(row),
1087 identity_token,
1088 )
1089
1090 instance = session_identity_map.get(identitykey)
1091
1092 if instance is not None:
1093 # existing instance
1094 state = instance_state(instance)
1095 dict_ = instance_dict(instance)
1096
1097 isnew = state.runid != runid
1098 currentload = not isnew
1099 loaded_instance = False
1100
1101 if version_check and version_id_getter and not currentload:
1102 _validate_version_id(
1103 mapper, state, dict_, row, version_id_getter
1104 )
1105
1106 else:
1107 # create a new instance
1108
1109 # check for non-NULL values in the primary key columns,
1110 # else no entity is returned for the row
1111 if is_not_primary_key(identitykey[1]):
1112 return None
1113
1114 isnew = True
1115 currentload = True
1116 loaded_instance = True
1117
1118 instance = mapper.class_manager.new_instance()
1119
1120 dict_ = instance_dict(instance)
1121 state = instance_state(instance)
1122 state.key = identitykey
1123 state.identity_token = identity_token
1124
1125 # attach instance to session.
1126 state.session_id = session_id
1127 session_identity_map._add_unpresent(state, identitykey)
1128
1129 effective_populate_existing = populate_existing
1130 if refresh_state is state:
1131 effective_populate_existing = True
1132
1133 # populate. this looks at whether this state is new
1134 # for this load or was existing, and whether or not this
1135 # row is the first row with this identity.
1136 if currentload or effective_populate_existing:
1137 # full population routines. Objects here are either
1138 # just created, or we are doing a populate_existing
1139
1140 # be conservative about setting load_path when populate_existing
1141 # is in effect; want to maintain options from the original
1142 # load. see test_expire->test_refresh_maintains_deferred_options
1143 if isnew and (
1144 propagated_loader_options or not effective_populate_existing
1145 ):
1146 state.load_options = propagated_loader_options
1147 state.load_path = load_path
1148
1149 _populate_full(
1150 context,
1151 row,
1152 state,
1153 dict_,
1154 isnew,
1155 load_path,
1156 loaded_instance,
1157 effective_populate_existing,
1158 populators,
1159 )
1160
1161 if isnew:
1162 # state.runid should be equal to context.runid / runid
1163 # here, however for event checks we are being more conservative
1164 # and checking against existing run id
1165 # assert state.runid == runid
1166
1167 existing_runid = state.runid
1168
1169 if loaded_instance:
1170 if load_evt:
1171 state.manager.dispatch.load(state, context)
1172 if state.runid != existing_runid:
1173 _warn_for_runid_changed(state)
1174 if persistent_evt:
1175 loaded_as_persistent(context.session, state)
1176 if state.runid != existing_runid:
1177 _warn_for_runid_changed(state)
1178 elif refresh_evt:
1179 state.manager.dispatch.refresh(
1180 state, context, only_load_props
1181 )
1182 if state.runid != runid:
1183 _warn_for_runid_changed(state)
1184
1185 if effective_populate_existing or state.modified:
1186 if refresh_state and only_load_props:
1187 state._commit(dict_, only_load_props)
1188 else:
1189 state._commit_all(dict_, session_identity_map)
1190
1191 if post_load:
1192 post_load.add_state(state, True)
1193
1194 else:
1195 # partial population routines, for objects that were already
1196 # in the Session, but a row matches them; apply eager loaders
1197 # on existing objects, etc.
1198 unloaded = state.unloaded
1199 isnew = state not in context.partials
1200
1201 if not isnew or unloaded or populators["eager"]:
1202 # state is having a partial set of its attributes
1203 # refreshed. Populate those attributes,
1204 # and add to the "context.partials" collection.
1205
1206 to_load = _populate_partial(
1207 context,
1208 row,
1209 state,
1210 dict_,
1211 isnew,
1212 load_path,
1213 unloaded,
1214 populators,
1215 )
1216
1217 if isnew:
1218 if refresh_evt:
1219 existing_runid = state.runid
1220 state.manager.dispatch.refresh(state, context, to_load)
1221 if state.runid != existing_runid:
1222 _warn_for_runid_changed(state)
1223
1224 state._commit(dict_, to_load)
1225
1226 if post_load and context.invoke_all_eagers:
1227 post_load.add_state(state, False)
1228
1229 return instance
1230
1231 if mapper.polymorphic_map and not _polymorphic_from and not refresh_state:
1232 # if we are doing polymorphic, dispatch to a different _instance()
1233 # method specific to the subclass mapper
1234 def ensure_no_pk(row):
1235 identitykey = (
1236 identity_class,
1237 primary_key_getter(row),
1238 identity_token,
1239 )
1240 if not is_not_primary_key(identitykey[1]):
1241 return identitykey
1242 else:
1243 return None
1244
1245 _instance = _decorate_polymorphic_switch(
1246 _instance,
1247 context,
1248 query_entity,
1249 mapper,
1250 result,
1251 path,
1252 polymorphic_discriminator,
1253 adapter,
1254 ensure_no_pk,
1255 )
1256
1257 return _instance
1258
1259
1260def _load_subclass_via_in(
1261 context, path, entity, polymorphic_from, option_entities
1262):
1263 mapper = entity.mapper
1264
1265 # TODO: polymorphic_from seems to be a Mapper in all cases.
1266 # this is likely not needed, but as we dont have typing in loading.py
1267 # yet, err on the safe side
1268 polymorphic_from_mapper = polymorphic_from.mapper
1269 not_against_basemost = polymorphic_from_mapper.inherits is not None
1270
1271 zero_idx = len(mapper.base_mapper.primary_key) == 1
1272
1273 if entity.is_aliased_class or not_against_basemost:
1274 q, enable_opt, disable_opt = mapper._subclass_load_via_in(
1275 entity, polymorphic_from
1276 )
1277 else:
1278 q, enable_opt, disable_opt = mapper._subclass_load_via_in_mapper
1279
1280 def do_load(context, path, states, load_only, effective_entity):
1281 if not option_entities:
1282 # filter out states for those that would have selectinloaded
1283 # from another loader
1284 # TODO: we are currently ignoring the case where the
1285 # "selectin_polymorphic" option is used, as this is much more
1286 # complex / specific / very uncommon API use
1287 states = [
1288 (s, v)
1289 for s, v in states
1290 if s.mapper._would_selectin_load_only_from_given_mapper(mapper)
1291 ]
1292
1293 if not states:
1294 return
1295
1296 orig_query = context.query
1297
1298 if path.parent:
1299 enable_opt_lcl = enable_opt._prepend_path(path)
1300 disable_opt_lcl = disable_opt._prepend_path(path)
1301 else:
1302 enable_opt_lcl = enable_opt
1303 disable_opt_lcl = disable_opt
1304 options = (
1305 (enable_opt_lcl,) + orig_query._with_options + (disable_opt_lcl,)
1306 )
1307
1308 q2 = q.options(*options)
1309
1310 q2._compile_options = context.compile_state.default_compile_options
1311 q2._compile_options += {"_current_path": path.parent}
1312
1313 if context.populate_existing:
1314 q2 = q2.execution_options(populate_existing=True)
1315
1316 context.session.execute(
1317 q2,
1318 dict(
1319 primary_keys=[
1320 state.key[1][0] if zero_idx else state.key[1]
1321 for state, load_attrs in states
1322 ]
1323 ),
1324 ).unique().scalars().all()
1325
1326 return do_load
1327
1328
1329def _populate_full(
1330 context,
1331 row,
1332 state,
1333 dict_,
1334 isnew,
1335 load_path,
1336 loaded_instance,
1337 populate_existing,
1338 populators,
1339):
1340 if isnew:
1341 # first time we are seeing a row with this identity.
1342 state.runid = context.runid
1343
1344 for key, getter in populators["quick"]:
1345 dict_[key] = getter(row)
1346 if populate_existing:
1347 for key, set_callable in populators["expire"]:
1348 dict_.pop(key, None)
1349 if set_callable:
1350 state.expired_attributes.add(key)
1351 else:
1352 for key, set_callable in populators["expire"]:
1353 if set_callable:
1354 state.expired_attributes.add(key)
1355
1356 for key, populator in populators["new"]:
1357 populator(state, dict_, row)
1358
1359 elif load_path != state.load_path:
1360 # new load path, e.g. object is present in more than one
1361 # column position in a series of rows
1362 state.load_path = load_path
1363
1364 # if we have data, and the data isn't in the dict, OK, let's put
1365 # it in.
1366 for key, getter in populators["quick"]:
1367 if key not in dict_:
1368 dict_[key] = getter(row)
1369
1370 # otherwise treat like an "already seen" row
1371 for key, populator in populators["existing"]:
1372 populator(state, dict_, row)
1373 # TODO: allow "existing" populator to know this is
1374 # a new path for the state:
1375 # populator(state, dict_, row, new_path=True)
1376
1377 else:
1378 # have already seen rows with this identity in this same path.
1379 for key, populator in populators["existing"]:
1380 populator(state, dict_, row)
1381
1382 # TODO: same path
1383 # populator(state, dict_, row, new_path=False)
1384
1385
1386def _populate_partial(
1387 context, row, state, dict_, isnew, load_path, unloaded, populators
1388):
1389 if not isnew:
1390 if unloaded:
1391 # extra pass, see #8166
1392 for key, getter in populators["quick"]:
1393 if key in unloaded:
1394 dict_[key] = getter(row)
1395
1396 to_load = context.partials[state]
1397 for key, populator in populators["existing"]:
1398 if key in to_load:
1399 populator(state, dict_, row)
1400 else:
1401 to_load = unloaded
1402 context.partials[state] = to_load
1403
1404 for key, getter in populators["quick"]:
1405 if key in to_load:
1406 dict_[key] = getter(row)
1407 for key, set_callable in populators["expire"]:
1408 if key in to_load:
1409 dict_.pop(key, None)
1410 if set_callable:
1411 state.expired_attributes.add(key)
1412 for key, populator in populators["new"]:
1413 if key in to_load:
1414 populator(state, dict_, row)
1415
1416 for key, populator in populators["eager"]:
1417 if key not in unloaded:
1418 populator(state, dict_, row)
1419
1420 return to_load
1421
1422
1423def _validate_version_id(mapper, state, dict_, row, getter):
1424 if mapper._get_state_attr_by_column(
1425 state, dict_, mapper.version_id_col
1426 ) != getter(row):
1427 raise orm_exc.StaleDataError(
1428 "Instance '%s' has version id '%s' which "
1429 "does not match database-loaded version id '%s'."
1430 % (
1431 state_str(state),
1432 mapper._get_state_attr_by_column(
1433 state, dict_, mapper.version_id_col
1434 ),
1435 getter(row),
1436 )
1437 )
1438
1439
1440def _decorate_polymorphic_switch(
1441 instance_fn,
1442 context,
1443 query_entity,
1444 mapper,
1445 result,
1446 path,
1447 polymorphic_discriminator,
1448 adapter,
1449 ensure_no_pk,
1450):
1451 if polymorphic_discriminator is not None:
1452 polymorphic_on = polymorphic_discriminator
1453 else:
1454 polymorphic_on = mapper.polymorphic_on
1455 if polymorphic_on is None:
1456 return instance_fn
1457
1458 if adapter:
1459 polymorphic_on = adapter.columns[polymorphic_on]
1460
1461 def configure_subclass_mapper(discriminator):
1462 try:
1463 sub_mapper = mapper.polymorphic_map[discriminator]
1464 except KeyError:
1465 raise AssertionError(
1466 "No such polymorphic_identity %r is defined" % discriminator
1467 )
1468 else:
1469 if sub_mapper is mapper:
1470 return None
1471 elif not sub_mapper.isa(mapper):
1472 return False
1473
1474 return _instance_processor(
1475 query_entity,
1476 sub_mapper,
1477 context,
1478 result,
1479 path,
1480 adapter,
1481 _polymorphic_from=mapper,
1482 )
1483
1484 polymorphic_instances = util.PopulateDict(configure_subclass_mapper)
1485
1486 getter = result._getter(polymorphic_on)
1487
1488 def polymorphic_instance(row):
1489 discriminator = getter(row)
1490 if discriminator is not None:
1491 _instance = polymorphic_instances[discriminator]
1492 if _instance:
1493 return _instance(row)
1494 elif _instance is False:
1495 identitykey = ensure_no_pk(row)
1496
1497 if identitykey:
1498 raise sa_exc.InvalidRequestError(
1499 "Row with identity key %s can't be loaded into an "
1500 "object; the polymorphic discriminator column '%s' "
1501 "refers to %s, which is not a sub-mapper of "
1502 "the requested %s"
1503 % (
1504 identitykey,
1505 polymorphic_on,
1506 mapper.polymorphic_map[discriminator],
1507 mapper,
1508 )
1509 )
1510 else:
1511 return None
1512 else:
1513 return instance_fn(row)
1514 else:
1515 identitykey = ensure_no_pk(row)
1516
1517 if identitykey:
1518 raise sa_exc.InvalidRequestError(
1519 "Row with identity key %s can't be loaded into an "
1520 "object; the polymorphic discriminator column '%s' is "
1521 "NULL" % (identitykey, polymorphic_on)
1522 )
1523 else:
1524 return None
1525
1526 return polymorphic_instance
1527
1528
class PostLoad:
    """Track loaders and states for "post load" operations."""

    __slots__ = "loaders", "states", "load_keys"

    def __init__(self):
        # token -> (context, token, limit_to_mapper, loader, arg, kw)
        self.loaders = {}
        # state -> "overwrite" flag, in insertion order
        self.states = util.OrderedDict()
        # set by for_context() when only specific keys are being loaded
        self.load_keys = None

    def add_state(self, state, overwrite):
        """Record ``state`` along with its ``overwrite`` flag."""
        # the states for a polymorphic load here are all shared
        # within a single PostLoad object among multiple subtypes.
        # Filtering of callables on a per-subclass basis needs to be done at
        # the invocation level
        self.states[state] = overwrite

    def invoke(self, context, path):
        """Run every registered loader against the collected states.

        Each loader receives only those states whose mapper ``isa()`` the
        mapper the loader was registered for, and is skipped if no state
        qualifies.  The collected states are cleared afterwards.  Nothing
        happens if no states were collected.

        :param context: not consulted; each loader is called with the
         context it was registered with.
        :param path: coerced to a ``PathRegistry`` and passed to each
         loader.
        """
        if not self.states:
            return
        path = path_registry.PathRegistry.coerce(path)
        for (
            effective_context,
            token,
            limit_to_mapper,
            loader,
            arg,
            kw,
        ) in self.loaders.values():
            states = [
                (state, overwrite)
                for state, overwrite in self.states.items()
                if state.manager.mapper.isa(limit_to_mapper)
            ]
            if states:
                loader(
                    effective_context, path, states, self.load_keys, *arg, **kw
                )
        self.states.clear()

    @classmethod
    def for_context(cls, context, path, only_load_props):
        """Return the ``PostLoad`` registered for ``path``, or ``None``.

        If one exists and ``only_load_props`` is truthy, it is stored on
        the returned object as ``load_keys``.
        """
        pl = context.post_load_paths.get(path.path)
        if pl is not None and only_load_props:
            pl.load_keys = only_load_props
        return pl

    @classmethod
    def path_exists(cls, context, path, key):
        """Return whether a loader is registered under ``key`` for
        ``path``."""
        return (
            path.path in context.post_load_paths
            and key in context.post_load_paths[path.path].loaders
        )

    @classmethod
    def callable_for_path(
        cls, context, path, limit_to_mapper, token, loader_callable, *arg, **kw
    ):
        """Register ``loader_callable`` under ``token`` for ``path``.

        A ``PostLoad`` is created for ``path`` if none exists yet.  An
        existing registration under the same ``token`` is replaced.

        :param context: supplies ``post_load_paths`` and is stored to be
         handed back to the loader on invocation.
        :param path: object whose ``.path`` keys the registration.
        :param limit_to_mapper: only states whose mapper ``isa()`` this
         mapper are passed to the loader.
        :param token: key identifying the loader within the path.
        :param loader_callable: the loader itself.
        :param arg: extra positional arguments for the loader.
        :param kw: extra keyword arguments for the loader.
        """
        if path.path in context.post_load_paths:
            pl = context.post_load_paths[path.path]
        else:
            pl = context.post_load_paths[path.path] = PostLoad()
        pl.loaders[token] = (
            context,
            token,
            limit_to_mapper,
            loader_callable,
            arg,
            kw,
        )
1599
1600
def load_scalar_attributes(mapper, state, attribute_names, passive):
    """Initiate a column-based attribute refresh operation for ``state``.

    :param mapper: the mapper used to build the refresh statement.
    :param state: the state to refresh; must be attached to a session.
    :param attribute_names: set-like collection of attribute keys to load;
     when truthy it is narrowed to keys present in ``mapper.attrs``.
    :param passive: flags; only the ``NO_AUTOFLUSH`` bit is examined.
    :raises DetachedInstanceError: if ``state`` has no session.
    :raises InvalidRequestError: if ``state`` has no key and one of its
     primary key attributes is expired.
    :raises ObjectDeletedError: if ``state`` has a key and the load
     returned ``None``.
    """

    # assert mapper is _state_mapper(state)
    session = state.session
    if not session:
        raise orm_exc.DetachedInstanceError(
            "Instance %s is not bound to a Session; "
            "attribute refresh operation cannot proceed" % (state_str(state))
        )

    no_autoflush = bool(passive & attributes.NO_AUTOFLUSH)

    # in the case of inheritance, particularly concrete and abstract
    # concrete inheritance, the class manager might have some keys
    # of attributes on the superclass that we didn't actually map.
    # These could be mapped as "concrete, don't load" or could be completely
    # excluded from the mapping and we know nothing about them.  Filter them
    # here to prevent them from coming through.
    if attribute_names:
        attribute_names = attribute_names.intersection(mapper.attrs.keys())

    if mapper.inherits and not mapper.concrete:
        # load based on committed attributes in the object, formed into
        # a truncated SELECT that only includes relevant tables.  does not
        # currently use state.key
        optimized = mapper._optimized_get_statement(state, attribute_names)
        if optimized is not None:
            # undefer() isn't needed here because statement has the
            # columns needed already, this implicitly undefers that column
            return load_on_ident(
                session,
                FromStatement(mapper, optimized),
                None,
                only_load_props=attribute_names,
                refresh_state=state,
                no_autoflush=no_autoflush,
            )

    # normal load, use state.key as the identity to SELECT
    identity_key = state.key
    has_key = bool(identity_key)

    if not has_key:
        # this codepath is rare - only valid when inside a flush, and the
        # object is becoming persistent but hasn't yet been assigned
        # an identity_key.
        # check here to ensure we have the attrs we need.
        pk_attrs = [
            mapper._columntoproperty[col].key for col in mapper.primary_key
        ]
        if state.expired_attributes.intersection(pk_attrs):
            raise sa_exc.InvalidRequestError(
                "Instance %s cannot be refreshed - it's not "
                " persistent and does not "
                "contain a full primary key." % state_str(state)
            )
        identity_key = mapper._identity_key_from_state(state)

    # NOTE(review): this test is applied to the identity key as a whole,
    # whereas other code in this module inspects element [1] of the key
    # for primary key values - confirm this is intended.
    lacks_full_pk = (
        _none_set.issubset(identity_key) and not mapper.allow_partial_pks
    ) or _none_set.issuperset(identity_key)
    if lacks_full_pk:
        util.warn_limited(
            "Instance %s to be refreshed doesn't "
            "contain a full primary key - can't be refreshed "
            "(and shouldn't be expired, either).",
            state_str(state),
        )
        return

    loaded = load_on_ident(
        session,
        select(mapper).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL),
        identity_key,
        refresh_state=state,
        only_load_props=attribute_names,
        no_autoflush=no_autoflush,
    )

    # if instance is pending, a refresh operation
    # may not complete (even if PK attributes are assigned)
    if has_key and loaded is None:
        raise orm_exc.ObjectDeletedError(state)