Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/loading.py: 9%
566 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1# orm/loading.py
2# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
8"""private module containing functions used to convert database
9rows into object instances and associated state.
11the functions here are called primarily by Query, Mapper,
12as well as some of the attribute loading strategies.
14"""
15from __future__ import absolute_import
17from . import attributes
18from . import exc as orm_exc
19from . import path_registry
20from . import strategy_options
21from .base import _DEFER_FOR_STATE
22from .base import _RAISE_FOR_STATE
23from .base import _SET_DEFERRED_EXPIRED
24from .util import _none_set
25from .util import state_str
26from .. import exc as sa_exc
27from .. import future
28from .. import util
29from ..engine import result_tuple
30from ..engine.result import ChunkedIteratorResult
31from ..engine.result import FrozenResult
32from ..engine.result import SimpleResultMetaData
33from ..sql import util as sql_util
34from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL
35from ..sql.selectable import SelectState
37_new_runid = util.counter()
40def instances(cursor, context):
41 """Return a :class:`.Result` given an ORM query context.
43 :param cursor: a :class:`.CursorResult`, generated by a statement
44 which came from :class:`.ORMCompileState`
46 :param context: a :class:`.QueryContext` object
48 :return: a :class:`.Result` object representing ORM results
50 .. versionchanged:: 1.4 The instances() function now uses
51 :class:`.Result` objects and has an all new interface.
53 """
55 context.runid = _new_runid()
56 context.post_load_paths = {}
58 compile_state = context.compile_state
59 filtered = compile_state._has_mapper_entities
60 single_entity = (
61 not context.load_options._only_return_tuples
62 and len(compile_state._entities) == 1
63 and compile_state._entities[0].supports_single_entity
64 )
66 try:
67 (process, labels, extra) = list(
68 zip(
69 *[
70 query_entity.row_processor(context, cursor)
71 for query_entity in context.compile_state._entities
72 ]
73 )
74 )
76 if context.yield_per and (
77 context.loaders_require_buffering
78 or context.loaders_require_uniquing
79 ):
80 raise sa_exc.InvalidRequestError(
81 "Can't use yield_per with eager loaders that require uniquing "
82 "or row buffering, e.g. joinedload() against collections "
83 "or subqueryload(). Consider the selectinload() strategy "
84 "for better flexibility in loading objects."
85 )
87 except Exception:
88 with util.safe_reraise():
89 cursor.close()
91 def _no_unique(entry):
92 raise sa_exc.InvalidRequestError(
93 "Can't use the ORM yield_per feature in conjunction with unique()"
94 )
96 def _not_hashable(datatype):
97 def go(obj):
98 raise sa_exc.InvalidRequestError(
99 "Can't apply uniqueness to row tuple containing value of "
100 "type %r; this datatype produces non-hashable values"
101 % datatype
102 )
104 return go
106 if context.load_options._legacy_uniquing:
107 unique_filters = [
108 _no_unique
109 if context.yield_per
110 else id
111 if (
112 ent.use_id_for_hash
113 or ent._non_hashable_value
114 or ent._null_column_type
115 )
116 else None
117 for ent in context.compile_state._entities
118 ]
119 else:
120 unique_filters = [
121 _no_unique
122 if context.yield_per
123 else _not_hashable(ent.column.type)
124 if (not ent.use_id_for_hash and ent._non_hashable_value)
125 else id
126 if ent.use_id_for_hash
127 else None
128 for ent in context.compile_state._entities
129 ]
131 row_metadata = SimpleResultMetaData(
132 labels, extra, _unique_filters=unique_filters
133 )
135 def chunks(size):
136 while True:
137 yield_per = size
139 context.partials = {}
141 if yield_per:
142 fetch = cursor.fetchmany(yield_per)
144 if not fetch:
145 break
146 else:
147 fetch = cursor._raw_all_rows()
149 if single_entity:
150 proc = process[0]
151 rows = [proc(row) for row in fetch]
152 else:
153 rows = [
154 tuple([proc(row) for proc in process]) for row in fetch
155 ]
157 for path, post_load in context.post_load_paths.items():
158 post_load.invoke(context, path)
160 yield rows
162 if not yield_per:
163 break
165 if context.execution_options.get("prebuffer_rows", False):
166 # this is a bit of a hack at the moment.
167 # I would rather have some option in the result to pre-buffer
168 # internally.
169 _prebuffered = list(chunks(None))
171 def chunks(size):
172 return iter(_prebuffered)
174 result = ChunkedIteratorResult(
175 row_metadata,
176 chunks,
177 source_supports_scalars=single_entity,
178 raw=cursor,
179 dynamic_yield_per=cursor.context._is_server_side,
180 )
182 # filtered and single_entity are used to indicate to legacy Query that the
183 # query has ORM entities, so legacy deduping and scalars should be called
184 # on the result.
185 result._attributes = result._attributes.union(
186 dict(filtered=filtered, is_single_entity=single_entity)
187 )
189 # multi_row_eager_loaders OTOH is specific to joinedload.
190 if context.compile_state.multi_row_eager_loaders:
192 def require_unique(obj):
193 raise sa_exc.InvalidRequestError(
194 "The unique() method must be invoked on this Result, "
195 "as it contains results that include joined eager loads "
196 "against collections"
197 )
199 result._unique_filter_state = (None, require_unique)
201 if context.yield_per:
202 result.yield_per(context.yield_per)
204 return result
207@util.preload_module("sqlalchemy.orm.context")
208def merge_frozen_result(session, statement, frozen_result, load=True):
209 """Merge a :class:`_engine.FrozenResult` back into a :class:`_orm.Session`,
210 returning a new :class:`_engine.Result` object with :term:`persistent`
211 objects.
213 See the section :ref:`do_orm_execute_re_executing` for an example.
215 .. seealso::
217 :ref:`do_orm_execute_re_executing`
219 :meth:`_engine.Result.freeze`
221 :class:`_engine.FrozenResult`
223 """
224 querycontext = util.preloaded.orm_context
226 if load:
227 # flush current contents if we expect to load data
228 session._autoflush()
230 ctx = querycontext.ORMSelectCompileState._create_entities_collection(
231 statement, legacy=False
232 )
234 autoflush = session.autoflush
235 try:
236 session.autoflush = False
237 mapped_entities = [
238 i
239 for i, e in enumerate(ctx._entities)
240 if isinstance(e, querycontext._MapperEntity)
241 ]
242 keys = [ent._label_name for ent in ctx._entities]
244 keyed_tuple = result_tuple(
245 keys, [ent._extra_entities for ent in ctx._entities]
246 )
248 result = []
249 for newrow in frozen_result.rewrite_rows():
250 for i in mapped_entities:
251 if newrow[i] is not None:
252 newrow[i] = session._merge(
253 attributes.instance_state(newrow[i]),
254 attributes.instance_dict(newrow[i]),
255 load=load,
256 _recursive={},
257 _resolve_conflict_map={},
258 )
260 result.append(keyed_tuple(newrow))
262 return frozen_result.with_new_rows(result)
263 finally:
264 session.autoflush = autoflush
267@util.deprecated_20(
268 ":func:`_orm.merge_result`",
269 alternative="The function as well as the method on :class:`_orm.Query` "
270 "is superseded by the :func:`_orm.merge_frozen_result` function.",
271 becomes_legacy=True,
272)
273@util.preload_module("sqlalchemy.orm.context")
274def merge_result(query, iterator, load=True):
275 """Merge a result into the given :class:`.Query` object's Session.
277 See :meth:`_orm.Query.merge_result` for top-level documentation on this
278 function.
280 """
282 querycontext = util.preloaded.orm_context
284 session = query.session
285 if load:
286 # flush current contents if we expect to load data
287 session._autoflush()
289 # TODO: need test coverage and documentation for the FrozenResult
290 # use case.
291 if isinstance(iterator, FrozenResult):
292 frozen_result = iterator
293 iterator = iter(frozen_result.data)
294 else:
295 frozen_result = None
297 ctx = querycontext.ORMSelectCompileState._create_entities_collection(
298 query, legacy=True
299 )
301 autoflush = session.autoflush
302 try:
303 session.autoflush = False
304 single_entity = not frozen_result and len(ctx._entities) == 1
306 if single_entity:
307 if isinstance(ctx._entities[0], querycontext._MapperEntity):
308 result = [
309 session._merge(
310 attributes.instance_state(instance),
311 attributes.instance_dict(instance),
312 load=load,
313 _recursive={},
314 _resolve_conflict_map={},
315 )
316 for instance in iterator
317 ]
318 else:
319 result = list(iterator)
320 else:
321 mapped_entities = [
322 i
323 for i, e in enumerate(ctx._entities)
324 if isinstance(e, querycontext._MapperEntity)
325 ]
326 result = []
327 keys = [ent._label_name for ent in ctx._entities]
329 keyed_tuple = result_tuple(
330 keys, [ent._extra_entities for ent in ctx._entities]
331 )
333 for row in iterator:
334 newrow = list(row)
335 for i in mapped_entities:
336 if newrow[i] is not None:
337 newrow[i] = session._merge(
338 attributes.instance_state(newrow[i]),
339 attributes.instance_dict(newrow[i]),
340 load=load,
341 _recursive={},
342 _resolve_conflict_map={},
343 )
344 result.append(keyed_tuple(newrow))
346 if frozen_result:
347 return frozen_result.with_data(result)
348 else:
349 return iter(result)
350 finally:
351 session.autoflush = autoflush
354def get_from_identity(session, mapper, key, passive):
355 """Look up the given key in the given session's identity map,
356 check the object for expired state if found.
358 """
359 instance = session.identity_map.get(key)
360 if instance is not None:
362 state = attributes.instance_state(instance)
364 if mapper.inherits and not state.mapper.isa(mapper):
365 return attributes.PASSIVE_CLASS_MISMATCH
367 # expired - ensure it still exists
368 if state.expired:
369 if not passive & attributes.SQL_OK:
370 # TODO: no coverage here
371 return attributes.PASSIVE_NO_RESULT
372 elif not passive & attributes.RELATED_OBJECT_OK:
373 # this mode is used within a flush and the instance's
374 # expired state will be checked soon enough, if necessary.
375 # also used by immediateloader for a mutually-dependent
376 # o2m->m2m load, :ticket:`6301`
377 return instance
378 try:
379 state._load_expired(state, passive)
380 except orm_exc.ObjectDeletedError:
381 session._remove_newly_deleted([state])
382 return None
383 return instance
384 else:
385 return None
388def load_on_ident(
389 session,
390 statement,
391 key,
392 load_options=None,
393 refresh_state=None,
394 with_for_update=None,
395 only_load_props=None,
396 no_autoflush=False,
397 bind_arguments=util.EMPTY_DICT,
398 execution_options=util.EMPTY_DICT,
399):
400 """Load the given identity key from the database."""
401 if key is not None:
402 ident = key[1]
403 identity_token = key[2]
404 else:
405 ident = identity_token = None
407 return load_on_pk_identity(
408 session,
409 statement,
410 ident,
411 load_options=load_options,
412 refresh_state=refresh_state,
413 with_for_update=with_for_update,
414 only_load_props=only_load_props,
415 identity_token=identity_token,
416 no_autoflush=no_autoflush,
417 bind_arguments=bind_arguments,
418 execution_options=execution_options,
419 )
422def load_on_pk_identity(
423 session,
424 statement,
425 primary_key_identity,
426 load_options=None,
427 refresh_state=None,
428 with_for_update=None,
429 only_load_props=None,
430 identity_token=None,
431 no_autoflush=False,
432 bind_arguments=util.EMPTY_DICT,
433 execution_options=util.EMPTY_DICT,
434):
436 """Load the given primary key identity from the database."""
438 query = statement
439 q = query._clone()
441 assert not q._is_lambda_element
443 # TODO: fix these imports ....
444 from .context import QueryContext, ORMCompileState
446 if load_options is None:
447 load_options = QueryContext.default_load_options
449 if (
450 statement._compile_options
451 is SelectState.default_select_compile_options
452 ):
453 compile_options = ORMCompileState.default_compile_options
454 else:
455 compile_options = statement._compile_options
457 if primary_key_identity is not None:
458 mapper = query._propagate_attrs["plugin_subject"]
460 (_get_clause, _get_params) = mapper._get_clause
462 # None present in ident - turn those comparisons
463 # into "IS NULL"
464 if None in primary_key_identity:
465 nones = set(
466 [
467 _get_params[col].key
468 for col, value in zip(
469 mapper.primary_key, primary_key_identity
470 )
471 if value is None
472 ]
473 )
475 _get_clause = sql_util.adapt_criterion_to_null(_get_clause, nones)
477 if len(nones) == len(primary_key_identity):
478 util.warn(
479 "fully NULL primary key identity cannot load any "
480 "object. This condition may raise an error in a future "
481 "release."
482 )
484 q._where_criteria = (
485 sql_util._deep_annotate(_get_clause, {"_orm_adapt": True}),
486 )
488 params = dict(
489 [
490 (_get_params[primary_key].key, id_val)
491 for id_val, primary_key in zip(
492 primary_key_identity, mapper.primary_key
493 )
494 ]
495 )
496 else:
497 params = None
499 if with_for_update is not None:
500 version_check = True
501 q._for_update_arg = with_for_update
502 elif query._for_update_arg is not None:
503 version_check = True
504 q._for_update_arg = query._for_update_arg
505 else:
506 version_check = False
508 if refresh_state and refresh_state.load_options:
509 compile_options += {"_current_path": refresh_state.load_path.parent}
510 q = q.options(*refresh_state.load_options)
512 new_compile_options, load_options = _set_get_options(
513 compile_options,
514 load_options,
515 version_check=version_check,
516 only_load_props=only_load_props,
517 refresh_state=refresh_state,
518 identity_token=identity_token,
519 )
520 q._compile_options = new_compile_options
521 q._order_by = None
523 if no_autoflush:
524 load_options += {"_autoflush": False}
526 execution_options = util.EMPTY_DICT.merge_with(
527 execution_options, {"_sa_orm_load_options": load_options}
528 )
529 result = (
530 session.execute(
531 q,
532 params=params,
533 execution_options=execution_options,
534 bind_arguments=bind_arguments,
535 )
536 .unique()
537 .scalars()
538 )
540 try:
541 return result.one()
542 except orm_exc.NoResultFound:
543 return None
546def _set_get_options(
547 compile_opt,
548 load_opt,
549 populate_existing=None,
550 version_check=None,
551 only_load_props=None,
552 refresh_state=None,
553 identity_token=None,
554):
556 compile_options = {}
557 load_options = {}
558 if version_check:
559 load_options["_version_check"] = version_check
560 if populate_existing:
561 load_options["_populate_existing"] = populate_existing
562 if refresh_state:
563 load_options["_refresh_state"] = refresh_state
564 compile_options["_for_refresh_state"] = True
565 if only_load_props:
566 compile_options["_only_load_props"] = frozenset(only_load_props)
567 if identity_token:
568 load_options["_refresh_identity_token"] = identity_token
570 if load_options:
571 load_opt += load_options
572 if compile_options:
573 compile_opt += compile_options
575 return compile_opt, load_opt
578def _setup_entity_query(
579 compile_state,
580 mapper,
581 query_entity,
582 path,
583 adapter,
584 column_collection,
585 with_polymorphic=None,
586 only_load_props=None,
587 polymorphic_discriminator=None,
588 **kw
589):
591 if with_polymorphic:
592 poly_properties = mapper._iterate_polymorphic_properties(
593 with_polymorphic
594 )
595 else:
596 poly_properties = mapper._polymorphic_properties
598 quick_populators = {}
600 path.set(compile_state.attributes, "memoized_setups", quick_populators)
602 # for the lead entities in the path, e.g. not eager loads, and
603 # assuming a user-passed aliased class, e.g. not a from_self() or any
604 # implicit aliasing, don't add columns to the SELECT that aren't
605 # in the thing that's aliased.
606 check_for_adapt = adapter and len(path) == 1 and path[-1].is_aliased_class
608 for value in poly_properties:
609 if only_load_props and value.key not in only_load_props:
610 continue
612 value.setup(
613 compile_state,
614 query_entity,
615 path,
616 adapter,
617 only_load_props=only_load_props,
618 column_collection=column_collection,
619 memoized_populators=quick_populators,
620 check_for_adapt=check_for_adapt,
621 **kw
622 )
624 if (
625 polymorphic_discriminator is not None
626 and polymorphic_discriminator is not mapper.polymorphic_on
627 ):
629 if adapter:
630 pd = adapter.columns[polymorphic_discriminator]
631 else:
632 pd = polymorphic_discriminator
633 column_collection.append(pd)
636def _warn_for_runid_changed(state):
637 util.warn(
638 "Loading context for %s has changed within a load/refresh "
639 "handler, suggesting a row refresh operation took place. If this "
640 "event handler is expected to be "
641 "emitting row refresh operations within an existing load or refresh "
642 "operation, set restore_load_context=True when establishing the "
643 "listener to ensure the context remains unchanged when the event "
644 "handler completes." % (state_str(state),)
645 )
648def _instance_processor(
649 query_entity,
650 mapper,
651 context,
652 result,
653 path,
654 adapter,
655 only_load_props=None,
656 refresh_state=None,
657 polymorphic_discriminator=None,
658 _polymorphic_from=None,
659):
660 """Produce a mapper level row processor callable
661 which processes rows into mapped instances."""
663 # note that this method, most of which exists in a closure
664 # called _instance(), resists being broken out, as
665 # attempts to do so tend to add significant function
666 # call overhead. _instance() is the most
667 # performance-critical section in the whole ORM.
669 identity_class = mapper._identity_class
670 compile_state = context.compile_state
672 # look for "row getter" functions that have been assigned along
673 # with the compile state that were cached from a previous load.
674 # these are operator.itemgetter() objects that each will extract a
675 # particular column from each row.
677 getter_key = ("getters", mapper)
678 getters = path.get(compile_state.attributes, getter_key, None)
680 if getters is None:
681 # no getters, so go through a list of attributes we are loading for,
682 # and the ones that are column based will have already put information
683 # for us in another collection "memoized_setups", which represents the
684 # output of the LoaderStrategy.setup_query() method. We can just as
685 # easily call LoaderStrategy.create_row_processor for each, but by
686 # getting it all at once from setup_query we save another method call
687 # per attribute.
688 props = mapper._prop_set
689 if only_load_props is not None:
690 props = props.intersection(
691 mapper._props[k] for k in only_load_props
692 )
694 quick_populators = path.get(
695 context.attributes, "memoized_setups", _none_set
696 )
698 todo = []
699 cached_populators = {
700 "new": [],
701 "quick": [],
702 "deferred": [],
703 "expire": [],
704 "delayed": [],
705 "existing": [],
706 "eager": [],
707 }
709 if refresh_state is None:
710 # we can also get the "primary key" tuple getter function
711 pk_cols = mapper.primary_key
713 if adapter:
714 pk_cols = [adapter.columns[c] for c in pk_cols]
715 primary_key_getter = result._tuple_getter(pk_cols)
716 else:
717 primary_key_getter = None
719 getters = {
720 "cached_populators": cached_populators,
721 "todo": todo,
722 "primary_key_getter": primary_key_getter,
723 }
724 for prop in props:
725 if prop in quick_populators:
726 # this is an inlined path just for column-based attributes.
727 col = quick_populators[prop]
728 if col is _DEFER_FOR_STATE:
729 cached_populators["new"].append(
730 (prop.key, prop._deferred_column_loader)
731 )
732 elif col is _SET_DEFERRED_EXPIRED:
733 # note that in this path, we are no longer
734 # searching in the result to see if the column might
735 # be present in some unexpected way.
736 cached_populators["expire"].append((prop.key, False))
737 elif col is _RAISE_FOR_STATE:
738 cached_populators["new"].append(
739 (prop.key, prop._raise_column_loader)
740 )
741 else:
742 getter = None
743 if adapter:
744 # this logic had been removed for all 1.4 releases
745 # up until 1.4.18; the adapter here is particularly
746 # the compound eager adapter which isn't accommodated
747 # in the quick_populators right now. The "fallback"
748 # logic below instead took over in many more cases
749 # until issue #6596 was identified.
751 # note there is still an issue where this codepath
752 # produces no "getter" for cases where a joined-inh
753 # mapping includes a labeled column property, meaning
754 # KeyError is caught internally and we fall back to
755 # _getter(col), which works anyway. The adapter
756 # here for joined inh without any aliasing might not
757 # be useful. Tests which see this include
758 # test.orm.inheritance.test_basic ->
759 # EagerTargetingTest.test_adapt_stringency
760 # OptimizedLoadTest.test_column_expression_joined
761 # PolymorphicOnNotLocalTest.test_polymorphic_on_column_prop # noqa: E501
762 #
764 adapted_col = adapter.columns[col]
765 if adapted_col is not None:
766 getter = result._getter(adapted_col, False)
767 if not getter:
768 getter = result._getter(col, False)
769 if getter:
770 cached_populators["quick"].append((prop.key, getter))
771 else:
772 # fall back to the ColumnProperty itself, which
773 # will iterate through all of its columns
774 # to see if one fits
775 prop.create_row_processor(
776 context,
777 query_entity,
778 path,
779 mapper,
780 result,
781 adapter,
782 cached_populators,
783 )
784 else:
785 # loader strategies like subqueryload, selectinload,
786 # joinedload, basically relationships, these need to interact
787 # with the context each time to work correctly.
788 todo.append(prop)
790 path.set(compile_state.attributes, getter_key, getters)
792 cached_populators = getters["cached_populators"]
794 populators = {key: list(value) for key, value in cached_populators.items()}
795 for prop in getters["todo"]:
796 prop.create_row_processor(
797 context, query_entity, path, mapper, result, adapter, populators
798 )
800 propagated_loader_options = context.propagated_loader_options
801 load_path = (
802 context.compile_state.current_path + path
803 if context.compile_state.current_path.path
804 else path
805 )
807 session_identity_map = context.session.identity_map
809 populate_existing = context.populate_existing or mapper.always_refresh
810 load_evt = bool(mapper.class_manager.dispatch.load)
811 refresh_evt = bool(mapper.class_manager.dispatch.refresh)
812 persistent_evt = bool(context.session.dispatch.loaded_as_persistent)
813 if persistent_evt:
814 loaded_as_persistent = context.session.dispatch.loaded_as_persistent
815 instance_state = attributes.instance_state
816 instance_dict = attributes.instance_dict
817 session_id = context.session.hash_key
818 runid = context.runid
819 identity_token = context.identity_token
821 version_check = context.version_check
822 if version_check:
823 version_id_col = mapper.version_id_col
824 if version_id_col is not None:
825 if adapter:
826 version_id_col = adapter.columns[version_id_col]
827 version_id_getter = result._getter(version_id_col)
828 else:
829 version_id_getter = None
831 if not refresh_state and _polymorphic_from is not None:
832 key = ("loader", path.path)
833 if key in context.attributes and context.attributes[key].strategy == (
834 ("selectinload_polymorphic", True),
835 ):
836 selectin_load_via = mapper._should_selectin_load(
837 context.attributes[key].local_opts["entities"],
838 _polymorphic_from,
839 )
840 else:
841 selectin_load_via = mapper._should_selectin_load(
842 None, _polymorphic_from
843 )
845 if selectin_load_via and selectin_load_via is not _polymorphic_from:
846 # only_load_props goes w/ refresh_state only, and in a refresh
847 # we are a single row query for the exact entity; polymorphic
848 # loading does not apply
849 assert only_load_props is None
851 callable_ = _load_subclass_via_in(context, path, selectin_load_via)
853 PostLoad.callable_for_path(
854 context,
855 load_path,
856 selectin_load_via.mapper,
857 selectin_load_via,
858 callable_,
859 selectin_load_via,
860 )
862 post_load = PostLoad.for_context(context, load_path, only_load_props)
864 if refresh_state:
865 refresh_identity_key = refresh_state.key
866 if refresh_identity_key is None:
867 # super-rare condition; a refresh is being called
868 # on a non-instance-key instance; this is meant to only
869 # occur within a flush()
870 refresh_identity_key = mapper._identity_key_from_state(
871 refresh_state
872 )
873 else:
874 refresh_identity_key = None
876 primary_key_getter = getters["primary_key_getter"]
878 if mapper.allow_partial_pks:
879 is_not_primary_key = _none_set.issuperset
880 else:
881 is_not_primary_key = _none_set.intersection
883 def _instance(row):
885 # determine the state that we'll be populating
886 if refresh_identity_key:
887 # fixed state that we're refreshing
888 state = refresh_state
889 instance = state.obj()
890 dict_ = instance_dict(instance)
891 isnew = state.runid != runid
892 currentload = True
893 loaded_instance = False
894 else:
895 # look at the row, see if that identity is in the
896 # session, or we have to create a new one
897 identitykey = (
898 identity_class,
899 primary_key_getter(row),
900 identity_token,
901 )
903 instance = session_identity_map.get(identitykey)
905 if instance is not None:
906 # existing instance
907 state = instance_state(instance)
908 dict_ = instance_dict(instance)
910 isnew = state.runid != runid
911 currentload = not isnew
912 loaded_instance = False
914 if version_check and version_id_getter and not currentload:
915 _validate_version_id(
916 mapper, state, dict_, row, version_id_getter
917 )
919 else:
920 # create a new instance
922 # check for non-NULL values in the primary key columns,
923 # else no entity is returned for the row
924 if is_not_primary_key(identitykey[1]):
925 return None
927 isnew = True
928 currentload = True
929 loaded_instance = True
931 instance = mapper.class_manager.new_instance()
933 dict_ = instance_dict(instance)
934 state = instance_state(instance)
935 state.key = identitykey
936 state.identity_token = identity_token
938 # attach instance to session.
939 state.session_id = session_id
940 session_identity_map._add_unpresent(state, identitykey)
942 effective_populate_existing = populate_existing
943 if refresh_state is state:
944 effective_populate_existing = True
946 # populate. this looks at whether this state is new
947 # for this load or was existing, and whether or not this
948 # row is the first row with this identity.
949 if currentload or effective_populate_existing:
950 # full population routines. Objects here are either
951 # just created, or we are doing a populate_existing
953 # be conservative about setting load_path when populate_existing
954 # is in effect; want to maintain options from the original
955 # load. see test_expire->test_refresh_maintains_deferred_options
956 if isnew and (
957 propagated_loader_options or not effective_populate_existing
958 ):
959 state.load_options = propagated_loader_options
960 state.load_path = load_path
962 _populate_full(
963 context,
964 row,
965 state,
966 dict_,
967 isnew,
968 load_path,
969 loaded_instance,
970 effective_populate_existing,
971 populators,
972 )
974 if isnew:
975 # state.runid should be equal to context.runid / runid
976 # here, however for event checks we are being more conservative
977 # and checking against existing run id
978 # assert state.runid == runid
980 existing_runid = state.runid
982 if loaded_instance:
983 if load_evt:
984 state.manager.dispatch.load(state, context)
985 if state.runid != existing_runid:
986 _warn_for_runid_changed(state)
987 if persistent_evt:
988 loaded_as_persistent(context.session, state)
989 if state.runid != existing_runid:
990 _warn_for_runid_changed(state)
991 elif refresh_evt:
992 state.manager.dispatch.refresh(
993 state, context, only_load_props
994 )
995 if state.runid != runid:
996 _warn_for_runid_changed(state)
998 if effective_populate_existing or state.modified:
999 if refresh_state and only_load_props:
1000 state._commit(dict_, only_load_props)
1001 else:
1002 state._commit_all(dict_, session_identity_map)
1004 if post_load:
1005 post_load.add_state(state, True)
1007 else:
1008 # partial population routines, for objects that were already
1009 # in the Session, but a row matches them; apply eager loaders
1010 # on existing objects, etc.
1011 unloaded = state.unloaded
1012 isnew = state not in context.partials
1014 if not isnew or unloaded or populators["eager"]:
1015 # state is having a partial set of its attributes
1016 # refreshed. Populate those attributes,
1017 # and add to the "context.partials" collection.
1019 to_load = _populate_partial(
1020 context,
1021 row,
1022 state,
1023 dict_,
1024 isnew,
1025 load_path,
1026 unloaded,
1027 populators,
1028 )
1030 if isnew:
1031 if refresh_evt:
1032 existing_runid = state.runid
1033 state.manager.dispatch.refresh(state, context, to_load)
1034 if state.runid != existing_runid:
1035 _warn_for_runid_changed(state)
1037 state._commit(dict_, to_load)
1039 if post_load and context.invoke_all_eagers:
1040 post_load.add_state(state, False)
1042 return instance
1044 if mapper.polymorphic_map and not _polymorphic_from and not refresh_state:
1045 # if we are doing polymorphic, dispatch to a different _instance()
1046 # method specific to the subclass mapper
1047 def ensure_no_pk(row):
1048 identitykey = (
1049 identity_class,
1050 primary_key_getter(row),
1051 identity_token,
1052 )
1053 if not is_not_primary_key(identitykey[1]):
1054 return identitykey
1055 else:
1056 return None
1058 _instance = _decorate_polymorphic_switch(
1059 _instance,
1060 context,
1061 query_entity,
1062 mapper,
1063 result,
1064 path,
1065 polymorphic_discriminator,
1066 adapter,
1067 ensure_no_pk,
1068 )
1070 return _instance
1073def _load_subclass_via_in(context, path, entity):
1074 mapper = entity.mapper
1076 zero_idx = len(mapper.base_mapper.primary_key) == 1
1078 if entity.is_aliased_class:
1079 q, enable_opt, disable_opt = mapper._subclass_load_via_in(entity)
1080 else:
1081 q, enable_opt, disable_opt = mapper._subclass_load_via_in_mapper
1083 def do_load(context, path, states, load_only, effective_entity):
1084 orig_query = context.query
1086 options = (enable_opt,) + orig_query._with_options + (disable_opt,)
1087 q2 = q.options(*options)
1089 q2._compile_options = context.compile_state.default_compile_options
1090 q2._compile_options += {"_current_path": path.parent}
1092 if context.populate_existing:
1093 q2 = q2.execution_options(populate_existing=True)
1095 context.session.execute(
1096 q2,
1097 dict(
1098 primary_keys=[
1099 state.key[1][0] if zero_idx else state.key[1]
1100 for state, load_attrs in states
1101 ]
1102 ),
1103 ).unique().scalars().all()
1105 return do_load
1108def _populate_full(
1109 context,
1110 row,
1111 state,
1112 dict_,
1113 isnew,
1114 load_path,
1115 loaded_instance,
1116 populate_existing,
1117 populators,
1118):
1119 if isnew:
1120 # first time we are seeing a row with this identity.
1121 state.runid = context.runid
1123 for key, getter in populators["quick"]:
1124 dict_[key] = getter(row)
1125 if populate_existing:
1126 for key, set_callable in populators["expire"]:
1127 dict_.pop(key, None)
1128 if set_callable:
1129 state.expired_attributes.add(key)
1130 else:
1131 for key, set_callable in populators["expire"]:
1132 if set_callable:
1133 state.expired_attributes.add(key)
1135 for key, populator in populators["new"]:
1136 populator(state, dict_, row)
1137 for key, populator in populators["delayed"]:
1138 populator(state, dict_, row)
1139 elif load_path != state.load_path:
1140 # new load path, e.g. object is present in more than one
1141 # column position in a series of rows
1142 state.load_path = load_path
1144 # if we have data, and the data isn't in the dict, OK, let's put
1145 # it in.
1146 for key, getter in populators["quick"]:
1147 if key not in dict_:
1148 dict_[key] = getter(row)
1150 # otherwise treat like an "already seen" row
1151 for key, populator in populators["existing"]:
1152 populator(state, dict_, row)
1153 # TODO: allow "existing" populator to know this is
1154 # a new path for the state:
1155 # populator(state, dict_, row, new_path=True)
1157 else:
1158 # have already seen rows with this identity in this same path.
1159 for key, populator in populators["existing"]:
1160 populator(state, dict_, row)
1162 # TODO: same path
1163 # populator(state, dict_, row, new_path=False)
1166def _populate_partial(
1167 context, row, state, dict_, isnew, load_path, unloaded, populators
1168):
1170 if not isnew:
1171 to_load = context.partials[state]
1172 for key, populator in populators["existing"]:
1173 if key in to_load:
1174 populator(state, dict_, row)
1175 else:
1176 to_load = unloaded
1177 context.partials[state] = to_load
1179 for key, getter in populators["quick"]:
1180 if key in to_load:
1181 dict_[key] = getter(row)
1182 for key, set_callable in populators["expire"]:
1183 if key in to_load:
1184 dict_.pop(key, None)
1185 if set_callable:
1186 state.expired_attributes.add(key)
1187 for key, populator in populators["new"]:
1188 if key in to_load:
1189 populator(state, dict_, row)
1190 for key, populator in populators["delayed"]:
1191 if key in to_load:
1192 populator(state, dict_, row)
1193 for key, populator in populators["eager"]:
1194 if key not in unloaded:
1195 populator(state, dict_, row)
1197 return to_load
1200def _validate_version_id(mapper, state, dict_, row, getter):
1202 if mapper._get_state_attr_by_column(
1203 state, dict_, mapper.version_id_col
1204 ) != getter(row):
1205 raise orm_exc.StaleDataError(
1206 "Instance '%s' has version id '%s' which "
1207 "does not match database-loaded version id '%s'."
1208 % (
1209 state_str(state),
1210 mapper._get_state_attr_by_column(
1211 state, dict_, mapper.version_id_col
1212 ),
1213 getter(row),
1214 )
1215 )
1218def _decorate_polymorphic_switch(
1219 instance_fn,
1220 context,
1221 query_entity,
1222 mapper,
1223 result,
1224 path,
1225 polymorphic_discriminator,
1226 adapter,
1227 ensure_no_pk,
1228):
1229 if polymorphic_discriminator is not None:
1230 polymorphic_on = polymorphic_discriminator
1231 else:
1232 polymorphic_on = mapper.polymorphic_on
1233 if polymorphic_on is None:
1234 return instance_fn
1236 if adapter:
1237 polymorphic_on = adapter.columns[polymorphic_on]
1239 def configure_subclass_mapper(discriminator):
1240 try:
1241 sub_mapper = mapper.polymorphic_map[discriminator]
1242 except KeyError:
1243 raise AssertionError(
1244 "No such polymorphic_identity %r is defined" % discriminator
1245 )
1246 else:
1247 if sub_mapper is mapper:
1248 return None
1249 elif not sub_mapper.isa(mapper):
1250 return False
1252 return _instance_processor(
1253 query_entity,
1254 sub_mapper,
1255 context,
1256 result,
1257 path,
1258 adapter,
1259 _polymorphic_from=mapper,
1260 )
1262 polymorphic_instances = util.PopulateDict(configure_subclass_mapper)
1264 getter = result._getter(polymorphic_on)
1266 def polymorphic_instance(row):
1267 discriminator = getter(row)
1268 if discriminator is not None:
1269 _instance = polymorphic_instances[discriminator]
1270 if _instance:
1271 return _instance(row)
1272 elif _instance is False:
1273 identitykey = ensure_no_pk(row)
1275 if identitykey:
1276 raise sa_exc.InvalidRequestError(
1277 "Row with identity key %s can't be loaded into an "
1278 "object; the polymorphic discriminator column '%s' "
1279 "refers to %s, which is not a sub-mapper of "
1280 "the requested %s"
1281 % (
1282 identitykey,
1283 polymorphic_on,
1284 mapper.polymorphic_map[discriminator],
1285 mapper,
1286 )
1287 )
1288 else:
1289 return None
1290 else:
1291 return instance_fn(row)
1292 else:
1293 identitykey = ensure_no_pk(row)
1295 if identitykey:
1296 raise sa_exc.InvalidRequestError(
1297 "Row with identity key %s can't be loaded into an "
1298 "object; the polymorphic discriminator column '%s' is "
1299 "NULL" % (identitykey, polymorphic_on)
1300 )
1301 else:
1302 return None
1304 return polymorphic_instance
1307class PostLoad(object):
1308 """Track loaders and states for "post load" operations."""
1310 __slots__ = "loaders", "states", "load_keys"
1312 def __init__(self):
1313 self.loaders = {}
1314 self.states = util.OrderedDict()
1315 self.load_keys = None
1317 def add_state(self, state, overwrite):
1318 # the states for a polymorphic load here are all shared
1319 # within a single PostLoad object among multiple subtypes.
1320 # Filtering of callables on a per-subclass basis needs to be done at
1321 # the invocation level
1322 self.states[state] = overwrite
1324 def invoke(self, context, path):
1325 if not self.states:
1326 return
1327 path = path_registry.PathRegistry.coerce(path)
1328 for token, limit_to_mapper, loader, arg, kw in self.loaders.values():
1329 states = [
1330 (state, overwrite)
1331 for state, overwrite in self.states.items()
1332 if state.manager.mapper.isa(limit_to_mapper)
1333 ]
1334 if states:
1335 loader(context, path, states, self.load_keys, *arg, **kw)
1336 self.states.clear()
1338 @classmethod
1339 def for_context(cls, context, path, only_load_props):
1340 pl = context.post_load_paths.get(path.path)
1341 if pl is not None and only_load_props:
1342 pl.load_keys = only_load_props
1343 return pl
1345 @classmethod
1346 def path_exists(self, context, path, key):
1347 return (
1348 path.path in context.post_load_paths
1349 and key in context.post_load_paths[path.path].loaders
1350 )
1352 @classmethod
1353 def callable_for_path(
1354 cls, context, path, limit_to_mapper, token, loader_callable, *arg, **kw
1355 ):
1356 if path.path in context.post_load_paths:
1357 pl = context.post_load_paths[path.path]
1358 else:
1359 pl = context.post_load_paths[path.path] = PostLoad()
1360 pl.loaders[token] = (token, limit_to_mapper, loader_callable, arg, kw)
1363def load_scalar_attributes(mapper, state, attribute_names, passive):
1364 """initiate a column-based attribute refresh operation."""
1366 # assert mapper is _state_mapper(state)
1367 session = state.session
1368 if not session:
1369 raise orm_exc.DetachedInstanceError(
1370 "Instance %s is not bound to a Session; "
1371 "attribute refresh operation cannot proceed" % (state_str(state))
1372 )
1374 has_key = bool(state.key)
1376 result = False
1378 no_autoflush = (
1379 bool(passive & attributes.NO_AUTOFLUSH) or state.session.autocommit
1380 )
1382 # in the case of inheritance, particularly concrete and abstract
1383 # concrete inheritance, the class manager might have some keys
1384 # of attributes on the superclass that we didn't actually map.
1385 # These could be mapped as "concrete, don't load" or could be completely
1386 # excluded from the mapping and we know nothing about them. Filter them
1387 # here to prevent them from coming through.
1388 if attribute_names:
1389 attribute_names = attribute_names.intersection(mapper.attrs.keys())
1391 if mapper.inherits and not mapper.concrete:
1392 # because we are using Core to produce a select() that we
1393 # pass to the Query, we aren't calling setup() for mapped
1394 # attributes; in 1.0 this means deferred attrs won't get loaded
1395 # by default
1396 statement = mapper._optimized_get_statement(state, attribute_names)
1397 if statement is not None:
1398 # this was previously aliased(mapper, statement), however,
1399 # statement is a select() and Query's coercion now raises for this
1400 # since you can't "select" from a "SELECT" statement. only
1401 # from_statement() allows this.
1402 # note: using from_statement() here means there is an adaption
1403 # with adapt_on_names set up. the other option is to make the
1404 # aliased() against a subquery which affects the SQL.
1406 from .query import FromStatement
1408 stmt = FromStatement(mapper, statement).options(
1409 strategy_options.Load(mapper).undefer("*")
1410 )
1412 result = load_on_ident(
1413 session,
1414 stmt,
1415 None,
1416 only_load_props=attribute_names,
1417 refresh_state=state,
1418 no_autoflush=no_autoflush,
1419 )
1421 if result is False:
1422 if has_key:
1423 identity_key = state.key
1424 else:
1425 # this codepath is rare - only valid when inside a flush, and the
1426 # object is becoming persistent but hasn't yet been assigned
1427 # an identity_key.
1428 # check here to ensure we have the attrs we need.
1429 pk_attrs = [
1430 mapper._columntoproperty[col].key for col in mapper.primary_key
1431 ]
1432 if state.expired_attributes.intersection(pk_attrs):
1433 raise sa_exc.InvalidRequestError(
1434 "Instance %s cannot be refreshed - it's not "
1435 " persistent and does not "
1436 "contain a full primary key." % state_str(state)
1437 )
1438 identity_key = mapper._identity_key_from_state(state)
1440 if (
1441 _none_set.issubset(identity_key) and not mapper.allow_partial_pks
1442 ) or _none_set.issuperset(identity_key):
1443 util.warn_limited(
1444 "Instance %s to be refreshed doesn't "
1445 "contain a full primary key - can't be refreshed "
1446 "(and shouldn't be expired, either).",
1447 state_str(state),
1448 )
1449 return
1451 result = load_on_ident(
1452 session,
1453 future.select(mapper).set_label_style(
1454 LABEL_STYLE_TABLENAME_PLUS_COL
1455 ),
1456 identity_key,
1457 refresh_state=state,
1458 only_load_props=attribute_names,
1459 no_autoflush=no_autoflush,
1460 )
1462 # if instance is pending, a refresh operation
1463 # may not complete (even if PK attributes are assigned)
1464 if has_key and result is None:
1465 raise orm_exc.ObjectDeletedError(state)