1# orm/strategies.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: ignore-errors
8
9
10"""sqlalchemy.orm.interfaces.LoaderStrategy
11implementations, and related MapperOptions."""
12
13from __future__ import annotations
14
15import collections
16import itertools
17from typing import Any
18from typing import Dict
19from typing import Literal
20from typing import Optional
21from typing import Tuple
22from typing import TYPE_CHECKING
23from typing import Union
24
25from . import attributes
26from . import exc as orm_exc
27from . import interfaces
28from . import loading
29from . import path_registry
30from . import properties
31from . import query
32from . import relationships
33from . import unitofwork
34from . import util as orm_util
35from .base import _DEFER_FOR_STATE
36from .base import _RAISE_FOR_STATE
37from .base import _SET_DEFERRED_EXPIRED
38from .base import ATTR_WAS_SET
39from .base import LoaderCallableStatus
40from .base import PASSIVE_OFF
41from .base import PassiveFlag
42from .context import _column_descriptions
43from .context import _ORMCompileState
44from .context import _ORMSelectCompileState
45from .context import QueryContext
46from .interfaces import LoaderStrategy
47from .interfaces import StrategizedProperty
48from .session import _state_session
49from .state import InstanceState
50from .strategy_options import Load
51from .util import _none_only_set
52from .util import AliasedClass
53from .. import event
54from .. import exc as sa_exc
55from .. import inspect
56from .. import log
57from .. import sql
58from .. import util
59from ..sql import util as sql_util
60from ..sql import visitors
61from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL
62from ..sql.selectable import Select
63
64if TYPE_CHECKING:
65 from .mapper import Mapper
66 from .relationships import RelationshipProperty
67 from ..sql.elements import ColumnElement
68
69
70def _register_attribute(
71 prop,
72 mapper,
73 useobject,
74 compare_function=None,
75 typecallable=None,
76 callable_=None,
77 proxy_property=None,
78 active_history=False,
79 impl_class=None,
80 default_scalar_value=None,
81 **kw,
82):
83 listen_hooks = []
84
85 uselist = useobject and prop.uselist
86
87 if useobject and prop.single_parent:
88 listen_hooks.append(_single_parent_validator)
89
90 if prop.key in prop.parent.validators:
91 fn, opts = prop.parent.validators[prop.key]
92 listen_hooks.append(
93 lambda desc, prop: orm_util._validator_events(
94 desc, prop.key, fn, **opts
95 )
96 )
97
98 if useobject:
99 listen_hooks.append(unitofwork._track_cascade_events)
100
    # need to assemble backref listeners
    # after the single_parent validator and the mapper-level validators
103 if useobject:
104 backref = prop.back_populates
105 if backref and prop._effective_sync_backref:
106 listen_hooks.append(
107 lambda desc, prop: attributes._backref_listeners(
108 desc, backref, uselist
109 )
110 )
111
112 # a single MapperProperty is shared down a class inheritance
113 # hierarchy, so we set up attribute instrumentation and backref event
114 # for each mapper down the hierarchy.
115
116 # typically, "mapper" is the same as prop.parent, due to the way
117 # the configure_mappers() process runs, however this is not strongly
118 # enforced, and in the case of a second configure_mappers() run the
119 # mapper here might not be prop.parent; also, a subclass mapper may
120 # be called here before a superclass mapper. That is, can't depend
121 # on mappers not already being set up so we have to check each one.
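    # Illustrative sketch (hypothetical mapping, not part of this module):
    # given an inheritance hierarchy such as
    #
    #     class Employee(Base):
    #         __tablename__ = "employee"
    #         id = mapped_column(Integer, primary_key=True)
    #         name = mapped_column(String(50))
    #
    #     class Manager(Employee):
    #         ...
    #
    # the "name" ColumnProperty is shared by the Employee and Manager
    # mappers, so the loop below instruments the attribute once for each
    # class that doesn't already have an attribute impl for it.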
122
123 for m in mapper.self_and_descendants:
124 if prop is m._props.get(
125 prop.key
126 ) and not m.class_manager._attr_has_impl(prop.key):
127 desc = attributes._register_attribute_impl(
128 m.class_,
129 prop.key,
130 parent_token=prop,
131 uselist=uselist,
132 compare_function=compare_function,
133 useobject=useobject,
134 trackparent=useobject
135 and (
136 prop.single_parent
137 or prop.direction is interfaces.ONETOMANY
138 ),
139 typecallable=typecallable,
140 callable_=callable_,
141 active_history=active_history,
142 default_scalar_value=default_scalar_value,
143 impl_class=impl_class,
144 send_modified_events=not useobject or not prop.viewonly,
145 doc=prop.doc,
146 **kw,
147 )
148
149 for hook in listen_hooks:
150 hook(desc, prop)
151
152
153@properties.ColumnProperty.strategy_for(instrument=False, deferred=False)
154class _UninstrumentedColumnLoader(LoaderStrategy):
155 """Represent a non-instrumented MapperProperty.
156
157 The polymorphic_on argument of mapper() often results in this,
158 if the argument is against the with_polymorphic selectable.
159
160 """
161
162 __slots__ = ("columns",)
163
164 def __init__(self, parent, strategy_key):
165 super().__init__(parent, strategy_key)
166 self.columns = self.parent_property.columns
167
168 def setup_query(
169 self,
170 compile_state,
171 query_entity,
172 path,
173 loadopt,
174 adapter,
175 column_collection=None,
176 **kwargs,
177 ):
178 for c in self.columns:
179 if adapter:
180 c = adapter.columns[c]
181 compile_state._append_dedupe_col_collection(c, column_collection)
182
183 def create_row_processor(
184 self,
185 context,
186 query_entity,
187 path,
188 loadopt,
189 mapper,
190 result,
191 adapter,
192 populators,
193 ):
194 pass
195
196
197@log.class_logger
198@properties.ColumnProperty.strategy_for(instrument=True, deferred=False)
199class _ColumnLoader(LoaderStrategy):
200 """Provide loading behavior for a :class:`.ColumnProperty`."""
201
202 __slots__ = "columns", "is_composite"
203
204 def __init__(self, parent, strategy_key):
205 super().__init__(parent, strategy_key)
206 self.columns = self.parent_property.columns
207 self.is_composite = hasattr(self.parent_property, "composite_class")
208
209 def setup_query(
210 self,
211 compile_state,
212 query_entity,
213 path,
214 loadopt,
215 adapter,
216 column_collection,
217 memoized_populators,
218 check_for_adapt=False,
219 **kwargs,
220 ):
221 for c in self.columns:
222 if adapter:
223 if check_for_adapt:
224 c = adapter.adapt_check_present(c)
225 if c is None:
226 return
227 else:
228 c = adapter.columns[c]
229
230 compile_state._append_dedupe_col_collection(c, column_collection)
231
232 fetch = self.columns[0]
233 if adapter:
234 fetch = adapter.columns[fetch]
235 if fetch is None:
236 # None happens here only for dml bulk_persistence cases
237 # when context.DMLReturningColFilter is used
238 return
239
240 memoized_populators[self.parent_property] = fetch
241
242 def init_class_attribute(self, mapper):
243 self.is_class_level = True
244 coltype = self.columns[0].type
        # TODO: check all columns? check for foreign key as well?
246 active_history = (
247 self.parent_property.active_history
248 or self.columns[0].primary_key
249 or (
250 mapper.version_id_col is not None
251 and mapper._columntoproperty.get(mapper.version_id_col, None)
252 is self.parent_property
253 )
254 )
255
256 _register_attribute(
257 self.parent_property,
258 mapper,
259 useobject=False,
260 compare_function=coltype.compare_values,
261 active_history=active_history,
262 default_scalar_value=self.parent_property._default_scalar_value,
263 )
264
265 def create_row_processor(
266 self,
267 context,
268 query_entity,
269 path,
270 loadopt,
271 mapper,
272 result,
273 adapter,
274 populators,
275 ):
276 # look through list of columns represented here
277 # to see which, if any, is present in the row.
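        # (the "else" clause below belongs to the "for" loop: it runs only
        # when no column yields a getter, in which case the attribute is
        # expired on the loaded object rather than populated)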
278
279 for col in self.columns:
280 if adapter:
281 col = adapter.columns[col]
282 getter = result._getter(col, False)
283 if getter:
284 populators["quick"].append((self.key, getter))
285 break
286 else:
287 populators["expire"].append((self.key, True))
288
289
290@log.class_logger
291@properties.ColumnProperty.strategy_for(query_expression=True)
292class _ExpressionColumnLoader(_ColumnLoader):
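    """Provide loading behavior for a :func:`_orm.query_expression`
    mapped attribute.

    A hedged usage sketch (hypothetical mapping, assuming the usual
    declarative imports; not part of this module)::

        class A(Base):
            __tablename__ = "a"
            id = mapped_column(Integer, primary_key=True)
            expr = query_expression()

        # the expression is supplied per-query via with_expression()
        stmt = select(A).options(with_expression(A.expr, A.id + 5))

    """
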
293 def __init__(self, parent, strategy_key):
294 super().__init__(parent, strategy_key)
295
296 # compare to the "default" expression that is mapped in
297 # the column. If it's sql.null, we don't need to render
298 # unless an expr is passed in the options.
299 null = sql.null().label(None)
300 self._have_default_expression = any(
301 not c.compare(null) for c in self.parent_property.columns
302 )
303
304 def setup_query(
305 self,
306 compile_state,
307 query_entity,
308 path,
309 loadopt,
310 adapter,
311 column_collection,
312 memoized_populators,
313 **kwargs,
314 ):
315 columns = None
316 if loadopt and loadopt._extra_criteria:
317 columns = loadopt._extra_criteria
318
319 elif self._have_default_expression:
320 columns = self.parent_property.columns
321
322 if columns is None:
323 return
324
325 for c in columns:
326 if adapter:
327 c = adapter.columns[c]
328 compile_state._append_dedupe_col_collection(c, column_collection)
329
330 fetch = columns[0]
331 if adapter:
332 fetch = adapter.columns[fetch]
333 if fetch is None:
                # None is not expected to be the result of any adapter
                # implementation here; however, there may be theoretical
                # usages of returning() with context.DMLReturningColFilter
337 return
338
339 memoized_populators[self.parent_property] = fetch
340
341 def create_row_processor(
342 self,
343 context,
344 query_entity,
345 path,
346 loadopt,
347 mapper,
348 result,
349 adapter,
350 populators,
351 ):
352 # look through list of columns represented here
353 # to see which, if any, is present in the row.
354 if loadopt and loadopt._extra_criteria:
355 columns = loadopt._extra_criteria
356
357 for col in columns:
358 if adapter:
359 col = adapter.columns[col]
360 getter = result._getter(col, False)
361 if getter:
362 populators["quick"].append((self.key, getter))
363 break
364 else:
365 populators["expire"].append((self.key, True))
366
367 def init_class_attribute(self, mapper):
368 self.is_class_level = True
369
370 _register_attribute(
371 self.parent_property,
372 mapper,
373 useobject=False,
374 compare_function=self.columns[0].type.compare_values,
375 accepts_scalar_loader=False,
376 default_scalar_value=self.parent_property._default_scalar_value,
377 )
378
379
380@log.class_logger
381@properties.ColumnProperty.strategy_for(deferred=True, instrument=True)
382@properties.ColumnProperty.strategy_for(
383 deferred=True, instrument=True, raiseload=True
384)
385@properties.ColumnProperty.strategy_for(do_nothing=True)
386class _DeferredColumnLoader(LoaderStrategy):
387 """Provide loading behavior for a deferred :class:`.ColumnProperty`."""
388
389 __slots__ = "columns", "group", "raiseload"
390
391 def __init__(self, parent, strategy_key):
392 super().__init__(parent, strategy_key)
393 if hasattr(self.parent_property, "composite_class"):
394 raise NotImplementedError(
395 "Deferred loading for composite types not implemented yet"
396 )
397 self.raiseload = self.strategy_opts.get("raiseload", False)
398 self.columns = self.parent_property.columns
399 self.group = self.parent_property.group
400
401 def create_row_processor(
402 self,
403 context,
404 query_entity,
405 path,
406 loadopt,
407 mapper,
408 result,
409 adapter,
410 populators,
411 ):
412 # for a DeferredColumnLoader, this method is only used during a
413 # "row processor only" query; see test_deferred.py ->
414 # tests with "rowproc_only" in their name. As of the 1.0 series,
415 # loading._instance_processor doesn't use a "row processing" function
416 # to populate columns, instead it uses data in the "populators"
417 # dictionary. Normally, the DeferredColumnLoader.setup_query()
418 # sets up that data in the "memoized_populators" dictionary
419 # and "create_row_processor()" here is never invoked.
420
421 if (
422 context.refresh_state
423 and context.query._compile_options._only_load_props
424 and self.key in context.query._compile_options._only_load_props
425 ):
426 self.parent_property._get_strategy(
427 (("deferred", False), ("instrument", True))
428 ).create_row_processor(
429 context,
430 query_entity,
431 path,
432 loadopt,
433 mapper,
434 result,
435 adapter,
436 populators,
437 )
438
439 elif not self.is_class_level:
440 if self.raiseload:
441 set_deferred_for_local_state = (
442 self.parent_property._raise_column_loader
443 )
444 else:
445 set_deferred_for_local_state = (
446 self.parent_property._deferred_column_loader
447 )
448 populators["new"].append((self.key, set_deferred_for_local_state))
449 else:
450 populators["expire"].append((self.key, False))
451
452 def init_class_attribute(self, mapper):
453 self.is_class_level = True
454
455 _register_attribute(
456 self.parent_property,
457 mapper,
458 useobject=False,
459 compare_function=self.columns[0].type.compare_values,
460 callable_=self._load_for_state,
461 load_on_unexpire=False,
462 default_scalar_value=self.parent_property._default_scalar_value,
463 )
464
465 def setup_query(
466 self,
467 compile_state,
468 query_entity,
469 path,
470 loadopt,
471 adapter,
472 column_collection,
473 memoized_populators,
474 only_load_props=None,
475 **kw,
476 ):
477 if (
478 (
479 compile_state.compile_options._render_for_subquery
480 and self.parent_property._renders_in_subqueries
481 )
482 or (
483 loadopt
484 and set(self.columns).intersection(
485 self.parent._should_undefer_in_wildcard
486 )
487 )
488 or (
489 loadopt
490 and self.group
491 and loadopt.local_opts.get(
492 "undefer_group_%s" % self.group, False
493 )
494 )
495 or (only_load_props and self.key in only_load_props)
496 ):
497 self.parent_property._get_strategy(
498 (("deferred", False), ("instrument", True))
499 ).setup_query(
500 compile_state,
501 query_entity,
502 path,
503 loadopt,
504 adapter,
505 column_collection,
506 memoized_populators,
507 **kw,
508 )
509 elif self.is_class_level:
510 memoized_populators[self.parent_property] = _SET_DEFERRED_EXPIRED
511 elif not self.raiseload:
512 memoized_populators[self.parent_property] = _DEFER_FOR_STATE
513 else:
514 memoized_populators[self.parent_property] = _RAISE_FOR_STATE
515
516 def _load_for_state(self, state, passive):
517 if not state.key:
518 return LoaderCallableStatus.ATTR_EMPTY
519
520 if not passive & PassiveFlag.SQL_OK:
521 return LoaderCallableStatus.PASSIVE_NO_RESULT
522
523 localparent = state.manager.mapper
524
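        # when a deferral "group" is configured, all deferred columns in
        # that group are loaded together; e.g. (hypothetical mapping, for
        # illustration only) ``photo1 = deferred(Column(LargeBinary),
        # group="photos")`` and ``photo2 = deferred(Column(LargeBinary),
        # group="photos")`` are both un-deferred by a single access to
        # either attribute.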
525 if self.group:
526 toload = [
527 p.key
528 for p in localparent.iterate_properties
529 if isinstance(p, StrategizedProperty)
530 and isinstance(p.strategy, _DeferredColumnLoader)
531 and p.group == self.group
532 ]
533 else:
534 toload = [self.key]
535
536 # narrow the keys down to just those which have no history
537 group = [k for k in toload if k in state.unmodified]
538
539 session = _state_session(state)
540 if session is None:
541 raise orm_exc.DetachedInstanceError(
542 "Parent instance %s is not bound to a Session; "
543 "deferred load operation of attribute '%s' cannot proceed"
544 % (orm_util.state_str(state), self.key)
545 )
546
547 if self.raiseload:
548 self._invoke_raise_load(state, passive, "raise")
549
550 loading._load_scalar_attributes(
551 state.mapper, state, set(group), PASSIVE_OFF
552 )
553
554 return LoaderCallableStatus.ATTR_WAS_SET
555
556 def _invoke_raise_load(self, state, passive, lazy):
557 raise sa_exc.InvalidRequestError(
558 "'%s' is not available due to raiseload=True" % (self,)
559 )
560
561
562class _LoadDeferredColumns:
563 """serializable loader object used by DeferredColumnLoader"""
564
565 def __init__(self, key: str, raiseload: bool = False):
566 self.key = key
567 self.raiseload = raiseload
568
569 def __call__(self, state, passive=attributes.PASSIVE_OFF):
570 key = self.key
571
572 localparent = state.manager.mapper
573 prop = localparent._props[key]
574 if self.raiseload:
575 strategy_key = (
576 ("deferred", True),
577 ("instrument", True),
578 ("raiseload", True),
579 )
580 else:
581 strategy_key = (("deferred", True), ("instrument", True))
582 strategy = prop._get_strategy(strategy_key)
583 return strategy._load_for_state(state, passive)
584
585
586class _AbstractRelationshipLoader(LoaderStrategy):
    """LoaderStrategies which deal with related objects."""
588
589 __slots__ = "mapper", "target", "uselist", "entity"
590
591 def __init__(self, parent, strategy_key):
592 super().__init__(parent, strategy_key)
593 self.mapper = self.parent_property.mapper
594 self.entity = self.parent_property.entity
595 self.target = self.parent_property.target
596 self.uselist = self.parent_property.uselist
597
598 def _immediateload_create_row_processor(
599 self,
600 context,
601 query_entity,
602 path,
603 loadopt,
604 mapper,
605 result,
606 adapter,
607 populators,
608 ):
609 return self.parent_property._get_strategy(
610 (("lazy", "immediate"),)
611 ).create_row_processor(
612 context,
613 query_entity,
614 path,
615 loadopt,
616 mapper,
617 result,
618 adapter,
619 populators,
620 )
621
622
623@log.class_logger
624@relationships.RelationshipProperty.strategy_for(do_nothing=True)
625class _DoNothingLoader(LoaderStrategy):
626 """Relationship loader that makes no change to the object's state.
627
628 Compared to NoLoader, this loader does not initialize the
629 collection/attribute to empty/none; the usual default LazyLoader will
630 take effect.
631
632 """
633
634
635@log.class_logger
636@relationships.RelationshipProperty.strategy_for(lazy="noload")
637@relationships.RelationshipProperty.strategy_for(lazy=None)
638class _NoLoader(_AbstractRelationshipLoader):
639 """Provide loading behavior for a :class:`.Relationship`
640 with "lazy=None".
641
642 """
643
644 __slots__ = ()
645
646 @util.deprecated(
647 "2.1",
648 "The ``noload`` loader strategy is deprecated and will be removed "
649 "in a future release. This option "
650 "produces incorrect results by returning ``None`` for related "
651 "items.",
652 )
653 def init_class_attribute(self, mapper):
654 self.is_class_level = True
655
656 _register_attribute(
657 self.parent_property,
658 mapper,
659 useobject=True,
660 typecallable=self.parent_property.collection_class,
661 )
662
663 def create_row_processor(
664 self,
665 context,
666 query_entity,
667 path,
668 loadopt,
669 mapper,
670 result,
671 adapter,
672 populators,
673 ):
674 def invoke_no_load(state, dict_, row):
675 if self.uselist:
676 attributes.init_state_collection(state, dict_, self.key)
677 else:
678 dict_[self.key] = None
679
680 populators["new"].append((self.key, invoke_no_load))
681
682
683@log.class_logger
684@relationships.RelationshipProperty.strategy_for(lazy=True)
685@relationships.RelationshipProperty.strategy_for(lazy="select")
686@relationships.RelationshipProperty.strategy_for(lazy="raise")
687@relationships.RelationshipProperty.strategy_for(lazy="raise_on_sql")
688@relationships.RelationshipProperty.strategy_for(lazy="baked_select")
689class _LazyLoader(
690 _AbstractRelationshipLoader, util.MemoizedSlots, log.Identified
691):
    """Provide loading behavior for a :class:`.Relationship`
    with "lazy=True", that is, the related objects are loaded when the
    attribute is first accessed.
694
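    A minimal mapping sketch that would select this strategy (hypothetical
    classes, assuming the usual declarative imports; not part of this
    module)::

        class Parent(Base):
            __tablename__ = "parent"
            id = mapped_column(Integer, primary_key=True)

            # lazy="select" is the default loader strategy for
            # relationship()
            children = relationship("Child", lazy="select")
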
695 """
696
697 __slots__ = (
698 "_lazywhere",
699 "_rev_lazywhere",
700 "_lazyload_reverse_option",
701 "_order_by",
702 "use_get",
703 "is_aliased_class",
704 "_bind_to_col",
705 "_equated_columns",
706 "_rev_bind_to_col",
707 "_rev_equated_columns",
708 "_simple_lazy_clause",
709 "_raise_always",
710 "_raise_on_sql",
711 )
712
713 _lazywhere: ColumnElement[bool]
714 _bind_to_col: Dict[str, ColumnElement[Any]]
715 _rev_lazywhere: ColumnElement[bool]
716 _rev_bind_to_col: Dict[str, ColumnElement[Any]]
717
718 parent_property: RelationshipProperty[Any]
719
720 def __init__(
721 self, parent: RelationshipProperty[Any], strategy_key: Tuple[Any, ...]
722 ):
723 super().__init__(parent, strategy_key)
724 self._raise_always = self.strategy_opts["lazy"] == "raise"
725 self._raise_on_sql = self.strategy_opts["lazy"] == "raise_on_sql"
726
727 self.is_aliased_class = inspect(self.entity).is_aliased_class
728
729 join_condition = self.parent_property._join_condition
730 (
731 self._lazywhere,
732 self._bind_to_col,
733 self._equated_columns,
734 ) = join_condition.create_lazy_clause()
735
736 (
737 self._rev_lazywhere,
738 self._rev_bind_to_col,
739 self._rev_equated_columns,
740 ) = join_condition.create_lazy_clause(reverse_direction=True)
741
742 if self.parent_property.order_by:
743 self._order_by = util.to_list(self.parent_property.order_by)
744 else:
745 self._order_by = None
746
747 self.logger.info("%s lazy loading clause %s", self, self._lazywhere)
748
        # determine if our "lazywhere" clause is the same as the mapper's
        # get() clause; if so, we can just use mapper.get()
        #
        # TODO: the "not self.uselist" can be taken out entirely; a m2o
        # load that populates for a list (very unusual, but possible with
        # the API) can still be set to "None" and the attribute system will
        # populate as an empty list.
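        # For illustration: with a plain many-to-one such as a hypothetical
        # ``Child.parent`` relationship, the lazy clause is essentially
        # ``parent.id == :param``, which matches the mapper's primary-key
        # "get" clause, so the load can be satisfied by Session.get() and
        # possibly by the identity map alone, without emitting SQL.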
756 self.use_get = (
757 not self.is_aliased_class
758 and not self.uselist
759 and self.entity._get_clause[0].compare(
760 self._lazywhere,
761 use_proxies=True,
762 compare_keys=False,
763 equivalents=self.mapper._equivalent_columns,
764 )
765 )
766
767 if self.use_get:
768 for col in list(self._equated_columns):
769 if col in self.mapper._equivalent_columns:
770 for c in self.mapper._equivalent_columns[col]:
771 self._equated_columns[c] = self._equated_columns[col]
772
773 self.logger.info(
774 "%s will use Session.get() to optimize instance loads", self
775 )
776
777 def init_class_attribute(self, mapper):
778 self.is_class_level = True
779
780 _legacy_inactive_history_style = (
781 self.parent_property._legacy_inactive_history_style
782 )
783
784 if self.parent_property.active_history:
785 active_history = True
786 _deferred_history = False
787
788 elif (
789 self.parent_property.direction is not interfaces.MANYTOONE
790 or not self.use_get
791 ):
792 if _legacy_inactive_history_style:
793 active_history = True
794 _deferred_history = False
795 else:
796 active_history = False
797 _deferred_history = True
798 else:
799 active_history = _deferred_history = False
800
801 _register_attribute(
802 self.parent_property,
803 mapper,
804 useobject=True,
805 callable_=self._load_for_state,
806 typecallable=self.parent_property.collection_class,
807 active_history=active_history,
808 _deferred_history=_deferred_history,
809 )
810
811 def _memoized_attr__simple_lazy_clause(self):
812 lazywhere = self._lazywhere
813
814 criterion, bind_to_col = (lazywhere, self._bind_to_col)
815
816 params = []
817
818 def visit_bindparam(bindparam):
819 bindparam.unique = False
820
821 visitors.traverse(criterion, {}, {"bindparam": visit_bindparam})
822
823 def visit_bindparam(bindparam):
824 if bindparam._identifying_key in bind_to_col:
825 params.append(
826 (
827 bindparam.key,
828 bind_to_col[bindparam._identifying_key],
829 None,
830 )
831 )
832 elif bindparam.callable is None:
833 params.append((bindparam.key, None, bindparam.value))
834
835 criterion = visitors.cloned_traverse(
836 criterion, {}, {"bindparam": visit_bindparam}
837 )
838
839 return criterion, params
840
841 def _generate_lazy_clause(self, state, passive):
842 criterion, param_keys = self._simple_lazy_clause
843
844 if state is None:
845 return sql_util.adapt_criterion_to_null(
846 criterion, [key for key, ident, value in param_keys]
847 )
848
849 mapper = self.parent_property.parent
850
851 o = state.obj() # strong ref
852 dict_ = attributes.instance_dict(o)
853
854 if passive & PassiveFlag.INIT_OK:
855 passive ^= PassiveFlag.INIT_OK
856
857 params = {}
858 for key, ident, value in param_keys:
859 if ident is not None:
860 if passive and passive & PassiveFlag.LOAD_AGAINST_COMMITTED:
861 value = mapper._get_committed_state_attr_by_column(
862 state, dict_, ident, passive
863 )
864 else:
865 value = mapper._get_state_attr_by_column(
866 state, dict_, ident, passive
867 )
868
869 params[key] = value
870
871 return criterion, params
872
873 def _invoke_raise_load(self, state, passive, lazy):
874 raise sa_exc.InvalidRequestError(
875 "'%s' is not available due to lazy='%s'" % (self, lazy)
876 )
877
878 def _load_for_state(
879 self,
880 state,
881 passive,
882 loadopt=None,
883 extra_criteria=(),
884 extra_options=(),
885 alternate_effective_path=None,
886 execution_options=util.EMPTY_DICT,
887 ):
888 if not state.key and (
889 (
890 not self.parent_property.load_on_pending
891 and not state._load_pending
892 )
893 or not state.session_id
894 ):
895 return LoaderCallableStatus.ATTR_EMPTY
896
897 pending = not state.key
898 primary_key_identity = None
899
900 use_get = self.use_get and (not loadopt or not loadopt._extra_criteria)
901
902 if (not passive & PassiveFlag.SQL_OK and not use_get) or (
903 not passive & attributes.NON_PERSISTENT_OK and pending
904 ):
905 return LoaderCallableStatus.PASSIVE_NO_RESULT
906
907 if (
908 # we were given lazy="raise"
909 self._raise_always
910 # the no_raise history-related flag was not passed
911 and not passive & PassiveFlag.NO_RAISE
912 and (
913 # if we are use_get and related_object_ok is disabled,
914 # which means we are at most looking in the identity map
915 # for history purposes or otherwise returning
916 # PASSIVE_NO_RESULT, don't raise. This is also a
917 # history-related flag
918 not use_get
919 or passive & PassiveFlag.RELATED_OBJECT_OK
920 )
921 ):
922 self._invoke_raise_load(state, passive, "raise")
923
924 session = _state_session(state)
925 if not session:
926 if passive & PassiveFlag.NO_RAISE:
927 return LoaderCallableStatus.PASSIVE_NO_RESULT
928
929 raise orm_exc.DetachedInstanceError(
930 "Parent instance %s is not bound to a Session; "
931 "lazy load operation of attribute '%s' cannot proceed"
932 % (orm_util.state_str(state), self.key)
933 )
934
935 # if we have a simple primary key load, check the
936 # identity map without generating a Query at all
937 if use_get:
938 primary_key_identity = self._get_ident_for_use_get(
939 session, state, passive
940 )
941 if LoaderCallableStatus.PASSIVE_NO_RESULT in primary_key_identity:
942 return LoaderCallableStatus.PASSIVE_NO_RESULT
943 elif LoaderCallableStatus.NEVER_SET in primary_key_identity:
944 return LoaderCallableStatus.NEVER_SET
945
946 # test for None alone in primary_key_identity based on
947 # allow_partial_pks preference. PASSIVE_NO_RESULT and NEVER_SET
948 # have already been tested above
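            # e.g. for a hypothetical composite foreign key (x_id, y_id)
            # referring to a composite primary key: with
            # allow_partial_pks=False, any None among (x_id, y_id) means no
            # related row can be looked up; with the default of True, only
            # an all-None identity short-circuits the load.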
949 if not self.mapper.allow_partial_pks:
950 if _none_only_set.intersection(primary_key_identity):
951 return None
952 else:
953 if _none_only_set.issuperset(primary_key_identity):
954 return None
955
956 if (
957 self.key in state.dict
958 and not passive & PassiveFlag.DEFERRED_HISTORY_LOAD
959 ):
960 return LoaderCallableStatus.ATTR_WAS_SET
961
962 # look for this identity in the identity map. Delegate to the
963 # Query class in use, as it may have special rules for how it
964 # does this, including how it decides what the correct
965 # identity_token would be for this identity.
966
967 instance = session._identity_lookup(
968 self.entity,
969 primary_key_identity,
970 passive=passive,
971 lazy_loaded_from=state,
972 )
973
974 if instance is not None:
975 if instance is LoaderCallableStatus.PASSIVE_CLASS_MISMATCH:
976 return None
977 else:
978 return instance
979 elif (
980 not passive & PassiveFlag.SQL_OK
981 or not passive & PassiveFlag.RELATED_OBJECT_OK
982 ):
983 return LoaderCallableStatus.PASSIVE_NO_RESULT
984
985 return self._emit_lazyload(
986 session,
987 state,
988 primary_key_identity,
989 passive,
990 loadopt,
991 extra_criteria,
992 extra_options,
993 alternate_effective_path,
994 execution_options,
995 )
996
997 def _get_ident_for_use_get(self, session, state, passive):
998 instance_mapper = state.manager.mapper
999
1000 if passive & PassiveFlag.LOAD_AGAINST_COMMITTED:
1001 get_attr = instance_mapper._get_committed_state_attr_by_column
1002 else:
1003 get_attr = instance_mapper._get_state_attr_by_column
1004
1005 dict_ = state.dict
1006
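        # the result is a candidate primary key identity for the related
        # object, e.g. a one-element list like [5] for a single-column
        # target primary key (the value here is purely illustrative)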
1007 return [
1008 get_attr(state, dict_, self._equated_columns[pk], passive=passive)
1009 for pk in self.mapper.primary_key
1010 ]
1011
1012 @util.preload_module("sqlalchemy.orm.strategy_options")
1013 def _emit_lazyload(
1014 self,
1015 session,
1016 state,
1017 primary_key_identity,
1018 passive,
1019 loadopt,
1020 extra_criteria,
1021 extra_options,
1022 alternate_effective_path,
1023 execution_options,
1024 ):
1025 strategy_options = util.preloaded.orm_strategy_options
1026
1027 clauseelement = self.entity.__clause_element__()
1028 stmt = Select._create_raw_select(
1029 _raw_columns=[clauseelement],
1030 _propagate_attrs=clauseelement._propagate_attrs,
1031 _label_style=LABEL_STYLE_TABLENAME_PLUS_COL,
1032 _compile_options=_ORMCompileState.default_compile_options,
1033 )
1034 load_options = QueryContext.default_load_options
1035
1036 load_options += {
1037 "_invoke_all_eagers": False,
1038 "_lazy_loaded_from": state,
1039 }
1040
1041 if self.parent_property.secondary is not None:
1042 stmt = stmt.select_from(
1043 self.mapper, self.parent_property.secondary
1044 )
1045
1046 pending = not state.key
1047
1048 # don't autoflush on pending
1049 if pending or passive & attributes.NO_AUTOFLUSH:
1050 stmt._execution_options = util.immutabledict({"autoflush": False})
1051
1052 use_get = self.use_get
1053
1054 if state.load_options or (loadopt and loadopt._extra_criteria):
1055 if alternate_effective_path is None:
1056 effective_path = state.load_path[self.parent_property]
1057 else:
1058 effective_path = alternate_effective_path[self.parent_property]
1059
1060 opts = state.load_options
1061
1062 if loadopt and loadopt._extra_criteria:
1063 use_get = False
1064 opts += (
1065 orm_util.LoaderCriteriaOption(self.entity, extra_criteria),
1066 )
1067
1068 stmt._with_options = opts
1069 elif alternate_effective_path is None:
1070 # this path is used if there are not already any options
1071 # in the query, but an event may want to add them
1072 effective_path = state.mapper._path_registry[self.parent_property]
1073 else:
1074 # added by immediateloader
1075 effective_path = alternate_effective_path[self.parent_property]
1076
1077 if extra_options:
1078 stmt._with_options += extra_options
1079
1080 stmt._compile_options += {"_current_path": effective_path}
1081
1082 if use_get:
1083 if self._raise_on_sql and not passive & PassiveFlag.NO_RAISE:
1084 self._invoke_raise_load(state, passive, "raise_on_sql")
1085
1086 return loading._load_on_pk_identity(
1087 session,
1088 stmt,
1089 primary_key_identity,
1090 load_options=load_options,
1091 execution_options=execution_options,
1092 )
1093
1094 if self._order_by:
1095 stmt._order_by_clauses = self._order_by
1096
1097 def _lazyload_reverse(compile_context):
1098 for rev in self.parent_property._reverse_property:
1099 # reverse props that are MANYTOONE are loading *this*
1100 # object from get(), so don't need to eager out to those.
1101 if (
1102 rev.direction is interfaces.MANYTOONE
1103 and rev._use_get
1104 and not isinstance(rev.strategy, _LazyLoader)
1105 ):
1106 strategy_options.Load._construct_for_existing_path(
1107 compile_context.compile_options._current_path[
1108 rev.parent
1109 ]
1110 ).lazyload(rev).process_compile_state(compile_context)
1111
1112 stmt = stmt._add_compile_state_func(
1113 _lazyload_reverse, self.parent_property
1114 )
1115
1116 lazy_clause, params = self._generate_lazy_clause(state, passive)
1117
1118 if execution_options:
1119 execution_options = util.EMPTY_DICT.merge_with(
1120 execution_options,
1121 {
1122 "_sa_orm_load_options": load_options,
1123 },
1124 )
1125 else:
1126 execution_options = {
1127 "_sa_orm_load_options": load_options,
1128 }
1129
1130 if (
1131 self.key in state.dict
1132 and not passive & PassiveFlag.DEFERRED_HISTORY_LOAD
1133 ):
1134 return LoaderCallableStatus.ATTR_WAS_SET
1135
1136 if pending:
1137 if util.has_intersection(orm_util._none_set, params.values()):
1138 return None
1139
1140 elif util.has_intersection(orm_util._never_set, params.values()):
1141 return None
1142
1143 if self._raise_on_sql and not passive & PassiveFlag.NO_RAISE:
1144 self._invoke_raise_load(state, passive, "raise_on_sql")
1145
1146 stmt._where_criteria = (lazy_clause,)
1147
1148 result = session.execute(
1149 stmt, params, execution_options=execution_options
1150 )
1151
1152 result = result.unique().scalars().all()
1153
1154 if self.uselist:
1155 return result
1156 else:
1157 l = len(result)
1158 if l:
1159 if l > 1:
1160 util.warn(
1161 "Multiple rows returned with "
1162 "uselist=False for lazily-loaded attribute '%s' "
1163 % self.parent_property
1164 )
1165
1166 return result[0]
1167 else:
1168 return None
1169
1170 def create_row_processor(
1171 self,
1172 context,
1173 query_entity,
1174 path,
1175 loadopt,
1176 mapper,
1177 result,
1178 adapter,
1179 populators,
1180 ):
1181 key = self.key
1182
1183 if (
1184 context.load_options._is_user_refresh
1185 and context.query._compile_options._only_load_props
1186 and self.key in context.query._compile_options._only_load_props
1187 ):
1188 return self._immediateload_create_row_processor(
1189 context,
1190 query_entity,
1191 path,
1192 loadopt,
1193 mapper,
1194 result,
1195 adapter,
1196 populators,
1197 )
1198
1199 if not self.is_class_level or (loadopt and loadopt._extra_criteria):
1200 # we are not the primary manager for this attribute
1201 # on this class - set up a
1202 # per-instance lazyloader, which will override the
1203 # class-level behavior.
1204 # this currently only happens when using a
1205 # "lazyload" option on a "no load"
1206 # attribute - "eager" attributes always have a
1207 # class-level lazyloader installed.
1208 set_lazy_callable = (
1209 InstanceState._instance_level_callable_processor
1210 )(
1211 mapper.class_manager,
1212 _LoadLazyAttribute(
1213 key,
1214 self,
1215 loadopt,
1216 (
1217 loadopt._generate_extra_criteria(context)
1218 if loadopt._extra_criteria
1219 else None
1220 ),
1221 ),
1222 key,
1223 )
1224
1225 populators["new"].append((self.key, set_lazy_callable))
1226 elif context.populate_existing or mapper.always_refresh:
1227
1228 def reset_for_lazy_callable(state, dict_, row):
1229 # we are the primary manager for this attribute on
1230 # this class - reset its
1231 # per-instance attribute state, so that the class-level
1232 # lazy loader is
1233 # executed when next referenced on this instance.
1234 # this is needed in
1235 # populate_existing() types of scenarios to reset
1236 # any existing state.
1237 state._reset(dict_, key)
1238
1239 populators["new"].append((self.key, reset_for_lazy_callable))
1240
1241
1242class _LoadLazyAttribute:
1243 """semi-serializable loader object used by LazyLoader
1244
1245 Historically, this object would be carried along with instances that
1246 needed to run lazyloaders, so it had to be serializable to support
1247 cached instances.
1248
    This is no longer a general requirement; the case where this object is
    used is exactly the case that can't easily be serialized, namely when
    extra criteria are present in the loader option.

    We can't reliably serialize that, as it refers to mapped entities and
    AliasedClass objects that are local to the current process; these would
    need to be matched up on deserialization, e.g. via the
    sqlalchemy.ext.serializer approach.
1257
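    For example, a loader option along these lines (hypothetical entities,
    for illustration only) produces extra criteria that can't be reliably
    pickled::

        stmt = select(A).options(lazyload(A.bs.and_(B.x > 5)))
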
1258 """
1259
1260 def __init__(self, key, initiating_strategy, loadopt, extra_criteria):
1261 self.key = key
1262 self.strategy_key = initiating_strategy.strategy_key
1263 self.loadopt = loadopt
1264 self.extra_criteria = extra_criteria
1265
1266 def __getstate__(self):
1267 if self.extra_criteria is not None:
1268 util.warn(
1269 "Can't reliably serialize a lazyload() option that "
1270 "contains additional criteria; please use eager loading "
1271 "for this case"
1272 )
1273 return {
1274 "key": self.key,
1275 "strategy_key": self.strategy_key,
1276 "loadopt": self.loadopt,
1277 "extra_criteria": (),
1278 }
1279
1280 def __call__(self, state, passive=attributes.PASSIVE_OFF):
1281 key = self.key
1282 instance_mapper = state.manager.mapper
1283 prop = instance_mapper._props[key]
1284 strategy = prop._strategies[self.strategy_key]
1285
1286 return strategy._load_for_state(
1287 state,
1288 passive,
1289 loadopt=self.loadopt,
1290 extra_criteria=self.extra_criteria,
1291 )
1292
1293
1294class _PostLoader(_AbstractRelationshipLoader):
1295 """A relationship loader that emits a second SELECT statement."""
1296
1297 __slots__ = ()
1298
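    # Note on recursion handling in _setup_for_recursion() below: for a
    # self-referential relationship, an option such as (hypothetical
    # mapping) ``selectinload(Node.children, recursion_depth=2)`` stores a
    # decrementing depth counter in the execution options of each nested
    # load; once the counter goes negative the post load stops, while
    # recursion_depth=-1 means unlimited recursion.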
1299 def _setup_for_recursion(self, context, path, loadopt, join_depth=None):
1300 effective_path = (
1301 context.compile_state.current_path or orm_util.PathRegistry.root
1302 ) + path
1303
1304 top_level_context = context._get_top_level_context()
1305 execution_options = util.immutabledict(
1306 {"sa_top_level_orm_context": top_level_context}
1307 )
1308
1309 if loadopt:
1310 recursion_depth = loadopt.local_opts.get("recursion_depth", None)
1311 unlimited_recursion = recursion_depth == -1
1312 else:
1313 recursion_depth = None
1314 unlimited_recursion = False
1315
1316 if recursion_depth is not None:
1317 if not self.parent_property._is_self_referential:
1318 raise sa_exc.InvalidRequestError(
1319 f"recursion_depth option on relationship "
1320 f"{self.parent_property} not valid for "
1321 "non-self-referential relationship"
1322 )
1323 recursion_depth = context.execution_options.get(
1324 f"_recursion_depth_{id(self)}", recursion_depth
1325 )
1326
1327 if not unlimited_recursion and recursion_depth < 0:
1328 return (
1329 effective_path,
1330 False,
1331 execution_options,
1332 recursion_depth,
1333 )
1334
1335 if not unlimited_recursion:
1336 execution_options = execution_options.union(
1337 {
1338 f"_recursion_depth_{id(self)}": recursion_depth - 1,
1339 }
1340 )
1341
1342 if loading._PostLoad.path_exists(
1343 context, effective_path, self.parent_property
1344 ):
1345 return effective_path, False, execution_options, recursion_depth
1346
1347 path_w_prop = path[self.parent_property]
1348 effective_path_w_prop = effective_path[self.parent_property]
1349
1350 if not path_w_prop.contains(context.attributes, "loader"):
1351 if join_depth:
1352 if effective_path_w_prop.length / 2 > join_depth:
1353 return (
1354 effective_path,
1355 False,
1356 execution_options,
1357 recursion_depth,
1358 )
1359 elif effective_path_w_prop.contains_mapper(self.mapper):
1360 return (
1361 effective_path,
1362 False,
1363 execution_options,
1364 recursion_depth,
1365 )
1366
1367 return effective_path, True, execution_options, recursion_depth
1368
1369
1370@relationships.RelationshipProperty.strategy_for(lazy="immediate")
1371class _ImmediateLoader(_PostLoader):
1372 __slots__ = ("join_depth",)
1373
1374 def __init__(self, parent, strategy_key):
1375 super().__init__(parent, strategy_key)
1376 self.join_depth = self.parent_property.join_depth
1377
1378 def init_class_attribute(self, mapper):
1379 self.parent_property._get_strategy(
1380 (("lazy", "select"),)
1381 ).init_class_attribute(mapper)
1382
1383 def create_row_processor(
1384 self,
1385 context,
1386 query_entity,
1387 path,
1388 loadopt,
1389 mapper,
1390 result,
1391 adapter,
1392 populators,
1393 ):
1394 if not context.compile_state.compile_options._enable_eagerloads:
1395 return
1396
1397 (
1398 effective_path,
1399 run_loader,
1400 execution_options,
1401 recursion_depth,
1402 ) = self._setup_for_recursion(context, path, loadopt, self.join_depth)
1403
1404 if not run_loader:
            # this will not emit SQL and will only emit for a many-to-one
            # "use get" load. the "_RELATED" part means it may return the
            # instance even if it's expired, since this is a mutually-recursive
            # load operation.
1409 flags = attributes.PASSIVE_NO_FETCH_RELATED | PassiveFlag.NO_RAISE
1410 else:
1411 flags = attributes.PASSIVE_OFF | PassiveFlag.NO_RAISE
1412
1413 loading._PostLoad.callable_for_path(
1414 context,
1415 effective_path,
1416 self.parent,
1417 self.parent_property,
1418 self._load_for_path,
1419 loadopt,
1420 flags,
1421 recursion_depth,
1422 execution_options,
1423 )
1424
1425 def _load_for_path(
1426 self,
1427 context,
1428 path,
1429 states,
1430 load_only,
1431 loadopt,
1432 flags,
1433 recursion_depth,
1434 execution_options,
1435 ):
1436 if recursion_depth:
1437 new_opt = Load(loadopt.path.entity)
1438 new_opt.context = (
1439 loadopt,
1440 loadopt._recurse(),
1441 )
1442 alternate_effective_path = path._truncate_recursive()
1443 extra_options = (new_opt,)
1444 else:
1445 alternate_effective_path = path
1446 extra_options = ()
1447
1448 key = self.key
1449 lazyloader = self.parent_property._get_strategy((("lazy", "select"),))
1450 for state, overwrite in states:
1451 dict_ = state.dict
1452
1453 if overwrite or key not in dict_:
1454 value = lazyloader._load_for_state(
1455 state,
1456 flags,
1457 extra_options=extra_options,
1458 alternate_effective_path=alternate_effective_path,
1459 execution_options=execution_options,
1460 )
1461 if value not in (
1462 ATTR_WAS_SET,
1463 LoaderCallableStatus.PASSIVE_NO_RESULT,
1464 ):
1465 state.get_impl(key).set_committed_value(
1466 state, dict_, value
1467 )
1468
1469
1470@log.class_logger
1471@relationships.RelationshipProperty.strategy_for(lazy="subquery")
1472class _SubqueryLoader(_PostLoader):
1473 __slots__ = ("join_depth",)
1474
1475 def __init__(self, parent, strategy_key):
1476 super().__init__(parent, strategy_key)
1477 self.join_depth = self.parent_property.join_depth
1478
1479 def init_class_attribute(self, mapper):
1480 self.parent_property._get_strategy(
1481 (("lazy", "select"),)
1482 ).init_class_attribute(mapper)
1483
1484 def _get_leftmost(
1485 self,
1486 orig_query_entity_index,
1487 subq_path,
1488 current_compile_state,
1489 is_root,
1490 ):
1491 given_subq_path = subq_path
1492 subq_path = subq_path.path
1493 subq_mapper = orm_util._class_to_mapper(subq_path[0])
1494
1495 # determine attributes of the leftmost mapper
1496 if (
1497 self.parent.isa(subq_mapper)
1498 and self.parent_property is subq_path[1]
1499 ):
1500 leftmost_mapper, leftmost_prop = self.parent, self.parent_property
1501 else:
1502 leftmost_mapper, leftmost_prop = subq_mapper, subq_path[1]
1503
1504 if is_root:
1505 # the subq_path is also coming from cached state, so when we start
1506 # building up this path, it has to also be converted to be in terms
1507 # of the current state. this is for the specific case of the entity
1508 # is an AliasedClass against a subquery that's not otherwise going
1509 # to adapt
1510 new_subq_path = current_compile_state._entities[
1511 orig_query_entity_index
1512 ].entity_zero._path_registry[leftmost_prop]
1513 additional = len(subq_path) - len(new_subq_path)
1514 if additional:
1515 new_subq_path += path_registry.PathRegistry.coerce(
1516 subq_path[-additional:]
1517 )
1518 else:
1519 new_subq_path = given_subq_path
1520
1521 leftmost_cols = leftmost_prop.local_columns
1522
1523 leftmost_attr = [
1524 getattr(
1525 new_subq_path.path[0].entity,
1526 leftmost_mapper._columntoproperty[c].key,
1527 )
1528 for c in leftmost_cols
1529 ]
1530
1531 return leftmost_mapper, leftmost_attr, leftmost_prop, new_subq_path
1532
1533 def _generate_from_original_query(
1534 self,
1535 orig_compile_state,
1536 orig_query,
1537 leftmost_mapper,
1538 leftmost_attr,
1539 leftmost_relationship,
1540 orig_entity,
1541 ):
1542 # reformat the original query
1543 # to look only for significant columns
1544 q = orig_query._clone().correlate(None)
1545
1546 # LEGACY: make a Query back from the select() !!
1547 # This suits at least two legacy cases:
1548 # 1. applications which expect before_compile() to be called
1549 # below when we run .subquery() on this query (Keystone)
1550 # 2. applications which are doing subqueryload with complex
1551 # from_self() queries, as query.subquery() / .statement
1552 # has to do the full compile context for multiply-nested
1553 # from_self() (Neutron) - see test_subqload_from_self
1554 # for demo.
1555 q2 = query.Query.__new__(query.Query)
1556 q2.__dict__.update(q.__dict__)
1557 q = q2
1558
1559 # set the query's "FROM" list explicitly to what the
1560 # FROM list would be in any case, as we will be limiting
1561 # the columns in the SELECT list which may no longer include
1562 # all entities mentioned in things like WHERE, JOIN, etc.
1563 if not q._from_obj:
1564 q._enable_assertions = False
1565 q.select_from.non_generative(
1566 q,
1567 *{
1568 ent["entity"]
1569 for ent in _column_descriptions(
1570 orig_query, compile_state=orig_compile_state
1571 )
1572 if ent["entity"] is not None
1573 },
1574 )
1575
1576 # select from the identity columns of the outer (specifically, these
1577 # are the 'local_cols' of the property). This will remove other
1578 # columns from the query that might suggest the right entity which is
1579 # why we do set select_from above. The attributes we have are
1580 # coerced and adapted using the original query's adapter, which is
1581 # needed only for the case of adapting a subclass column to
1582 # that of a polymorphic selectable, e.g. we have
1583 # Engineer.primary_language and the entity is Person. All other
1584 # adaptations, e.g. from_self, select_entity_from(), will occur
1585 # within the new query when it compiles, as the compile_state we are
1586 # using here is only a partial one. If the subqueryload is from a
1587 # with_polymorphic() or other aliased() object, left_attr will already
1588 # be the correct attributes so no adaptation is needed.
1589 target_cols = orig_compile_state._adapt_col_list(
1590 [
1591 sql.coercions.expect(sql.roles.ColumnsClauseRole, o)
1592 for o in leftmost_attr
1593 ],
1594 orig_compile_state._get_current_adapter(),
1595 )
1596 q._raw_columns = target_cols
1597
1598 distinct_target_key = leftmost_relationship.distinct_target_key
1599
1600 if distinct_target_key is True:
1601 q._distinct = True
1602 elif distinct_target_key is None:
1603 # if target_cols refer to a non-primary key or only
1604 # part of a composite primary key, set the q as distinct
1605 for t in {c.table for c in target_cols}:
1606 if not set(target_cols).issuperset(t.primary_key):
1607 q._distinct = True
1608 break
1609
1610 # don't need ORDER BY if no limit/offset
1611 if not q._has_row_limiting_clause:
1612 q._order_by_clauses = ()
1613
1614 if q._distinct is True and q._order_by_clauses:
1615 # the logic to automatically add the order by columns to the query
1616 # when distinct is True is deprecated in the query
1617 to_add = sql_util.expand_column_list_from_order_by(
1618 target_cols, q._order_by_clauses
1619 )
1620 if to_add:
1621 q._set_entities(target_cols + to_add)
1622
1623 # the original query now becomes a subquery
1624 # which we'll join onto.
1625 # LEGACY: as "q" is a Query, the before_compile() event is invoked
1626 # here.
1627 embed_q = q.set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL).subquery()
1628 left_alias = orm_util.AliasedClass(
1629 leftmost_mapper, embed_q, use_mapper_path=True
1630 )
1631 return left_alias
1632
1633 def _prep_for_joins(self, left_alias, subq_path):
1634 # figure out what's being joined. a.k.a. the fun part
1635 to_join = []
1636 pairs = list(subq_path.pairs())
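        # pairs() yields (mapper, relationship) tuples along the subquery
        # path; e.g. for a hypothetical User -> "addresses" -> Address ->
        # "dingalings" chain this is roughly
        # [(User mapper, addresses prop), (Address mapper, dingalings prop)]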
1637
1638 for i, (mapper, prop) in enumerate(pairs):
1639 if i > 0:
1640 # look at the previous mapper in the chain -
1641 # if it is as or more specific than this prop's
1642 # mapper, use that instead.
1643 # note we have an assumption here that
1644 # the non-first element is always going to be a mapper,
1645 # not an AliasedClass
1646
1647 prev_mapper = pairs[i - 1][1].mapper
1648 to_append = prev_mapper if prev_mapper.isa(mapper) else mapper
1649 else:
1650 to_append = mapper
1651
1652 to_join.append((to_append, prop.key))
1653
1654 # determine the immediate parent class we are joining from,
1655 # which needs to be aliased.
1656
1657 if len(to_join) < 2:
1658 # in the case of a one level eager load, this is the
1659 # leftmost "left_alias".
1660 parent_alias = left_alias
1661 else:
1662 info = inspect(to_join[-1][0])
1663 if info.is_aliased_class:
1664 parent_alias = info.entity
1665 else:
1666 # alias a plain mapper as we may be
1667 # joining multiple times
1668 parent_alias = orm_util.AliasedClass(
1669 info.entity, use_mapper_path=True
1670 )
1671
1672 local_cols = self.parent_property.local_columns
1673
1674 local_attr = [
1675 getattr(parent_alias, self.parent._columntoproperty[c].key)
1676 for c in local_cols
1677 ]
1678 return to_join, local_attr, parent_alias
1679
1680 def _apply_joins(
1681 self, q, to_join, left_alias, parent_alias, effective_entity
1682 ):
1683 ltj = len(to_join)
1684 if ltj == 1:
1685 to_join = [
1686 getattr(left_alias, to_join[0][1]).of_type(effective_entity)
1687 ]
1688 elif ltj == 2:
1689 to_join = [
1690 getattr(left_alias, to_join[0][1]).of_type(parent_alias),
1691 getattr(parent_alias, to_join[-1][1]).of_type(
1692 effective_entity
1693 ),
1694 ]
1695 elif ltj > 2:
1696 middle = [
1697 (
1698 (
1699 orm_util.AliasedClass(item[0])
1700 if not inspect(item[0]).is_aliased_class
1701 else item[0].entity
1702 ),
1703 item[1],
1704 )
1705 for item in to_join[1:-1]
1706 ]
1707 inner = []
1708
1709 while middle:
1710 item = middle.pop(0)
1711 attr = getattr(item[0], item[1])
1712 if middle:
1713 attr = attr.of_type(middle[0][0])
1714 else:
1715 attr = attr.of_type(parent_alias)
1716
1717 inner.append(attr)
1718
1719 to_join = (
1720 [getattr(left_alias, to_join[0][1]).of_type(inner[0].parent)]
1721 + inner
1722 + [
1723 getattr(parent_alias, to_join[-1][1]).of_type(
1724 effective_entity
1725 )
1726 ]
1727 )
1728
1729 for attr in to_join:
1730 q = q.join(attr)
1731
1732 return q
1733
1734 def _setup_options(
1735 self,
1736 context,
1737 q,
1738 subq_path,
1739 rewritten_path,
1740 orig_query,
1741 effective_entity,
1742 loadopt,
1743 ):
1744 # note that because the subqueryload object
1745 # does not re-use the cached query, instead always making
1746 # use of the current invoked query, while we have two queries
1747 # here (orig and context.query), they are both non-cached
1748 # queries and we can transfer the options as is without
1749 # adjusting for new criteria. Some work on #6881 / #6889
1750 # brought this into question.
1751 new_options = orig_query._with_options
1752
1753 if loadopt and loadopt._extra_criteria:
1754 new_options += (
1755 orm_util.LoaderCriteriaOption(
1756 self.entity,
1757 loadopt._generate_extra_criteria(context),
1758 ),
1759 )
1760
1761 # propagate loader options etc. to the new query.
1762 # these will fire relative to subq_path.
1763 q = q._with_current_path(rewritten_path)
1764 q = q.options(*new_options)
1765
1766 return q
1767
1768 def _setup_outermost_orderby(self, q):
1769 if self.parent_property.order_by:
1770
1771 def _setup_outermost_orderby(compile_context):
1772 compile_context.eager_order_by += tuple(
1773 util.to_list(self.parent_property.order_by)
1774 )
1775
1776 q = q._add_compile_state_func(
1777 _setup_outermost_orderby, self.parent_property
1778 )
1779
1780 return q
1781
1782 class _SubqCollections:
1783 """Given a :class:`_query.Query` used to emit the "subquery load",
1784 provide a load interface that executes the query at the
1785 first moment a value is needed.
1786
1787 """
1788
1789 __slots__ = (
1790 "session",
1791 "execution_options",
1792 "load_options",
1793 "params",
1794 "subq",
1795 "_data",
1796 )
1797
1798 def __init__(self, context, subq):
            # avoid creating a reference cycle by not storing the context,
            # even though keeping it would be preferable
1801 self.session = context.session
1802 self.execution_options = context.execution_options
1803 self.load_options = context.load_options
1804 self.params = context.params or {}
1805 self.subq = subq
1806 self._data = None
1807
1808 def get(self, key, default):
1809 if self._data is None:
1810 self._load()
1811 return self._data.get(key, default)
1812
1813 def _load(self):
1814 self._data = collections.defaultdict(list)
1815
1816 q = self.subq
1817 assert q.session is None
1818
1819 q = q.with_session(self.session)
1820
1821 if self.load_options._populate_existing:
1822 q = q.populate_existing()
1823 # to work with baked query, the parameters may have been
1824 # updated since this query was created, so take these into account
1825
1826 rows = list(q.params(self.params))
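            # each row is (related_entity, parent_key_col_1, ...); group on
            # the trailing parent key columns so that each parent identity
            # maps to its list of related objects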
1827 for k, v in itertools.groupby(rows, lambda x: x[1:]):
1828 self._data[k].extend(vv[0] for vv in v)
1829
1830 def loader(self, state, dict_, row):
1831 if self._data is None:
1832 self._load()
1833
1834 def _setup_query_from_rowproc(
1835 self,
1836 context,
1837 query_entity,
1838 path,
1839 entity,
1840 loadopt,
1841 adapter,
1842 ):
1843 compile_state = context.compile_state
1844 if (
1845 not compile_state.compile_options._enable_eagerloads
1846 or compile_state.compile_options._for_refresh_state
1847 ):
1848 return
1849
1850 orig_query_entity_index = compile_state._entities.index(query_entity)
1851 context.loaders_require_buffering = True
1852
1853 path = path[self.parent_property]
1854
1855 # build up a path indicating the path from the leftmost
1856 # entity to the thing we're subquery loading.
1857 with_poly_entity = path.get(
1858 compile_state.attributes, "path_with_polymorphic", None
1859 )
1860 if with_poly_entity is not None:
1861 effective_entity = with_poly_entity
1862 else:
1863 effective_entity = self.entity
1864
1865 subq_path, rewritten_path = context.query._execution_options.get(
1866 ("subquery_paths", None),
1867 (orm_util.PathRegistry.root, orm_util.PathRegistry.root),
1868 )
1869 is_root = subq_path is orm_util.PathRegistry.root
1870 subq_path = subq_path + path
1871 rewritten_path = rewritten_path + path
1872
        # use the current query being invoked, not the compile state
        # one, so that we get the current parameters. However, it means
        # we can't use the existing compile state; we have to make a new
        # one. Other approaches include possibly using the compiled
        # query but swapping the params, which seems only marginally
        # less time spent but more complicated.
1879 orig_query = context.query._execution_options.get(
1880 ("orig_query", _SubqueryLoader), context.query
1881 )
1882
1883 # make a new compile_state for the query that's probably cached, but
1884 # we're sort of undoing a bit of that caching :(
1885 compile_state_cls = _ORMCompileState._get_plugin_class_for_plugin(
1886 orig_query, "orm"
1887 )
1888
1889 if orig_query._is_lambda_element:
1890 if context.load_options._lazy_loaded_from is None:
1891 util.warn(
1892 'subqueryloader for "%s" must invoke lambda callable '
1893 "at %r in "
1894 "order to produce a new query, decreasing the efficiency "
1895 "of caching for this statement. Consider using "
1896 "selectinload() for more effective full-lambda caching"
1897 % (self, orig_query)
1898 )
1899 orig_query = orig_query._resolved
1900
1901 # this is the more "quick" version, however it's not clear how
1902 # much of this we need. in particular I can't get a test to
1903 # fail if the "set_base_alias" is missing and not sure why that is.
1904 orig_compile_state = compile_state_cls._create_entities_collection(
1905 orig_query, legacy=False
1906 )
1907
1908 (
1909 leftmost_mapper,
1910 leftmost_attr,
1911 leftmost_relationship,
1912 rewritten_path,
1913 ) = self._get_leftmost(
1914 orig_query_entity_index,
1915 rewritten_path,
1916 orig_compile_state,
1917 is_root,
1918 )
1919
1920 # generate a new Query from the original, then
1921 # produce a subquery from it.
1922 left_alias = self._generate_from_original_query(
1923 orig_compile_state,
1924 orig_query,
1925 leftmost_mapper,
1926 leftmost_attr,
1927 leftmost_relationship,
1928 entity,
1929 )
1930
1931 # generate another Query that will join the
1932 # left alias to the target relationships.
1933 # basically doing a longhand
1934 # "from_self()". (from_self() itself not quite industrial
1935 # strength enough for all contingencies...but very close)
1936
1937 q = query.Query(effective_entity)
1938
1939 q._execution_options = context.query._execution_options.merge_with(
1940 context.execution_options,
1941 {
1942 ("orig_query", _SubqueryLoader): orig_query,
1943 ("subquery_paths", None): (subq_path, rewritten_path),
1944 },
1945 )
1946
1947 q = q._set_enable_single_crit(False)
1948 to_join, local_attr, parent_alias = self._prep_for_joins(
1949 left_alias, subq_path
1950 )
1951
1952 q = q.add_columns(*local_attr)
1953 q = self._apply_joins(
1954 q, to_join, left_alias, parent_alias, effective_entity
1955 )
1956
1957 q = self._setup_options(
1958 context,
1959 q,
1960 subq_path,
1961 rewritten_path,
1962 orig_query,
1963 effective_entity,
1964 loadopt,
1965 )
1966 q = self._setup_outermost_orderby(q)
1967
1968 return q
1969
1970 def create_row_processor(
1971 self,
1972 context,
1973 query_entity,
1974 path,
1975 loadopt,
1976 mapper,
1977 result,
1978 adapter,
1979 populators,
1980 ):
1981 if (
1982 loadopt
1983 and context.compile_state.statement is not None
1984 and context.compile_state.statement.is_dml
1985 ):
1986 util.warn_deprecated(
1987 "The subqueryload loader option is not compatible with DML "
1988 "statements such as INSERT, UPDATE. Only SELECT may be used."
1989 "This warning will become an exception in a future release.",
1990 "2.0",
1991 )
1992
1993 if context.refresh_state:
1994 return self._immediateload_create_row_processor(
1995 context,
1996 query_entity,
1997 path,
1998 loadopt,
1999 mapper,
2000 result,
2001 adapter,
2002 populators,
2003 )
2004
2005 _, run_loader, _, _ = self._setup_for_recursion(
2006 context, path, loadopt, self.join_depth
2007 )
2008 if not run_loader:
2009 return
2010
2011 if not isinstance(context.compile_state, _ORMSelectCompileState):
2012 # issue 7505 - subqueryload() in 1.3 and previous would silently
2013 # degrade for from_statement() without warning. this behavior
2014 # is restored here
2015 return
2016
2017 if not self.parent.class_manager[self.key].impl.supports_population:
2018 raise sa_exc.InvalidRequestError(
2019 "'%s' does not support object "
2020 "population - eager loading cannot be applied." % self
2021 )
2022
2023 # a little dance here, as the "path" only semi-tracks the exact
2024 # series of things we are loading; it still doesn't tell us about
2025 # with_polymorphic() and similar constructs when it's at the root.
2026 # the initial MapperEntity is more accurate for this case.
2027 if len(path) == 1:
2028 if not orm_util._entity_isa(query_entity.entity_zero, self.parent):
2029 return
2030 elif not orm_util._entity_isa(path[-1], self.parent):
2031 return
2032
2033 subq = self._setup_query_from_rowproc(
2034 context,
2035 query_entity,
2036 path,
2037 path[-1],
2038 loadopt,
2039 adapter,
2040 )
2041
2042 if subq is None:
2043 return
2044
2045 assert subq.session is None
2046
2047 path = path[self.parent_property]
2048
2049 local_cols = self.parent_property.local_columns
2050
2051 # cache the loaded collections in the context
2052 # so that inheriting mappers don't re-load when they
2053 # call upon create_row_processor again
2054 collections = path.get(context.attributes, "collections")
2055 if collections is None:
2056 collections = self._SubqCollections(context, subq)
2057 path.set(context.attributes, "collections", collections)
2058
2059 if adapter:
2060 local_cols = [adapter.columns[c] for c in local_cols]
2061
2062 if self.uselist:
2063 self._create_collection_loader(
2064 context, result, collections, local_cols, populators
2065 )
2066 else:
2067 self._create_scalar_loader(
2068 context, result, collections, local_cols, populators
2069 )
2070
2071 def _create_collection_loader(
2072 self, context, result, collections, local_cols, populators
2073 ):
2074 tuple_getter = result._tuple_getter(local_cols)
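        # tuple_getter extracts the values of the parent's local columns from
        # each incoming parent row; those values are the keys under which the
        # subquery results were grouped, so they are used below to look up
        # the pre-loaded collection for each parent object.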
2075
2076 def load_collection_from_subq(state, dict_, row):
2077 collection = collections.get(tuple_getter(row), ())
2078 state.get_impl(self.key).set_committed_value(
2079 state, dict_, collection
2080 )
2081
2082 def load_collection_from_subq_existing_row(state, dict_, row):
2083 if self.key not in dict_:
2084 load_collection_from_subq(state, dict_, row)
2085
2086 populators["new"].append((self.key, load_collection_from_subq))
2087 populators["existing"].append(
2088 (self.key, load_collection_from_subq_existing_row)
2089 )
2090
2091 if context.invoke_all_eagers:
2092 populators["eager"].append((self.key, collections.loader))
2093
2094 def _create_scalar_loader(
2095 self, context, result, collections, local_cols, populators
2096 ):
2097 tuple_getter = result._tuple_getter(local_cols)
2098
2099 def load_scalar_from_subq(state, dict_, row):
2100 collection = collections.get(tuple_getter(row), (None,))
2101 if len(collection) > 1:
2102 util.warn(
2103 "Multiple rows returned with "
2104 "uselist=False for eagerly-loaded attribute '%s' " % self
2105 )
2106
2107 scalar = collection[0]
2108 state.get_impl(self.key).set_committed_value(state, dict_, scalar)
2109
2110 def load_scalar_from_subq_existing_row(state, dict_, row):
2111 if self.key not in dict_:
2112 load_scalar_from_subq(state, dict_, row)
2113
2114 populators["new"].append((self.key, load_scalar_from_subq))
2115 populators["existing"].append(
2116 (self.key, load_scalar_from_subq_existing_row)
2117 )
2118 if context.invoke_all_eagers:
2119 populators["eager"].append((self.key, collections.loader))
2120
2121
2122@log.class_logger
2123@relationships.RelationshipProperty.strategy_for(lazy="joined")
2124@relationships.RelationshipProperty.strategy_for(lazy=False)
2125class _JoinedLoader(_AbstractRelationshipLoader):
2126 """Provide loading behavior for a :class:`.Relationship`
2127 using joined eager loading.
2128
2129 """
2130
2131 __slots__ = "join_depth"
2132
2133 def __init__(self, parent, strategy_key):
2134 super().__init__(parent, strategy_key)
2135 self.join_depth = self.parent_property.join_depth
2136
2137 def init_class_attribute(self, mapper):
2138 self.parent_property._get_strategy(
2139 (("lazy", "select"),)
2140 ).init_class_attribute(mapper)
2141
2142 def setup_query(
2143 self,
2144 compile_state,
2145 query_entity,
2146 path,
2147 loadopt,
2148 adapter,
2149 column_collection=None,
2150 parentmapper=None,
2151 chained_from_outerjoin=False,
2152 **kwargs,
2153 ):
2154 """Add a left outer join to the statement that's being constructed."""
2155
2156 if not compile_state.compile_options._enable_eagerloads:
2157 return
2158 elif (
2159 loadopt
2160 and compile_state.statement is not None
2161 and compile_state.statement.is_dml
2162 ):
2163 util.warn_deprecated(
2164 "The joinedload loader option is not compatible with DML "
2165 "statements such as INSERT, UPDATE. Only SELECT may be used."
2166 "This warning will become an exception in a future release.",
2167 "2.0",
2168 )
2169 elif self.uselist:
2170 compile_state.multi_row_eager_loaders = True
2171
2172 path = path[self.parent_property]
2173
2174 user_defined_adapter = (
2175 self._init_user_defined_eager_proc(
2176 loadopt, compile_state, compile_state.attributes
2177 )
2178 if loadopt
2179 else False
2180 )
2181
2182 if user_defined_adapter is not False:
2183 # set up an adapter but don't create any JOIN; assume it's already
2184 # in the query
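            # (this branch is typically reached via an option along the
            # lines of contains_eager(), where the user has already placed
            # the JOIN, and possibly an alias, in the query themselves)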
2185 (
2186 clauses,
2187 adapter,
2188 add_to_collection,
2189 ) = self._setup_query_on_user_defined_adapter(
2190 compile_state,
2191 query_entity,
2192 path,
2193 adapter,
2194 user_defined_adapter,
2195 )
2196
2197 # don't do "wrap" for multi-row, we want to wrap
2198 # limited/distinct SELECT,
2199 # because we want to put the JOIN on the outside.
2200
2201 else:
2202 # if not via query option, check for
2203 # a cycle
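            # (the path alternates mapper and relationship entries, so
            # path.length / 2 below corresponds to the number of
            # relationship hops traversed; join_depth bounds that count for
            # self-referential relationships)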
2204 if not path.contains(compile_state.attributes, "loader"):
2205 if self.join_depth:
2206 if path.length / 2 > self.join_depth:
2207 return
2208 elif path.contains_mapper(self.mapper):
2209 return
2210
2211 # add the JOIN and create an adapter
2212 (
2213 clauses,
2214 adapter,
2215 add_to_collection,
2216 chained_from_outerjoin,
2217 ) = self._generate_row_adapter(
2218 compile_state,
2219 query_entity,
2220 path,
2221 loadopt,
2222 adapter,
2223 column_collection,
2224 parentmapper,
2225 chained_from_outerjoin,
2226 )
2227
2228 # for multi-row, we want to wrap limited/distinct SELECT,
2229 # because we want to put the JOIN on the outside.
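            # as a rough, approximate sketch (assuming a hypothetical
            # User/Address mapping), a query like
            #
            #   select(User).options(joinedload(User.addresses)).limit(10)
            #
            # renders along the lines of
            #
            #   SELECT anon_1.*, addresses_1.*
            #   FROM (SELECT users.* FROM users LIMIT 10) AS anon_1
            #   LEFT OUTER JOIN addresses AS addresses_1
            #   ON anon_1.id = addresses_1.user_id
            #
            # so that the LIMIT applies only to parent rows and not to the
            # joined child rows.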
2230 compile_state.eager_adding_joins = True
2231
2232 with_poly_entity = path.get(
2233 compile_state.attributes, "path_with_polymorphic", None
2234 )
2235 if with_poly_entity is not None:
2236 with_polymorphic = inspect(
2237 with_poly_entity
2238 ).with_polymorphic_mappers
2239 else:
2240 with_polymorphic = None
2241
2242 path = path[self.entity]
2243
2244 loading._setup_entity_query(
2245 compile_state,
2246 self.mapper,
2247 query_entity,
2248 path,
2249 clauses,
2250 add_to_collection,
2251 with_polymorphic=with_polymorphic,
2252 parentmapper=self.mapper,
2253 chained_from_outerjoin=chained_from_outerjoin,
2254 )
2255
2256 has_nones = util.NONE_SET.intersection(compile_state.secondary_columns)
2257
2258 if has_nones:
2259 if with_poly_entity is not None:
2260 raise sa_exc.InvalidRequestError(
2261 "Detected unaliased columns when generating joined "
2262 "load. Make sure to use aliased=True or flat=True "
2263 "when using joined loading with with_polymorphic()."
2264 )
2265 else:
2266 compile_state.secondary_columns = [
2267 c for c in compile_state.secondary_columns if c is not None
2268 ]
2269
2270 def _init_user_defined_eager_proc(
2271 self, loadopt, compile_state, target_attributes
2272 ):
2273 # check if the opt applies at all
2274 if "eager_from_alias" not in loadopt.local_opts:
2275 # nope
2276 return False
2277
2278 path = loadopt.path.parent
2279
2280 # the option applies. check if the "user_defined_eager_row_processor"
2281 # has been built up.
2282 adapter = path.get(
2283 compile_state.attributes, "user_defined_eager_row_processor", False
2284 )
2285 if adapter is not False:
2286 # just return it
2287 return adapter
2288
2289 # otherwise figure it out.
2290 alias = loadopt.local_opts["eager_from_alias"]
2291 root_mapper, prop = path[-2:]
2292
2293 if alias is not None:
2294 if isinstance(alias, str):
2295 alias = prop.target.alias(alias)
2296 adapter = orm_util.ORMAdapter(
2297 orm_util._TraceAdaptRole.JOINEDLOAD_USER_DEFINED_ALIAS,
2298 prop.mapper,
2299 selectable=alias,
2300 equivalents=prop.mapper._equivalent_columns,
2301 limit_on_entity=False,
2302 )
2303 else:
2304 if path.contains(
2305 compile_state.attributes, "path_with_polymorphic"
2306 ):
2307 with_poly_entity = path.get(
2308 compile_state.attributes, "path_with_polymorphic"
2309 )
2310 adapter = orm_util.ORMAdapter(
2311 orm_util._TraceAdaptRole.JOINEDLOAD_PATH_WITH_POLYMORPHIC,
2312 with_poly_entity,
2313 equivalents=prop.mapper._equivalent_columns,
2314 )
2315 else:
2316 adapter = compile_state._polymorphic_adapters.get(
2317 prop.mapper, None
2318 )
2319 path.set(
2320 target_attributes,
2321 "user_defined_eager_row_processor",
2322 adapter,
2323 )
2324
2325 return adapter
2326
2327 def _setup_query_on_user_defined_adapter(
2328 self, context, entity, path, adapter, user_defined_adapter
2329 ):
2330 # apply some more wrapping to the "user defined adapter"
2331 # if we are setting up the query for SQL render.
2332 adapter = entity._get_entity_clauses(context)
2333
2334 if adapter and user_defined_adapter:
2335 user_defined_adapter = user_defined_adapter.wrap(adapter)
2336 path.set(
2337 context.attributes,
2338 "user_defined_eager_row_processor",
2339 user_defined_adapter,
2340 )
2341 elif adapter:
2342 user_defined_adapter = adapter
2343 path.set(
2344 context.attributes,
2345 "user_defined_eager_row_processor",
2346 user_defined_adapter,
2347 )
2348
2349 add_to_collection = context.primary_columns
2350 return user_defined_adapter, adapter, add_to_collection
2351
2352 def _generate_row_adapter(
2353 self,
2354 compile_state,
2355 entity,
2356 path,
2357 loadopt,
2358 adapter,
2359 column_collection,
2360 parentmapper,
2361 chained_from_outerjoin,
2362 ):
2363 with_poly_entity = path.get(
2364 compile_state.attributes, "path_with_polymorphic", None
2365 )
2366 if with_poly_entity:
2367 to_adapt = with_poly_entity
2368 else:
2369 insp = inspect(self.entity)
2370 if insp.is_aliased_class:
2371 alt_selectable = insp.selectable
2372 else:
2373 alt_selectable = None
2374
2375 to_adapt = orm_util.AliasedClass(
2376 self.mapper,
2377 alias=(
2378 alt_selectable._anonymous_fromclause(flat=True)
2379 if alt_selectable is not None
2380 else None
2381 ),
2382 flat=True,
2383 use_mapper_path=True,
2384 )
2385
2386 to_adapt_insp = inspect(to_adapt)
2387
2388 clauses = to_adapt_insp._memo(
2389 ("joinedloader_ormadapter", self),
2390 orm_util.ORMAdapter,
2391 orm_util._TraceAdaptRole.JOINEDLOAD_MEMOIZED_ADAPTER,
2392 to_adapt_insp,
2393 equivalents=self.mapper._equivalent_columns,
2394 adapt_required=True,
2395 allow_label_resolve=False,
2396 anonymize_labels=True,
2397 )
2398
2399 assert clauses.is_aliased_class
2400
2401 innerjoin = (
2402 loadopt.local_opts.get("innerjoin", self.parent_property.innerjoin)
2403 if loadopt is not None
2404 else self.parent_property.innerjoin
2405 )
2406
2407 if not innerjoin:
2408 # if this is an outer join, all non-nested eager joins from
2409 # this path must also be outer joins
2410 chained_from_outerjoin = True
2411
2412 compile_state.create_eager_joins.append(
2413 (
2414 self._create_eager_join,
2415 entity,
2416 path,
2417 adapter,
2418 parentmapper,
2419 clauses,
2420 innerjoin,
2421 chained_from_outerjoin,
2422 loadopt._extra_criteria if loadopt else (),
2423 )
2424 )
2425
2426 add_to_collection = compile_state.secondary_columns
2427 path.set(compile_state.attributes, "eager_row_processor", clauses)
2428
2429 return clauses, adapter, add_to_collection, chained_from_outerjoin
2430
2431 def _create_eager_join(
2432 self,
2433 compile_state,
2434 query_entity,
2435 path,
2436 adapter,
2437 parentmapper,
2438 clauses,
2439 innerjoin,
2440 chained_from_outerjoin,
2441 extra_criteria,
2442 ):
2443 if parentmapper is None:
2444 localparent = query_entity.mapper
2445 else:
2446 localparent = parentmapper
2447
2448 # whether or not the Query will wrap the selectable in a subquery,
2449 # and then attach eager load joins to that (i.e., in the case of
2450 # LIMIT/OFFSET etc.)
2451 should_nest_selectable = (
2452 compile_state.multi_row_eager_loaders
2453 and compile_state._should_nest_selectable
2454 )
2455
2456 query_entity_key = None
2457
2458 if (
2459 query_entity not in compile_state.eager_joins
2460 and not should_nest_selectable
2461 and compile_state.from_clauses
2462 ):
2463 indexes = sql_util.find_left_clause_that_matches_given(
2464 compile_state.from_clauses, query_entity.selectable
2465 )
2466
2467 if len(indexes) > 1:
2468 # for the eager load case, I can't reproduce this right
2469 # now. For query.join() I can.
2470 raise sa_exc.InvalidRequestError(
2471 "Can't identify which query entity in which to joined "
2472 "eager load from. Please use an exact match when "
2473 "specifying the join path."
2474 )
2475
2476 if indexes:
2477 clause = compile_state.from_clauses[indexes[0]]
2478 # join to an existing FROM clause on the query.
2479 # key it to its list index in the eager_joins dict.
2480 # Query._compile_context will adapt as needed and
2481 # append to the FROM clause of the select().
2482 query_entity_key, default_towrap = indexes[0], clause
2483
2484 if query_entity_key is None:
2485 query_entity_key, default_towrap = (
2486 query_entity,
2487 query_entity.selectable,
2488 )
2489
2490 towrap = compile_state.eager_joins.setdefault(
2491 query_entity_key, default_towrap
2492 )
2493
2494 if adapter:
2495 if getattr(adapter, "is_aliased_class", False):
2496 # joining from an adapted entity. The adapted entity
2497 # might be a "with_polymorphic", so resolve that to our
2498 # specific mapper's entity before looking for our attribute
2499 # name on it.
2500 efm = adapter.aliased_insp._entity_for_mapper(
2501 localparent
2502 if localparent.isa(self.parent)
2503 else self.parent
2504 )
2505
2506 # look for our attribute on the adapted entity, else fall back
2507 # to our straight property
2508 onclause = getattr(efm.entity, self.key, self.parent_property)
2509 else:
2510 onclause = getattr(
2511 orm_util.AliasedClass(
2512 self.parent, adapter.selectable, use_mapper_path=True
2513 ),
2514 self.key,
2515 self.parent_property,
2516 )
2517
2518 else:
2519 onclause = self.parent_property
2520
2521 assert clauses.is_aliased_class
2522
2523 attach_on_outside = (
2524 not chained_from_outerjoin
2525 or not innerjoin
2526 or innerjoin == "unnested"
2527 or query_entity.entity_zero.represents_outer_join
2528 )
2529
2530 extra_join_criteria = extra_criteria
2531 additional_entity_criteria = compile_state.global_attributes.get(
2532 ("additional_entity_criteria", self.mapper), ()
2533 )
2534 if additional_entity_criteria:
2535 extra_join_criteria += tuple(
2536 ae._resolve_where_criteria(self.mapper)
2537 for ae in additional_entity_criteria
2538 if ae.propagate_to_loaders
2539 )
2540
2541 if attach_on_outside:
2542 # this is the "classic" eager join case.
2543 eagerjoin = orm_util._ORMJoin(
2544 towrap,
2545 clauses.aliased_insp,
2546 onclause,
2547 isouter=not innerjoin
2548 or query_entity.entity_zero.represents_outer_join
2549 or (chained_from_outerjoin and isinstance(towrap, sql.Join)),
2550 _left_memo=self.parent,
2551 _right_memo=path[self.mapper],
2552 _extra_criteria=extra_join_criteria,
2553 )
2554 else:
2555 # all other cases use the innerjoin="nested" approach
2556 eagerjoin = self._splice_nested_inner_join(
2557 path, path[-2], towrap, clauses, onclause, extra_join_criteria
2558 )
2559
2560 compile_state.eager_joins[query_entity_key] = eagerjoin
2561
2562 # send a hint to the Query as to where it may "splice" this join
2563 eagerjoin.stop_on = query_entity.selectable
2564
2565 if not parentmapper:
2566 # for the parent clause that is the non-eager end of the join,
2567 # ensure all the parent cols in the primaryjoin are actually in
2568 # the columns clause (i.e. are not deferred), so that aliasing
2569 # applied by the Query propagates those columns outward. This
2570 # has the effect of "undefering" those columns.
2573 for col in sql_util._find_columns(
2574 self.parent_property.primaryjoin
2575 ):
2576 if localparent.persist_selectable.c.contains_column(col):
2577 if adapter:
2578 col = adapter.columns[col]
2579 compile_state._append_dedupe_col_collection(
2580 col, compile_state.primary_columns
2581 )
2582
2583 if self.parent_property.order_by:
2584 compile_state.eager_order_by += tuple(
2585 (eagerjoin._target_adapter.copy_and_process)(
2586 util.to_list(self.parent_property.order_by)
2587 )
2588 )
2589
2590 def _splice_nested_inner_join(
2591 self,
2592 path,
2593 entity_we_want_to_splice_onto,
2594 join_obj,
2595 clauses,
2596 onclause,
2597 extra_criteria,
2598 entity_inside_join_structure: Union[
2599 Mapper, None, Literal[False]
2600 ] = False,
2601 detected_existing_path: Optional[path_registry.PathRegistry] = None,
2602 ):
2603 # recursive fn to splice a nested join into an existing one.
2604 # entity_inside_join_structure=False means this is the outermost call,
2605 # and it should return a value. entity_inside_join_structure=<mapper>
2606 # indicates we've descended into a join and are looking at a FROM
2607 # clause representing this mapper; if this is not
2608 # entity_we_want_to_splice_onto then return None to end the recursive
2609 # branch
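        # as an approximate illustration of what "nested" means here: with
        # innerjoin="nested", a chain like A --outer--> B --inner--> C is
        # rendered roughly as
        #
        #   a LEFT OUTER JOIN (b JOIN c ON b.id = c.b_id) ON a.id = b.a_id
        #
        # i.e. the inner join is spliced into the right side of the outer
        # join rather than being chained after it (which would require
        # downgrading it to an outer join).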
2610
2611 assert entity_we_want_to_splice_onto is path[-2]
2612
2613 if entity_inside_join_structure is False:
2614 assert isinstance(join_obj, orm_util._ORMJoin)
2615
2616 if isinstance(join_obj, sql.selectable.FromGrouping):
2617 # FromGrouping - continue descending into the structure
2618 return self._splice_nested_inner_join(
2619 path,
2620 entity_we_want_to_splice_onto,
2621 join_obj.element,
2622 clauses,
2623 onclause,
2624 extra_criteria,
2625 entity_inside_join_structure,
2626 )
2627 elif isinstance(join_obj, orm_util._ORMJoin):
2628 # _ORMJoin - continue descending into the structure
2629
2630 join_right_path = join_obj._right_memo
2631
2632 # see if right side of join is viable
2633 target_join = self._splice_nested_inner_join(
2634 path,
2635 entity_we_want_to_splice_onto,
2636 join_obj.right,
2637 clauses,
2638 onclause,
2639 extra_criteria,
2640 entity_inside_join_structure=(
2641 join_right_path[-1].mapper
2642 if join_right_path is not None
2643 else None
2644 ),
2645 )
2646
2647 if target_join is not None:
2648 # for a right splice, attempt to flatten out
2649 # a JOIN b JOIN c JOIN .. to avoid needless
2650 # parenthesis nesting
2651 if not join_obj.isouter and not target_join.isouter:
2652 eagerjoin = join_obj._splice_into_center(target_join)
2653 else:
2654 eagerjoin = orm_util._ORMJoin(
2655 join_obj.left,
2656 target_join,
2657 join_obj.onclause,
2658 isouter=join_obj.isouter,
2659 _left_memo=join_obj._left_memo,
2660 )
2661
2662 eagerjoin._target_adapter = target_join._target_adapter
2663 return eagerjoin
2664
2665 else:
2666 # see if left side of join is viable
2667 target_join = self._splice_nested_inner_join(
2668 path,
2669 entity_we_want_to_splice_onto,
2670 join_obj.left,
2671 clauses,
2672 onclause,
2673 extra_criteria,
2674 entity_inside_join_structure=join_obj._left_memo,
2675 detected_existing_path=join_right_path,
2676 )
2677
2678 if target_join is not None:
2679 eagerjoin = orm_util._ORMJoin(
2680 target_join,
2681 join_obj.right,
2682 join_obj.onclause,
2683 isouter=join_obj.isouter,
2684 _right_memo=join_obj._right_memo,
2685 )
2686 eagerjoin._target_adapter = target_join._target_adapter
2687 return eagerjoin
2688
2689 # neither side is viable; return None, or fail if this was the
2690 # topmost call
2691 if entity_inside_join_structure is False:
2692 assert (
2693 False
2694 ), "assertion failed attempting to produce joined eager loads"
2695 return None
2696
2697 # reached an endpoint (e.g. a table that's mapped, or an alias of that
2698 # table). determine if we can use this endpoint to splice onto
2699
2700 # is this the entity we want to splice onto in the first place?
2701 if not entity_we_want_to_splice_onto.isa(entity_inside_join_structure):
2702 return None
2703
2704 # path check. if we know the path by which this join endpoint got
2705 # here, let's compare it to the path we are satisfying and see if
2706 # we're in the wrong place. This is specifically for when our entity
2707 # may appear more than once in the path; see issue #11449,
2708 # updated in issue #11965.
2709 if detected_existing_path and len(detected_existing_path) > 2:
2710 # this assertion is currently based on how this call is made,
2711 # where given a join_obj, the call will have these parameters as
2712 # entity_inside_join_structure=join_obj._left_memo
2713 # and entity_inside_join_structure=join_obj._right_memo.mapper
2714 assert detected_existing_path[-3] is entity_inside_join_structure
2715
2716 # from that, see if the path we are targeting matches the
2717 # "existing" path of this join all the way up to the midpoint
2718 # of this join object (e.g. the relationship).
2719 # if not, then this is not our target
2720 #
2721 # a test condition where this test is false looks like:
2722 #
2723 # desired splice: Node->kind->Kind
2724 # path of desired splice: NodeGroup->nodes->Node->kind
2725 # path we've located: NodeGroup->nodes->Node->common_node->Node
2726 #
2727 # above, because we want to splice kind->Kind onto
2728 # NodeGroup->nodes->Node, this is not our path because it actually
2729 # goes more steps than we want into self-referential
2730 # ->common_node->Node
2731 #
2732 # a test condition where this test is true looks like:
2733 #
2734 # desired splice: B->c2s->C2
2735 # path of desired splice: A->bs->B->c2s
2736 # path we've located: A->bs->B->c1s->C1
2737 #
2738 # above, we want to splice c2s->C2 onto B, and the located path
2739 # shows that the join ends with B->c1s->C1. so we will
2740 # add another join onto that, which would create a "branch" that
2741 # we might represent in a pseudopath as:
2742 #
2743 # B->c1s->C1
2744 # ->c2s->C2
2745 #
2746 # i.e. A JOIN B ON <bs> JOIN C1 ON <c1s>
2747 # JOIN C2 ON <c2s>
2748 #
2749
2750 if detected_existing_path[0:-2] != path.path[0:-1]:
2751 return None
2752
2753 return orm_util._ORMJoin(
2754 join_obj,
2755 clauses.aliased_insp,
2756 onclause,
2757 isouter=False,
2758 _left_memo=entity_inside_join_structure,
2759 _right_memo=path[path[-1].mapper],
2760 _extra_criteria=extra_criteria,
2761 )
2762
2763 def _create_eager_adapter(self, context, result, adapter, path, loadopt):
2764 compile_state = context.compile_state
2765
2766 user_defined_adapter = (
2767 self._init_user_defined_eager_proc(
2768 loadopt, compile_state, context.attributes
2769 )
2770 if loadopt
2771 else False
2772 )
2773
2774 if user_defined_adapter is not False:
2775 decorator = user_defined_adapter
2776 # user defined eagerloads are part of the "primary"
2777 # portion of the load.
2778 # the adapters applied to the Query should be honored.
2779 if compile_state.compound_eager_adapter and decorator:
2780 decorator = decorator.wrap(
2781 compile_state.compound_eager_adapter
2782 )
2783 elif compile_state.compound_eager_adapter:
2784 decorator = compile_state.compound_eager_adapter
2785 else:
2786 decorator = path.get(
2787 compile_state.attributes, "eager_row_processor"
2788 )
2789 if decorator is None:
2790 return False
2791
2792 if self.mapper._result_has_identity_key(result, decorator):
2793 return decorator
2794 else:
2795 # no identity key - don't return a row
2796 # processor, will cause a degrade to lazy
2797 return False
2798
2799 def create_row_processor(
2800 self,
2801 context,
2802 query_entity,
2803 path,
2804 loadopt,
2805 mapper,
2806 result,
2807 adapter,
2808 populators,
2809 ):
2810
2811 if not context.compile_state.compile_options._enable_eagerloads:
2812 return
2813
2814 if not self.parent.class_manager[self.key].impl.supports_population:
2815 raise sa_exc.InvalidRequestError(
2816 "'%s' does not support object "
2817 "population - eager loading cannot be applied." % self
2818 )
2819
2820 if self.uselist:
2821 context.loaders_require_uniquing = True
2822
2823 our_path = path[self.parent_property]
2824
2825 eager_adapter = self._create_eager_adapter(
2826 context, result, adapter, our_path, loadopt
2827 )
2828
2829 if eager_adapter is not False:
2830 key = self.key
2831
2832 _instance = loading._instance_processor(
2833 query_entity,
2834 self.mapper,
2835 context,
2836 result,
2837 our_path[self.entity],
2838 eager_adapter,
2839 )
2840
2841 if not self.uselist:
2842 self._create_scalar_loader(context, key, _instance, populators)
2843 else:
2844 self._create_collection_loader(
2845 context, key, _instance, populators
2846 )
2847 else:
2848 self.parent_property._get_strategy(
2849 (("lazy", "select"),)
2850 ).create_row_processor(
2851 context,
2852 query_entity,
2853 path,
2854 loadopt,
2855 mapper,
2856 result,
2857 adapter,
2858 populators,
2859 )
2860
2861 def _create_collection_loader(self, context, key, _instance, populators):
2862 def load_collection_from_joined_new_row(state, dict_, row):
2863 # note this must unconditionally clear out any existing collection.
2864 # an existing collection would be present only in the case of
2865 # populate_existing().
2866 collection = attributes.init_state_collection(state, dict_, key)
2867 result_list = util.UniqueAppender(
2868 collection, "append_without_event"
2869 )
2870 context.attributes[(state, key)] = result_list
2871 inst = _instance(row)
2872 if inst is not None:
2873 result_list.append(inst)
2874
2875 def load_collection_from_joined_existing_row(state, dict_, row):
2876 if (state, key) in context.attributes:
2877 result_list = context.attributes[(state, key)]
2878 else:
2879 # appender_key can be absent from context.attributes
2880 # with isnew=False when self-referential eager loading
2881 # is used; the same instance may be present in two
2882 # distinct sets of result columns
2883 collection = attributes.init_state_collection(
2884 state, dict_, key
2885 )
2886 result_list = util.UniqueAppender(
2887 collection, "append_without_event"
2888 )
2889 context.attributes[(state, key)] = result_list
2890 inst = _instance(row)
2891 if inst is not None:
2892 result_list.append(inst)
2893
2894 def load_collection_from_joined_exec(state, dict_, row):
2895 _instance(row)
2896
2897 populators["new"].append(
2898 (self.key, load_collection_from_joined_new_row)
2899 )
2900 populators["existing"].append(
2901 (self.key, load_collection_from_joined_existing_row)
2902 )
2903 if context.invoke_all_eagers:
2904 populators["eager"].append(
2905 (self.key, load_collection_from_joined_exec)
2906 )
2907
2908 def _create_scalar_loader(self, context, key, _instance, populators):
2909 def load_scalar_from_joined_new_row(state, dict_, row):
2910 # set a scalar object instance directly on the parent
2911 # object, bypassing InstrumentedAttribute event handlers.
2912 dict_[key] = _instance(row)
2913
2914 def load_scalar_from_joined_existing_row(state, dict_, row):
2915 # call _instance on the row, even though the object has
2916 # been created, so that we further descend into properties
2917 existing = _instance(row)
2918
2919 # conflicting value already loaded, this shouldn't happen
2920 if key in dict_:
2921 if existing is not dict_[key]:
2922 util.warn(
2923 "Multiple rows returned with "
2924 "uselist=False for eagerly-loaded attribute '%s' "
2925 % self
2926 )
2927 else:
2928 # this case is when one row has multiple loads of the
2929 # same entity (e.g. via aliasing), one has an attribute
2930 # that the other doesn't.
2931 dict_[key] = existing
2932
2933 def load_scalar_from_joined_exec(state, dict_, row):
2934 _instance(row)
2935
2936 populators["new"].append((self.key, load_scalar_from_joined_new_row))
2937 populators["existing"].append(
2938 (self.key, load_scalar_from_joined_existing_row)
2939 )
2940 if context.invoke_all_eagers:
2941 populators["eager"].append(
2942 (self.key, load_scalar_from_joined_exec)
2943 )
2944
2945
2946@log.class_logger
2947@relationships.RelationshipProperty.strategy_for(lazy="selectin")
2948class _SelectInLoader(_PostLoader, util.MemoizedSlots):
2949 __slots__ = (
2950 "join_depth",
2951 "omit_join",
2952 "_parent_alias",
2953 "_query_info",
2954 "_fallback_query_info",
2955 )
2956
2957 query_info = collections.namedtuple(
2958 "queryinfo",
2959 [
2960 "load_only_child",
2961 "load_with_join",
2962 "in_expr",
2963 "pk_cols",
2964 "zero_idx",
2965 "child_lookup_cols",
2966 ],
2967 )
2968
2969 _chunksize = 500
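    # rough illustration, assuming a hypothetical User/Address mapping not
    # defined here: for
    #
    #   select(User).options(selectinload(User.addresses))
    #
    # once the User rows are loaded, this strategy emits a second statement
    # approximately like
    #
    #   SELECT addresses.user_id, addresses.*
    #   FROM addresses
    #   WHERE addresses.user_id IN (...)
    #
    # with the IN list broken into chunks of at most _chunksize primary key
    # values per statement.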
2970
2971 def __init__(self, parent, strategy_key):
2972 super().__init__(parent, strategy_key)
2973 self.join_depth = self.parent_property.join_depth
2974 is_m2o = self.parent_property.direction is interfaces.MANYTOONE
2975
2976 if self.parent_property.omit_join is not None:
2977 self.omit_join = self.parent_property.omit_join
2978 else:
2979 lazyloader = self.parent_property._get_strategy(
2980 (("lazy", "select"),)
2981 )
2982 if is_m2o:
2983 self.omit_join = lazyloader.use_get
2984 else:
2985 self.omit_join = self.parent._get_clause[0].compare(
2986 lazyloader._rev_lazywhere,
2987 use_proxies=True,
2988 compare_keys=False,
2989 equivalents=self.parent._equivalent_columns,
2990 )
2991
2992 if self.omit_join:
2993 if is_m2o:
2994 self._query_info = self._init_for_omit_join_m2o()
2995 self._fallback_query_info = self._init_for_join()
2996 else:
2997 self._query_info = self._init_for_omit_join()
2998 else:
2999 self._query_info = self._init_for_join()
3000
3001 def _init_for_omit_join(self):
3002 pk_to_fk = dict(
3003 self.parent_property._join_condition.local_remote_pairs
3004 )
3005 pk_to_fk.update(
3006 (equiv, pk_to_fk[k])
3007 for k in list(pk_to_fk)
3008 for equiv in self.parent._equivalent_columns.get(k, ())
3009 )
3010
3011 pk_cols = fk_cols = [
3012 pk_to_fk[col] for col in self.parent.primary_key if col in pk_to_fk
3013 ]
3014 if len(fk_cols) > 1:
3015 in_expr = sql.tuple_(*fk_cols)
3016 zero_idx = False
3017 else:
3018 in_expr = fk_cols[0]
3019 zero_idx = True
3020
3021 return self.query_info(False, False, in_expr, pk_cols, zero_idx, None)
3022
3023 def _init_for_omit_join_m2o(self):
3024 pk_cols = self.mapper.primary_key
3025 if len(pk_cols) > 1:
3026 in_expr = sql.tuple_(*pk_cols)
3027 zero_idx = False
3028 else:
3029 in_expr = pk_cols[0]
3030 zero_idx = True
3031
3032 lazyloader = self.parent_property._get_strategy((("lazy", "select"),))
3033 lookup_cols = [lazyloader._equated_columns[pk] for pk in pk_cols]
3034
3035 return self.query_info(
3036 True, False, in_expr, pk_cols, zero_idx, lookup_cols
3037 )
3038
3039 def _init_for_join(self):
3040 self._parent_alias = AliasedClass(self.parent.class_)
3041 pa_insp = inspect(self._parent_alias)
3042 pk_cols = [
3043 pa_insp._adapt_element(col) for col in self.parent.primary_key
3044 ]
3045 if len(pk_cols) > 1:
3046 in_expr = sql.tuple_(*pk_cols)
3047 zero_idx = False
3048 else:
3049 in_expr = pk_cols[0]
3050 zero_idx = True
3051 return self.query_info(False, True, in_expr, pk_cols, zero_idx, None)
3052
3053 def init_class_attribute(self, mapper):
3054 self.parent_property._get_strategy(
3055 (("lazy", "select"),)
3056 ).init_class_attribute(mapper)
3057
3058 def create_row_processor(
3059 self,
3060 context,
3061 query_entity,
3062 path,
3063 loadopt,
3064 mapper,
3065 result,
3066 adapter,
3067 populators,
3068 ):
3069 if context.refresh_state:
3070 return self._immediateload_create_row_processor(
3071 context,
3072 query_entity,
3073 path,
3074 loadopt,
3075 mapper,
3076 result,
3077 adapter,
3078 populators,
3079 )
3080
3081 (
3082 effective_path,
3083 run_loader,
3084 execution_options,
3085 recursion_depth,
3086 ) = self._setup_for_recursion(
3087 context, path, loadopt, join_depth=self.join_depth
3088 )
3089
3090 if not run_loader:
3091 return
3092
3093 if not context.compile_state.compile_options._enable_eagerloads:
3094 return
3095
3096 if not self.parent.class_manager[self.key].impl.supports_population:
3097 raise sa_exc.InvalidRequestError(
3098 "'%s' does not support object "
3099 "population - eager loading cannot be applied." % self
3100 )
3101
3102 # a little dance here, as the "path" only semi-tracks the exact
3103 # series of things we are loading; it still doesn't tell us about
3104 # with_polymorphic() and similar constructs when it's at the root.
3105 # the initial MapperEntity is more accurate for this case.
3106 if len(path) == 1:
3107 if not orm_util._entity_isa(query_entity.entity_zero, self.parent):
3108 return
3109 elif not orm_util._entity_isa(path[-1], self.parent):
3110 return
3111
3112 selectin_path = effective_path
3113
3114 path_w_prop = path[self.parent_property]
3115
3116 # build up a path indicating the path from the leftmost
3117 # entity to the thing we're "selectin" loading.
3118 with_poly_entity = path_w_prop.get(
3119 context.attributes, "path_with_polymorphic", None
3120 )
3121 if with_poly_entity is not None:
3122 effective_entity = inspect(with_poly_entity)
3123 else:
3124 effective_entity = self.entity
3125
3126 loading._PostLoad.callable_for_path(
3127 context,
3128 selectin_path,
3129 self.parent,
3130 self.parent_property,
3131 self._load_for_path,
3132 effective_entity,
3133 loadopt,
3134 recursion_depth,
3135 execution_options,
3136 )
3137
3138 def _load_for_path(
3139 self,
3140 context,
3141 path,
3142 states,
3143 load_only,
3144 effective_entity,
3145 loadopt,
3146 recursion_depth,
3147 execution_options,
3148 ):
3149 if load_only and self.key not in load_only:
3150 return
3151
3152 query_info = self._query_info
3153
3154 if query_info.load_only_child:
3155 our_states = collections.defaultdict(list)
3156 none_states = []
3157
3158 mapper = self.parent
3159
3160 for state, overwrite in states:
3161 state_dict = state.dict
3162 related_ident = tuple(
3163 mapper._get_state_attr_by_column(
3164 state,
3165 state_dict,
3166 lk,
3167 passive=attributes.PASSIVE_NO_FETCH,
3168 )
3169 for lk in query_info.child_lookup_cols
3170 )
3171 # if the loaded parent objects do not have the foreign key
3172 # to the related item loaded, then degrade into the joined
3173 # version of selectinload
3174 if LoaderCallableStatus.PASSIVE_NO_RESULT in related_ident:
3175 query_info = self._fallback_query_info
3176 break
3177
3178 # organize states into lists keyed to particular foreign
3179 # key values.
3180 if None not in related_ident:
3181 our_states[related_ident].append(
3182 (state, state_dict, overwrite)
3183 )
3184 else:
3185 # For FK values that have None, add them to a
3186 # separate collection that will be populated separately
3187 none_states.append((state, state_dict, overwrite))
3188
3189 # note the above conditional may have changed query_info
3190 if not query_info.load_only_child:
3191 our_states = [
3192 (state.key[1], state, state.dict, overwrite)
3193 for state, overwrite in states
3194 ]
3195
3196 pk_cols = query_info.pk_cols
3197 in_expr = query_info.in_expr
3198
3199 if not query_info.load_with_join:
3200 # in "omit join" mode, the primary key column and the
3201 # "in" expression are in terms of the related entity. So
3202 # if the related entity is polymorphic or otherwise aliased,
3203 # we need to adapt our "pk_cols" and "in_expr" to that
3204 # entity. in non-"omit join" mode, these are against the
3205 # parent entity and do not need adaptation.
3206 if effective_entity.is_aliased_class:
3207 pk_cols = [
3208 effective_entity._adapt_element(col) for col in pk_cols
3209 ]
3210 in_expr = effective_entity._adapt_element(in_expr)
3211
3212 bundle_ent = orm_util.Bundle("pk", *pk_cols)
3213 bundle_sql = bundle_ent.__clause_element__()
3214
3215 entity_sql = effective_entity.__clause_element__()
3216 q = Select._create_raw_select(
3217 _raw_columns=[bundle_sql, entity_sql],
3218 _label_style=LABEL_STYLE_TABLENAME_PLUS_COL,
3219 _compile_options=_ORMCompileState.default_compile_options,
3220 _propagate_attrs={
3221 "compile_state_plugin": "orm",
3222 "plugin_subject": effective_entity,
3223 },
3224 )
3225
3226 if not query_info.load_with_join:
3227 # the Bundle we have in the "omit_join" case is against raw, non
3228 # annotated columns, so to ensure the Query knows its primary
3229 # entity, we add it explicitly. If we made the Bundle against
3230 # annotated columns, we hit a performance issue in this specific
3231 # case, which is detailed in issue #4347.
3232 q = q.select_from(effective_entity)
3233 else:
3234 # in the non-omit_join case, the Bundle is against the annotated/
3235 # mapped column of the parent entity, but the #4347 issue does not
3236 # occur in this case.
3237 q = q.select_from(self._parent_alias).join(
3238 getattr(self._parent_alias, self.parent_property.key).of_type(
3239 effective_entity
3240 )
3241 )
3242
3243 q = q.filter(in_expr.in_(sql.bindparam("primary_keys")))
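        # approximate sketch: in the "load with join" case the statement
        # built above looks roughly like
        #
        #   SELECT users_1.id, addresses.*
        #   FROM users AS users_1
        #   JOIN addresses ON users_1.id = addresses.user_id
        #   WHERE users_1.id IN (...)
        #
        # (again assuming a hypothetical User/Address mapping), where the
        # "primary_keys" bound parameter is expanded into the actual list of
        # parent primary key values, one chunk at a time, at execution time.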
3244
3245 # a test which exercises what these comments talk about is
3246 # test_selectin_relations.py -> test_twolevel_selectin_w_polymorphic
3247 #
3248 # effective_entity above is given to us in terms of the cached
3249 # statement, namely this one:
3250 orig_query = context.compile_state.select_statement
3251
3252 # the actual statement that was requested is this one:
3253 # context_query = context.user_passed_query
3254 #
3255 # that's not the cached one, however. So while it is of the identical
3256 # structure, if it has entities like AliasedInsp, which we get from
3257 # aliased() or with_polymorphic(), the AliasedInsp will likely be a
3258 # different object identity each time, and will not match up
3259 # hashing-wise to the corresponding AliasedInsp that's in the
3260 # cached query, meaning it won't match on paths and loader lookups
3261 # and loaders like this one will be skipped if it is used in options.
3262 #
3263 # as it turns out, standard loader options like selectinload(),
3264 # lazyload() that have a path need
3265 # to come from the cached query so that the AliasedInsp etc. objects
3266 # that are in the query line up with the object that's in the path
3267 # of the strategy object. however, for other options like
3268 # with_loader_criteria() that don't have a path (they have a fixed
3269 # entity) and need access to the latest closure state in order to
3270 # be correct, we need to use the uncached one.
3271 #
3272 # as of #8399 we let the loader option itself figure out what it
3273 # wants to do given cached and uncached version of itself.
3274
3275 effective_path = path[self.parent_property]
3276
3277 if orig_query is context.user_passed_query:
3278 new_options = orig_query._with_options
3279 else:
3280 cached_options = orig_query._with_options
3281 uncached_options = context.user_passed_query._with_options
3282
3283 # propagate compile state options from the original query,
3284 # updating their "extra_criteria" as necessary.
3285 # note this will create a different cache key than
3286 # "orig" options if extra_criteria is present, because the copy
3287 # of extra_criteria will have a different boundparam than that of
3288 # the QueryableAttribute in the path
3289 new_options = [
3290 orig_opt._adapt_cached_option_to_uncached_option(
3291 context, uncached_opt
3292 )
3293 for orig_opt, uncached_opt in zip(
3294 cached_options, uncached_options
3295 )
3296 ]
3297
3298 if loadopt and loadopt._extra_criteria:
3299 new_options += (
3300 orm_util.LoaderCriteriaOption(
3301 effective_entity,
3302 loadopt._generate_extra_criteria(context),
3303 ),
3304 )
3305
3306 if recursion_depth is not None:
3307 effective_path = effective_path._truncate_recursive()
3308
3309 q = q.options(*new_options)
3310
3311 q = q._update_compile_options({"_current_path": effective_path})
3312 if context.populate_existing:
3313 q = q.execution_options(populate_existing=True)
3314
3315 if self.parent_property.order_by:
3316 if not query_info.load_with_join:
3317 eager_order_by = self.parent_property.order_by
3318 if effective_entity.is_aliased_class:
3319 eager_order_by = [
3320 effective_entity._adapt_element(elem)
3321 for elem in eager_order_by
3322 ]
3323 q = q.order_by(*eager_order_by)
3324 else:
3325
3326 def _setup_outermost_orderby(compile_context):
3327 compile_context.eager_order_by += tuple(
3328 util.to_list(self.parent_property.order_by)
3329 )
3330
3331 q = q._add_compile_state_func(
3332 _setup_outermost_orderby, self.parent_property
3333 )
3334
3335 if query_info.load_only_child:
3336 self._load_via_child(
3337 our_states,
3338 none_states,
3339 query_info,
3340 q,
3341 context,
3342 execution_options,
3343 )
3344 else:
3345 self._load_via_parent(
3346 our_states, query_info, q, context, execution_options
3347 )
3348
3349 def _load_via_child(
3350 self,
3351 our_states,
3352 none_states,
3353 query_info,
3354 q,
3355 context,
3356 execution_options,
3357 ):
3358 uselist = self.uselist
3359
3360 # this sort is really for the benefit of the unit tests
3361 our_keys = sorted(our_states)
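        # process the keys in chunks so that each emitted statement has at
        # most _chunksize values in its IN expression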
3362 while our_keys:
3363 chunk = our_keys[0 : self._chunksize]
3364 our_keys = our_keys[self._chunksize :]
3365 data = {
3366 k: v
3367 for k, v in context.session.execute(
3368 q,
3369 params={
3370 "primary_keys": [
3371 key[0] if query_info.zero_idx else key
3372 for key in chunk
3373 ]
3374 },
3375 execution_options=execution_options,
3376 ).unique()
3377 }
3378
3379 for key in chunk:
3380 # for a real foreign key and no concurrent changes to the
3381 # DB while running this method, "key" is always present in
3382 # data. However, for primaryjoins without real foreign keys
3383 # a non-None primaryjoin condition may still refer to no
3384 # related object.
3385 related_obj = data.get(key, None)
3386 for state, dict_, overwrite in our_states[key]:
3387 if not overwrite and self.key in dict_:
3388 continue
3389
3390 state.get_impl(self.key).set_committed_value(
3391 state,
3392 dict_,
3393 related_obj if not uselist else [related_obj],
3394 )
3395 # populate none states with empty value / collection
3396 for state, dict_, overwrite in none_states:
3397 if not overwrite and self.key in dict_:
3398 continue
3399
3400 # note it's OK if this is a uselist=True attribute, the empty
3401 # collection will be populated
3402 state.get_impl(self.key).set_committed_value(state, dict_, None)
3403
3404 def _load_via_parent(
3405 self, our_states, query_info, q, context, execution_options
3406 ):
3407 uselist = self.uselist
3408 _empty_result = () if uselist else None
3409
3410 while our_states:
3411 chunk = our_states[0 : self._chunksize]
3412 our_states = our_states[self._chunksize :]
3413
3414 primary_keys = [
3415 key[0] if query_info.zero_idx else key
3416 for key, state, state_dict, overwrite in chunk
3417 ]
3418
3419 data = collections.defaultdict(list)
3420 for k, v in itertools.groupby(
3421 context.session.execute(
3422 q,
3423 params={"primary_keys": primary_keys},
3424 execution_options=execution_options,
3425 ).unique(),
3426 lambda x: x[0],
3427 ):
3428 data[k].extend(vv[1] for vv in v)
3429
3430 for key, state, state_dict, overwrite in chunk:
3431 if not overwrite and self.key in state_dict:
3432 continue
3433
3434 collection = data.get(key, _empty_result)
3435
3436 if not uselist and collection:
3437 if len(collection) > 1:
3438 util.warn(
3439 "Multiple rows returned with "
3440 "uselist=False for eagerly-loaded "
3441 "attribute '%s' " % self
3442 )
3443 state.get_impl(self.key).set_committed_value(
3444 state, state_dict, collection[0]
3445 )
3446 else:
3447 # note that empty tuple set on uselist=False sets the
3448 # value to None
3449 state.get_impl(self.key).set_committed_value(
3450 state, state_dict, collection
3451 )
3452
3453
3454def _single_parent_validator(desc, prop):
3455 def _do_check(state, value, oldvalue, initiator):
3456 if value is not None and initiator.key == prop.key:
3457 hasparent = initiator.hasparent(attributes.instance_state(value))
3458 if hasparent and oldvalue is not value:
3459 raise sa_exc.InvalidRequestError(
3460 "Instance %s is already associated with an instance "
3461 "of %s via its %s attribute, and is only allowed a "
3462 "single parent."
3463 % (orm_util.instance_str(value), state.class_, prop),
3464 code="bbf1",
3465 )
3466 return value
3467
3468 def append(state, value, initiator):
3469 return _do_check(state, value, None, initiator)
3470
3471 def set_(state, value, oldvalue, initiator):
3472 return _do_check(state, value, oldvalue, initiator)
3473
3474 event.listen(
3475 desc, "append", append, raw=True, retval=True, active_history=True
3476 )
3477 event.listen(desc, "set", set_, raw=True, retval=True, active_history=True)