# orm/strategies.py
# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
# mypy: ignore-errors


"""sqlalchemy.orm.interfaces.LoaderStrategy
implementations, and related MapperOptions."""

from __future__ import annotations

import collections
import itertools
from typing import Any
from typing import Dict
from typing import Literal
from typing import Optional
from typing import Tuple
from typing import TYPE_CHECKING
from typing import Union

from . import attributes
from . import exc as orm_exc
from . import interfaces
from . import loading
from . import path_registry
from . import properties
from . import query
from . import relationships
from . import unitofwork
from . import util as orm_util
from .base import _DEFER_FOR_STATE
from .base import _RAISE_FOR_STATE
from .base import _SET_DEFERRED_EXPIRED
from .base import ATTR_WAS_SET
from .base import LoaderCallableStatus
from .base import PASSIVE_OFF
from .base import PassiveFlag
from .context import _column_descriptions
from .context import _ORMCompileState
from .context import _ORMSelectCompileState
from .context import QueryContext
from .interfaces import LoaderStrategy
from .interfaces import StrategizedProperty
from .session import _state_session
from .state import InstanceState
from .strategy_options import Load
from .util import _none_only_set
from .util import AliasedClass
from .. import event
from .. import exc as sa_exc
from .. import inspect
from .. import log
from .. import sql
from .. import util
from ..sql import util as sql_util
from ..sql import visitors
from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL
from ..sql.selectable import Select

if TYPE_CHECKING:
    from .mapper import Mapper
    from .relationships import RelationshipProperty
    from ..sql.elements import ColumnElement


def _register_attribute(
    prop,
    mapper,
    useobject,
    compare_function=None,
    typecallable=None,
    callable_=None,
    proxy_property=None,
    active_history=False,
    impl_class=None,
    default_scalar_value=None,
    **kw,
):
    listen_hooks = []

    uselist = useobject and prop.uselist

    if useobject and prop.single_parent:
        listen_hooks.append(_single_parent_validator)

    if prop.key in prop.parent.validators:
        fn, opts = prop.parent.validators[prop.key]
        listen_hooks.append(
            lambda desc, prop: orm_util._validator_events(
                desc, prop.key, fn, **opts
            )
        )

    if useobject:
        listen_hooks.append(unitofwork._track_cascade_events)

    # need to assemble backref listeners
    # after the single parent validator and mapper validator
    if useobject:
        backref = prop.back_populates
        if backref and prop._effective_sync_backref:
            listen_hooks.append(
                lambda desc, prop: attributes._backref_listeners(
                    desc, backref, uselist
                )
            )

    # a single MapperProperty is shared down a class inheritance
    # hierarchy, so we set up attribute instrumentation and backref event
    # for each mapper down the hierarchy.

    # typically, "mapper" is the same as prop.parent, due to the way
    # the configure_mappers() process runs, however this is not strongly
    # enforced, and in the case of a second configure_mappers() run the
    # mapper here might not be prop.parent; also, a subclass mapper may
    # be called here before a superclass mapper. That is, we can't depend
    # on a mapper not already being set up, so we have to check each one.

    for m in mapper.self_and_descendants:
        if prop is m._props.get(
            prop.key
        ) and not m.class_manager._attr_has_impl(prop.key):
            desc = attributes._register_attribute_impl(
                m.class_,
                prop.key,
                parent_token=prop,
                uselist=uselist,
                compare_function=compare_function,
                useobject=useobject,
                trackparent=useobject
                and (
                    prop.single_parent
                    or prop.direction is interfaces.ONETOMANY
                ),
                typecallable=typecallable,
                callable_=callable_,
                active_history=active_history,
                default_scalar_value=default_scalar_value,
                impl_class=impl_class,
                send_modified_events=not useobject or not prop.viewonly,
                doc=prop.doc,
                **kw,
            )

            for hook in listen_hooks:
                hook(desc, prop)


@properties.ColumnProperty.strategy_for(instrument=False, deferred=False)
class _UninstrumentedColumnLoader(LoaderStrategy):
    """Represent a non-instrumented MapperProperty.

    The polymorphic_on argument of mapper() often results in this,
    if the argument is against the with_polymorphic selectable.

    """
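    # Illustrative note (an assumed example, not taken from this module):
    # a mapping that sets ``polymorphic_on`` to a SQL expression such as
    # ``case(...)`` against the with_polymorphic selectable yields a column
    # property with instrument=False; it only needs to be present in the
    # SELECT, so setup_query() adds the column and no row processing occurs.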

    __slots__ = ("columns",)

    def __init__(self, parent, strategy_key):
        super().__init__(parent, strategy_key)
        self.columns = self.parent_property.columns

    def setup_query(
        self,
        compile_state,
        query_entity,
        path,
        loadopt,
        adapter,
        column_collection=None,
        **kwargs,
    ):
        for c in self.columns:
            if adapter:
                c = adapter.columns[c]
            compile_state._append_dedupe_col_collection(c, column_collection)

    def create_row_processor(
        self,
        context,
        query_entity,
        path,
        loadopt,
        mapper,
        result,
        adapter,
        populators,
    ):
        pass


@log.class_logger
@properties.ColumnProperty.strategy_for(instrument=True, deferred=False)
class _ColumnLoader(LoaderStrategy):
    """Provide loading behavior for a :class:`.ColumnProperty`."""

    __slots__ = "columns", "is_composite"

    def __init__(self, parent, strategy_key):
        super().__init__(parent, strategy_key)
        self.columns = self.parent_property.columns
        self.is_composite = hasattr(self.parent_property, "composite_class")

    def setup_query(
        self,
        compile_state,
        query_entity,
        path,
        loadopt,
        adapter,
        column_collection,
        memoized_populators,
        check_for_adapt=False,
        **kwargs,
    ):
        for c in self.columns:
            if adapter:
                if check_for_adapt:
                    c = adapter.adapt_check_present(c)
                    if c is None:
                        return
                else:
                    c = adapter.columns[c]

            compile_state._append_dedupe_col_collection(c, column_collection)

        fetch = self.columns[0]
        if adapter:
            fetch = adapter.columns[fetch]
            if fetch is None:
                # None happens here only for dml bulk_persistence cases
                # when context.DMLReturningColFilter is used
                return

        memoized_populators[self.parent_property] = fetch

    def init_class_attribute(self, mapper):
        self.is_class_level = True
        coltype = self.columns[0].type
        # TODO: check all columns ? check for foreign key as well?
        active_history = (
            self.parent_property.active_history
            or self.columns[0].primary_key
            or (
                mapper.version_id_col is not None
                and mapper._columntoproperty.get(mapper.version_id_col, None)
                is self.parent_property
            )
        )

        _register_attribute(
            self.parent_property,
            mapper,
            useobject=False,
            compare_function=coltype.compare_values,
            active_history=active_history,
            default_scalar_value=self.parent_property._default_scalar_value,
        )

    def create_row_processor(
        self,
        context,
        query_entity,
        path,
        loadopt,
        mapper,
        result,
        adapter,
        populators,
    ):
        # look through list of columns represented here
        # to see which, if any, is present in the row.

        for col in self.columns:
            if adapter:
                col = adapter.columns[col]
            getter = result._getter(col, False)
            if getter:
                populators["quick"].append((self.key, getter))
                break
        else:
            populators["expire"].append((self.key, True))


@log.class_logger
@properties.ColumnProperty.strategy_for(query_expression=True)
class _ExpressionColumnLoader(_ColumnLoader):
    def __init__(self, parent, strategy_key):
        super().__init__(parent, strategy_key)

        # compare to the "default" expression that is mapped in
        # the column. If it's sql.null, we don't need to render
        # unless an expr is passed in the options.
        null = sql.null().label(None)
        self._have_default_expression = any(
            not c.compare(null) for c in self.parent_property.columns
        )

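    # Hedged usage sketch (assumed, not defined in this module): this strategy
    # backs ``query_expression()`` attributes, which are populated per-query
    # via ``select(A).options(with_expression(A.expr, A.x + A.y))``; with no
    # option and no default expression, setup_query() below renders nothing.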
    def setup_query(
        self,
        compile_state,
        query_entity,
        path,
        loadopt,
        adapter,
        column_collection,
        memoized_populators,
        **kwargs,
    ):
        columns = None
        if loadopt and loadopt._extra_criteria:
            columns = loadopt._extra_criteria

        elif self._have_default_expression:
            columns = self.parent_property.columns

        if columns is None:
            return

        for c in columns:
            if adapter:
                c = adapter.columns[c]
            compile_state._append_dedupe_col_collection(c, column_collection)

        fetch = columns[0]
        if adapter:
            fetch = adapter.columns[fetch]
            if fetch is None:
                # None is not expected to be the result of any
                # adapter implementation here, however there may be theoretical
                # usages of returning() with context.DMLReturningColFilter
                return

        memoized_populators[self.parent_property] = fetch

    def create_row_processor(
        self,
        context,
        query_entity,
        path,
        loadopt,
        mapper,
        result,
        adapter,
        populators,
    ):
        # look through list of columns represented here
        # to see which, if any, is present in the row.
        if loadopt and loadopt._extra_criteria:
            columns = loadopt._extra_criteria

            for col in columns:
                if adapter:
                    col = adapter.columns[col]
                getter = result._getter(col, False)
                if getter:
                    populators["quick"].append((self.key, getter))
                    break
            else:
                populators["expire"].append((self.key, True))

    def init_class_attribute(self, mapper):
        self.is_class_level = True

        _register_attribute(
            self.parent_property,
            mapper,
            useobject=False,
            compare_function=self.columns[0].type.compare_values,
            accepts_scalar_loader=False,
            default_scalar_value=self.parent_property._default_scalar_value,
        )


@log.class_logger
@properties.ColumnProperty.strategy_for(deferred=True, instrument=True)
@properties.ColumnProperty.strategy_for(
    deferred=True, instrument=True, raiseload=True
)
@properties.ColumnProperty.strategy_for(do_nothing=True)
class _DeferredColumnLoader(LoaderStrategy):
    """Provide loading behavior for a deferred :class:`.ColumnProperty`."""
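    # Hedged usage sketch (assumed, not part of this module): this is the
    # strategy that ``deferred()`` / ``mapped_column(deferred=True)`` mappings
    # and the ``defer()`` loader option resolve to, e.g.
    # ``select(Book).options(defer(Book.summary))``; the column is omitted
    # from the SELECT and loaded on first access via _load_for_state().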

    __slots__ = "columns", "group", "raiseload"

    def __init__(self, parent, strategy_key):
        super().__init__(parent, strategy_key)
        if hasattr(self.parent_property, "composite_class"):
            raise NotImplementedError(
                "Deferred loading for composite types not implemented yet"
            )
        self.raiseload = self.strategy_opts.get("raiseload", False)
        self.columns = self.parent_property.columns
        self.group = self.parent_property.group

    def create_row_processor(
        self,
        context,
        query_entity,
        path,
        loadopt,
        mapper,
        result,
        adapter,
        populators,
    ):
        # for a DeferredColumnLoader, this method is only used during a
        # "row processor only" query; see test_deferred.py ->
        # tests with "rowproc_only" in their name. As of the 1.0 series,
        # loading._instance_processor doesn't use a "row processing" function
        # to populate columns, instead it uses data in the "populators"
        # dictionary. Normally, the DeferredColumnLoader.setup_query()
        # sets up that data in the "memoized_populators" dictionary
        # and "create_row_processor()" here is never invoked.

        if (
            context.refresh_state
            and context.query._compile_options._only_load_props
            and self.key in context.query._compile_options._only_load_props
        ):
            self.parent_property._get_strategy(
                (("deferred", False), ("instrument", True))
            ).create_row_processor(
                context,
                query_entity,
                path,
                loadopt,
                mapper,
                result,
                adapter,
                populators,
            )

        elif not self.is_class_level:
            if self.raiseload:
                set_deferred_for_local_state = (
                    self.parent_property._raise_column_loader
                )
            else:
                set_deferred_for_local_state = (
                    self.parent_property._deferred_column_loader
                )
            populators["new"].append((self.key, set_deferred_for_local_state))
        else:
            populators["expire"].append((self.key, False))

    def init_class_attribute(self, mapper):
        self.is_class_level = True

        _register_attribute(
            self.parent_property,
            mapper,
            useobject=False,
            compare_function=self.columns[0].type.compare_values,
            callable_=self._load_for_state,
            load_on_unexpire=False,
            default_scalar_value=self.parent_property._default_scalar_value,
        )

    def setup_query(
        self,
        compile_state,
        query_entity,
        path,
        loadopt,
        adapter,
        column_collection,
        memoized_populators,
        only_load_props=None,
        **kw,
    ):
        if (
            (
                compile_state.compile_options._render_for_subquery
                and self.parent_property._renders_in_subqueries
            )
            or (
                loadopt
                and set(self.columns).intersection(
                    self.parent._should_undefer_in_wildcard
                )
            )
            or (
                loadopt
                and self.group
                and loadopt.local_opts.get(
                    "undefer_group_%s" % self.group, False
                )
            )
            or (only_load_props and self.key in only_load_props)
        ):
            self.parent_property._get_strategy(
                (("deferred", False), ("instrument", True))
            ).setup_query(
                compile_state,
                query_entity,
                path,
                loadopt,
                adapter,
                column_collection,
                memoized_populators,
                **kw,
            )
        elif self.is_class_level:
            memoized_populators[self.parent_property] = _SET_DEFERRED_EXPIRED
        elif not self.raiseload:
            memoized_populators[self.parent_property] = _DEFER_FOR_STATE
        else:
            memoized_populators[self.parent_property] = _RAISE_FOR_STATE

    def _load_for_state(self, state, passive):
        if not state.key:
            return LoaderCallableStatus.ATTR_EMPTY

        if not passive & PassiveFlag.SQL_OK:
            return LoaderCallableStatus.PASSIVE_NO_RESULT

        localparent = state.manager.mapper

        if self.group:
            toload = [
                p.key
                for p in localparent.iterate_properties
                if isinstance(p, StrategizedProperty)
                and isinstance(p.strategy, _DeferredColumnLoader)
                and p.group == self.group
            ]
        else:
            toload = [self.key]

        # narrow the keys down to just those which have no history
        group = [k for k in toload if k in state.unmodified]

        session = _state_session(state)
        if session is None:
            raise orm_exc.DetachedInstanceError(
                "Parent instance %s is not bound to a Session; "
                "deferred load operation of attribute '%s' cannot proceed"
                % (orm_util.state_str(state), self.key)
            )

        if self.raiseload:
            self._invoke_raise_load(state, passive, "raise")

        loading._load_scalar_attributes(
            state.mapper, state, set(group), PASSIVE_OFF
        )

        return LoaderCallableStatus.ATTR_WAS_SET

    def _invoke_raise_load(self, state, passive, lazy):
        raise sa_exc.InvalidRequestError(
            "'%s' is not available due to raiseload=True" % (self,)
        )


class _LoadDeferredColumns:
    """serializable loader object used by DeferredColumnLoader"""

    def __init__(self, key: str, raiseload: bool = False):
        self.key = key
        self.raiseload = raiseload

    def __call__(self, state, passive=attributes.PASSIVE_OFF):
        key = self.key

        localparent = state.manager.mapper
        prop = localparent._props[key]
        if self.raiseload:
            strategy_key = (
                ("deferred", True),
                ("instrument", True),
                ("raiseload", True),
            )
        else:
            strategy_key = (("deferred", True), ("instrument", True))
        strategy = prop._get_strategy(strategy_key)
        return strategy._load_for_state(state, passive)


class _AbstractRelationshipLoader(LoaderStrategy):
    """LoaderStrategies which deal with related objects."""

    __slots__ = "mapper", "target", "uselist", "entity"

    def __init__(self, parent, strategy_key):
        super().__init__(parent, strategy_key)
        self.mapper = self.parent_property.mapper
        self.entity = self.parent_property.entity
        self.target = self.parent_property.target
        self.uselist = self.parent_property.uselist

    def _immediateload_create_row_processor(
        self,
        context,
        query_entity,
        path,
        loadopt,
        mapper,
        result,
        adapter,
        populators,
    ):
        return self.parent_property._get_strategy(
            (("lazy", "immediate"),)
        ).create_row_processor(
            context,
            query_entity,
            path,
            loadopt,
            mapper,
            result,
            adapter,
            populators,
        )


@log.class_logger
@relationships.RelationshipProperty.strategy_for(do_nothing=True)
class _DoNothingLoader(LoaderStrategy):
    """Relationship loader that makes no change to the object's state.

    Compared to NoLoader, this loader does not initialize the
    collection/attribute to empty/none; the usual default LazyLoader will
    take effect.

    """


@log.class_logger
@relationships.RelationshipProperty.strategy_for(lazy="noload")
@relationships.RelationshipProperty.strategy_for(lazy=None)
class _NoLoader(_AbstractRelationshipLoader):
    """Provide loading behavior for a :class:`.Relationship`
    with "lazy=None".

    """

    __slots__ = ()

    @util.deprecated(
        "2.1",
        "The ``noload`` loader strategy is deprecated and will be removed "
        "in a future release. This option "
        "produces incorrect results by returning ``None`` for related "
        "items.",
    )
    def init_class_attribute(self, mapper):
        self.is_class_level = True

        _register_attribute(
            self.parent_property,
            mapper,
            useobject=True,
            typecallable=self.parent_property.collection_class,
        )

    def create_row_processor(
        self,
        context,
        query_entity,
        path,
        loadopt,
        mapper,
        result,
        adapter,
        populators,
    ):
        def invoke_no_load(state, dict_, row):
            if self.uselist:
                attributes.init_state_collection(state, dict_, self.key)
            else:
                dict_[self.key] = None

        populators["new"].append((self.key, invoke_no_load))


@log.class_logger
@relationships.RelationshipProperty.strategy_for(lazy=True)
@relationships.RelationshipProperty.strategy_for(lazy="select")
@relationships.RelationshipProperty.strategy_for(lazy="raise")
@relationships.RelationshipProperty.strategy_for(lazy="raise_on_sql")
@relationships.RelationshipProperty.strategy_for(lazy="baked_select")
class _LazyLoader(
    _AbstractRelationshipLoader, util.MemoizedSlots, log.Identified
):
    """Provide loading behavior for a :class:`.Relationship`
    with "lazy=True", that is, loads when first accessed.

    """
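    # Hedged illustration (assumed example, not from this module): with the
    # default ``lazy="select"``, accessing ``some_user.addresses`` after the
    # parent row is loaded emits a new SELECT through _load_for_state() below;
    # ``lazy="raise"`` and ``lazy="raise_on_sql"`` share this strategy and
    # route the access to _invoke_raise_load() instead.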

    __slots__ = (
        "_lazywhere",
        "_rev_lazywhere",
        "_lazyload_reverse_option",
        "_order_by",
        "use_get",
        "is_aliased_class",
        "_bind_to_col",
        "_equated_columns",
        "_rev_bind_to_col",
        "_rev_equated_columns",
        "_simple_lazy_clause",
        "_raise_always",
        "_raise_on_sql",
    )

    _lazywhere: ColumnElement[bool]
    _bind_to_col: Dict[str, ColumnElement[Any]]
    _rev_lazywhere: ColumnElement[bool]
    _rev_bind_to_col: Dict[str, ColumnElement[Any]]

    parent_property: RelationshipProperty[Any]

    def __init__(
        self, parent: RelationshipProperty[Any], strategy_key: Tuple[Any, ...]
    ):
        super().__init__(parent, strategy_key)
        self._raise_always = self.strategy_opts["lazy"] == "raise"
        self._raise_on_sql = self.strategy_opts["lazy"] == "raise_on_sql"

        self.is_aliased_class = inspect(self.entity).is_aliased_class

        join_condition = self.parent_property._join_condition
        (
            self._lazywhere,
            self._bind_to_col,
            self._equated_columns,
        ) = join_condition.create_lazy_clause()

        (
            self._rev_lazywhere,
            self._rev_bind_to_col,
            self._rev_equated_columns,
        ) = join_condition.create_lazy_clause(reverse_direction=True)

        if self.parent_property.order_by:
            self._order_by = util.to_list(self.parent_property.order_by)
        else:
            self._order_by = None

        self.logger.info("%s lazy loading clause %s", self, self._lazywhere)

        # determine if our "lazywhere" clause is the same as the mapper's
        # get() clause. then we can just use mapper.get()
        #
        # TODO: the "not self.uselist" can be taken out entirely; a m2o
        # load that populates for a list (very unusual, but is possible with
        # the API) can still set for "None" and the attribute system will
        # populate as an empty list.
        self.use_get = (
            not self.is_aliased_class
            and not self.uselist
            and self.entity._get_clause[0].compare(
                self._lazywhere,
                use_proxies=True,
                compare_keys=False,
                equivalents=self.mapper._equivalent_columns,
            )
        )

        if self.use_get:
            for col in list(self._equated_columns):
                if col in self.mapper._equivalent_columns:
                    for c in self.mapper._equivalent_columns[col]:
                        self._equated_columns[c] = self._equated_columns[col]

            self.logger.info(
                "%s will use Session.get() to optimize instance loads", self
            )

    def init_class_attribute(self, mapper):
        self.is_class_level = True

        _legacy_inactive_history_style = (
            self.parent_property._legacy_inactive_history_style
        )

        if self.parent_property.active_history:
            active_history = True
            _deferred_history = False

        elif (
            self.parent_property.direction is not interfaces.MANYTOONE
            or not self.use_get
        ):
            if _legacy_inactive_history_style:
                active_history = True
                _deferred_history = False
            else:
                active_history = False
                _deferred_history = True
        else:
            active_history = _deferred_history = False

        _register_attribute(
            self.parent_property,
            mapper,
            useobject=True,
            callable_=self._load_for_state,
            typecallable=self.parent_property.collection_class,
            active_history=active_history,
            _deferred_history=_deferred_history,
        )

    def _memoized_attr__simple_lazy_clause(self):
        lazywhere = self._lazywhere

        criterion, bind_to_col = (lazywhere, self._bind_to_col)

        params = []

        def visit_bindparam(bindparam):
            bindparam.unique = False

        visitors.traverse(criterion, {}, {"bindparam": visit_bindparam})

        def visit_bindparam(bindparam):
            if bindparam._identifying_key in bind_to_col:
                params.append(
                    (
                        bindparam.key,
                        bind_to_col[bindparam._identifying_key],
                        None,
                    )
                )
            elif bindparam.callable is None:
                params.append((bindparam.key, None, bindparam.value))

        criterion = visitors.cloned_traverse(
            criterion, {}, {"bindparam": visit_bindparam}
        )

        return criterion, params

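    # e.g. for a typical many-to-one ``Address.user``, the memoized criterion
    # above is roughly ``users.id = :param_1`` (an illustrative sketch; actual
    # names vary), and the params list tells _generate_lazy_clause() below to
    # fill ``:param_1`` from the parent row's foreign key column.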
    def _generate_lazy_clause(self, state, passive):
        criterion, param_keys = self._simple_lazy_clause

        if state is None:
            return sql_util.adapt_criterion_to_null(
                criterion, [key for key, ident, value in param_keys]
            )

        mapper = self.parent_property.parent

        o = state.obj()  # strong ref
        dict_ = attributes.instance_dict(o)

        if passive & PassiveFlag.INIT_OK:
            passive ^= PassiveFlag.INIT_OK

        params = {}
        for key, ident, value in param_keys:
            if ident is not None:
                if passive and passive & PassiveFlag.LOAD_AGAINST_COMMITTED:
                    value = mapper._get_committed_state_attr_by_column(
                        state, dict_, ident, passive
                    )
                else:
                    value = mapper._get_state_attr_by_column(
                        state, dict_, ident, passive
                    )

            params[key] = value

        return criterion, params

    def _invoke_raise_load(self, state, passive, lazy):
        raise sa_exc.InvalidRequestError(
            "'%s' is not available due to lazy='%s'" % (self, lazy)
        )

    def _load_for_state(
        self,
        state,
        passive,
        loadopt=None,
        extra_criteria=(),
        extra_options=(),
        alternate_effective_path=None,
        execution_options=util.EMPTY_DICT,
    ):
        if not state.key and (
            (
                not self.parent_property.load_on_pending
                and not state._load_pending
            )
            or not state.session_id
        ):
            return LoaderCallableStatus.ATTR_EMPTY

        pending = not state.key
        primary_key_identity = None

        use_get = self.use_get and (not loadopt or not loadopt._extra_criteria)

        if (not passive & PassiveFlag.SQL_OK and not use_get) or (
            not passive & attributes.NON_PERSISTENT_OK and pending
        ):
            return LoaderCallableStatus.PASSIVE_NO_RESULT

        if (
            # we were given lazy="raise"
            self._raise_always
            # the no_raise history-related flag was not passed
            and not passive & PassiveFlag.NO_RAISE
            and (
                # if we are use_get and related_object_ok is disabled,
                # which means we are at most looking in the identity map
                # for history purposes or otherwise returning
                # PASSIVE_NO_RESULT, don't raise. This is also a
                # history-related flag
                not use_get
                or passive & PassiveFlag.RELATED_OBJECT_OK
            )
        ):
            self._invoke_raise_load(state, passive, "raise")

        session = _state_session(state)
        if not session:
            if passive & PassiveFlag.NO_RAISE:
                return LoaderCallableStatus.PASSIVE_NO_RESULT

            raise orm_exc.DetachedInstanceError(
                "Parent instance %s is not bound to a Session; "
                "lazy load operation of attribute '%s' cannot proceed"
                % (orm_util.state_str(state), self.key)
            )

        # if we have a simple primary key load, check the
        # identity map without generating a Query at all
        if use_get:
            primary_key_identity = self._get_ident_for_use_get(
                session, state, passive
            )
            if LoaderCallableStatus.PASSIVE_NO_RESULT in primary_key_identity:
                return LoaderCallableStatus.PASSIVE_NO_RESULT
            elif LoaderCallableStatus.NEVER_SET in primary_key_identity:
                return LoaderCallableStatus.NEVER_SET

            # test for None alone in primary_key_identity based on
            # allow_partial_pks preference. PASSIVE_NO_RESULT and NEVER_SET
            # have already been tested above
            if not self.mapper.allow_partial_pks:
                if _none_only_set.intersection(primary_key_identity):
                    return None
            else:
                if _none_only_set.issuperset(primary_key_identity):
                    return None

            if (
                self.key in state.dict
                and not passive & PassiveFlag.DEFERRED_HISTORY_LOAD
            ):
                return LoaderCallableStatus.ATTR_WAS_SET

            # look for this identity in the identity map. Delegate to the
            # Query class in use, as it may have special rules for how it
            # does this, including how it decides what the correct
            # identity_token would be for this identity.

            instance = session._identity_lookup(
                self.entity,
                primary_key_identity,
                passive=passive,
                lazy_loaded_from=state,
            )

            if instance is not None:
                if instance is LoaderCallableStatus.PASSIVE_CLASS_MISMATCH:
                    return None
                else:
                    return instance
            elif (
                not passive & PassiveFlag.SQL_OK
                or not passive & PassiveFlag.RELATED_OBJECT_OK
            ):
                return LoaderCallableStatus.PASSIVE_NO_RESULT

        return self._emit_lazyload(
            session,
            state,
            primary_key_identity,
            passive,
            loadopt,
            extra_criteria,
            extra_options,
            alternate_effective_path,
            execution_options,
        )

    def _get_ident_for_use_get(self, session, state, passive):
        instance_mapper = state.manager.mapper

        if passive & PassiveFlag.LOAD_AGAINST_COMMITTED:
            get_attr = instance_mapper._get_committed_state_attr_by_column
        else:
            get_attr = instance_mapper._get_state_attr_by_column

        dict_ = state.dict

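        # build the candidate related primary key in self.mapper.primary_key
        # order; values may come back as PASSIVE_NO_RESULT / NEVER_SET
        # sentinels, which the caller checks before using this identity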
        return [
            get_attr(state, dict_, self._equated_columns[pk], passive=passive)
            for pk in self.mapper.primary_key
        ]

    @util.preload_module("sqlalchemy.orm.strategy_options")
    def _emit_lazyload(
        self,
        session,
        state,
        primary_key_identity,
        passive,
        loadopt,
        extra_criteria,
        extra_options,
        alternate_effective_path,
        execution_options,
    ):
        strategy_options = util.preloaded.orm_strategy_options

        clauseelement = self.entity.__clause_element__()
        stmt = Select._create_raw_select(
            _raw_columns=[clauseelement],
            _propagate_attrs=clauseelement._propagate_attrs,
            _compile_options=_ORMCompileState.default_compile_options,
        )
        load_options = QueryContext.default_load_options

        load_options += {
            "_invoke_all_eagers": False,
            "_lazy_loaded_from": state,
        }

        if self.parent_property.secondary is not None:
            stmt = stmt.select_from(
                self.mapper, self.parent_property.secondary
            )

        pending = not state.key

        # don't autoflush on pending
        if pending or passive & attributes.NO_AUTOFLUSH:
            stmt._execution_options = util.immutabledict({"autoflush": False})

        use_get = self.use_get

        if state.load_options or (loadopt and loadopt._extra_criteria):
            if alternate_effective_path is None:
                effective_path = state.load_path[self.parent_property]
            else:
                effective_path = alternate_effective_path[self.parent_property]

            opts = state.load_options

            if loadopt and loadopt._extra_criteria:
                use_get = False
                opts += (
                    orm_util.LoaderCriteriaOption(self.entity, extra_criteria),
                )

            stmt._with_options = opts
        elif alternate_effective_path is None:
            # this path is used if there are not already any options
            # in the query, but an event may want to add them
            effective_path = state.mapper._path_registry[self.parent_property]
        else:
            # added by immediateloader
            effective_path = alternate_effective_path[self.parent_property]

        if extra_options:
            stmt._with_options += extra_options

        stmt._compile_options += {"_current_path": effective_path}

        if use_get:
            if self._raise_on_sql and not passive & PassiveFlag.NO_RAISE:
                self._invoke_raise_load(state, passive, "raise_on_sql")

            return loading._load_on_pk_identity(
                session,
                stmt,
                primary_key_identity,
                load_options=load_options,
                execution_options=execution_options,
            )

        if self._order_by:
            stmt._order_by_clauses = self._order_by

        def _lazyload_reverse(compile_context):
            for rev in self.parent_property._reverse_property:
                # reverse props that are MANYTOONE are loading *this*
                # object from get(), so don't need to eager out to those.
                if (
                    rev.direction is interfaces.MANYTOONE
                    and rev._use_get
                    and not isinstance(rev.strategy, _LazyLoader)
                ):
                    strategy_options.Load._construct_for_existing_path(
                        compile_context.compile_options._current_path[
                            rev.parent
                        ]
                    ).lazyload(rev).process_compile_state(compile_context)

        stmt = stmt._add_compile_state_func(
            _lazyload_reverse, self.parent_property
        )

        lazy_clause, params = self._generate_lazy_clause(state, passive)

        if execution_options:
            execution_options = util.EMPTY_DICT.merge_with(
                execution_options,
                {
                    "_sa_orm_load_options": load_options,
                },
            )
        else:
            execution_options = {
                "_sa_orm_load_options": load_options,
            }

        if (
            self.key in state.dict
            and not passive & PassiveFlag.DEFERRED_HISTORY_LOAD
        ):
            return LoaderCallableStatus.ATTR_WAS_SET

        if pending:
            if util.has_intersection(orm_util._none_set, params.values()):
                return None

        elif util.has_intersection(orm_util._never_set, params.values()):
            return None

        if self._raise_on_sql and not passive & PassiveFlag.NO_RAISE:
            self._invoke_raise_load(state, passive, "raise_on_sql")

        stmt._where_criteria = (lazy_clause,)

        result = session.execute(
            stmt, params, execution_options=execution_options
        )

        result = result.unique().scalars().all()

        if self.uselist:
            return result
        else:
            l = len(result)
            if l:
                if l > 1:
                    util.warn(
                        "Multiple rows returned with "
                        "uselist=False for lazily-loaded attribute '%s' "
                        % self.parent_property
                    )

                return result[0]
            else:
                return None

    def create_row_processor(
        self,
        context,
        query_entity,
        path,
        loadopt,
        mapper,
        result,
        adapter,
        populators,
    ):
        key = self.key

        if (
            context.load_options._is_user_refresh
            and context.query._compile_options._only_load_props
            and self.key in context.query._compile_options._only_load_props
        ):
            return self._immediateload_create_row_processor(
                context,
                query_entity,
                path,
                loadopt,
                mapper,
                result,
                adapter,
                populators,
            )

        if not self.is_class_level or (loadopt and loadopt._extra_criteria):
            # we are not the primary manager for this attribute
            # on this class - set up a
            # per-instance lazyloader, which will override the
            # class-level behavior.
            # this currently only happens when using a
            # "lazyload" option on a "no load"
            # attribute - "eager" attributes always have a
            # class-level lazyloader installed.
            set_lazy_callable = (
                InstanceState._instance_level_callable_processor
            )(
                mapper.class_manager,
                _LoadLazyAttribute(
                    key,
                    self,
                    loadopt,
                    (
                        loadopt._generate_extra_criteria(context)
                        if loadopt._extra_criteria
                        else None
                    ),
                ),
                key,
            )

            populators["new"].append((self.key, set_lazy_callable))
        elif context.populate_existing or mapper.always_refresh:

            def reset_for_lazy_callable(state, dict_, row):
                # we are the primary manager for this attribute on
                # this class - reset its
                # per-instance attribute state, so that the class-level
                # lazy loader is
                # executed when next referenced on this instance.
                # this is needed in
                # populate_existing() types of scenarios to reset
                # any existing state.
                state._reset(dict_, key)

            populators["new"].append((self.key, reset_for_lazy_callable))


class _LoadLazyAttribute:
    """semi-serializable loader object used by LazyLoader

    Historically, this object would be carried along with instances that
    needed to run lazyloaders, so it had to be serializable to support
    cached instances.

    This is no longer a general requirement, and the case where this object
    is used is exactly the case where we can't really serialize easily,
    which is when extra criteria in the loader option is present.

    We can't reliably serialize that as it refers to mapped entities and
    AliasedClass objects that are local to the current process, which would
    need to be matched up on deserialize e.g. the sqlalchemy.ext.serializer
    approach.

    """

    def __init__(self, key, initiating_strategy, loadopt, extra_criteria):
        self.key = key
        self.strategy_key = initiating_strategy.strategy_key
        self.loadopt = loadopt
        self.extra_criteria = extra_criteria

    def __getstate__(self):
        if self.extra_criteria is not None:
            util.warn(
                "Can't reliably serialize a lazyload() option that "
                "contains additional criteria; please use eager loading "
                "for this case"
            )
        return {
            "key": self.key,
            "strategy_key": self.strategy_key,
            "loadopt": self.loadopt,
            "extra_criteria": (),
        }

    def __call__(self, state, passive=attributes.PASSIVE_OFF):
        key = self.key
        instance_mapper = state.manager.mapper
        prop = instance_mapper._props[key]
        strategy = prop._strategies[self.strategy_key]

        return strategy._load_for_state(
            state,
            passive,
            loadopt=self.loadopt,
            extra_criteria=self.extra_criteria,
        )


class _PostLoader(_AbstractRelationshipLoader):
    """A relationship loader that emits a second SELECT statement."""

    __slots__ = ()

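    # The recursion bookkeeping below backs the ``recursion_depth`` parameter
    # of the loader options (e.g. ``selectinload(Node.children,
    # recursion_depth=2)``, a hedged usage sketch): the remaining depth is
    # carried per-loader in execution_options and decremented on each nested
    # load, with -1 meaning unlimited recursion.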
    def _setup_for_recursion(self, context, path, loadopt, join_depth=None):
        effective_path = (
            context.compile_state.current_path or orm_util.PathRegistry.root
        ) + path

        top_level_context = context._get_top_level_context()
        execution_options = util.immutabledict(
            {"sa_top_level_orm_context": top_level_context}
        )

        if loadopt:
            recursion_depth = loadopt.local_opts.get("recursion_depth", None)
            unlimited_recursion = recursion_depth == -1
        else:
            recursion_depth = None
            unlimited_recursion = False

        if recursion_depth is not None:
            if not self.parent_property._is_self_referential:
                raise sa_exc.InvalidRequestError(
                    f"recursion_depth option on relationship "
                    f"{self.parent_property} not valid for "
                    "non-self-referential relationship"
                )
            recursion_depth = context.execution_options.get(
                f"_recursion_depth_{id(self)}", recursion_depth
            )

            if not unlimited_recursion and recursion_depth < 0:
                return (
                    effective_path,
                    False,
                    execution_options,
                    recursion_depth,
                )

            if not unlimited_recursion:
                execution_options = execution_options.union(
                    {
                        f"_recursion_depth_{id(self)}": recursion_depth - 1,
                    }
                )

        if loading._PostLoad.path_exists(
            context, effective_path, self.parent_property
        ):
            return effective_path, False, execution_options, recursion_depth

        path_w_prop = path[self.parent_property]
        effective_path_w_prop = effective_path[self.parent_property]

        if not path_w_prop.contains(context.attributes, "loader"):
            if join_depth:
                if effective_path_w_prop.length / 2 > join_depth:
                    return (
                        effective_path,
                        False,
                        execution_options,
                        recursion_depth,
                    )
            elif effective_path_w_prop.contains_mapper(self.mapper):
                return (
                    effective_path,
                    False,
                    execution_options,
                    recursion_depth,
                )

        return effective_path, True, execution_options, recursion_depth


@relationships.RelationshipProperty.strategy_for(lazy="immediate")
class _ImmediateLoader(_PostLoader):
    __slots__ = ("join_depth",)

    def __init__(self, parent, strategy_key):
        super().__init__(parent, strategy_key)
        self.join_depth = self.parent_property.join_depth

    def init_class_attribute(self, mapper):
        self.parent_property._get_strategy(
            (("lazy", "select"),)
        ).init_class_attribute(mapper)

    def create_row_processor(
        self,
        context,
        query_entity,
        path,
        loadopt,
        mapper,
        result,
        adapter,
        populators,
    ):
        if not context.compile_state.compile_options._enable_eagerloads:
            return

        (
            effective_path,
            run_loader,
            execution_options,
            recursion_depth,
        ) = self._setup_for_recursion(context, path, loadopt, self.join_depth)

        if not run_loader:
            # this will not emit SQL and will only emit for a many-to-one
            # "use get" load. the "_RELATED" part means it may return the
            # instance even if it's expired, since this is a
            # mutually-recursive load operation.
            flags = attributes.PASSIVE_NO_FETCH_RELATED | PassiveFlag.NO_RAISE
        else:
            flags = attributes.PASSIVE_OFF | PassiveFlag.NO_RAISE

        loading._PostLoad.callable_for_path(
            context,
            effective_path,
            self.parent,
            self.parent_property,
            self._load_for_path,
            loadopt,
            flags,
            recursion_depth,
            execution_options,
        )

    def _load_for_path(
        self,
        context,
        path,
        states,
        load_only,
        loadopt,
        flags,
        recursion_depth,
        execution_options,
    ):
        if recursion_depth:
            new_opt = Load(loadopt.path.entity)
            new_opt.context = (
                loadopt,
                loadopt._recurse(),
            )
            alternate_effective_path = path._truncate_recursive()
            extra_options = (new_opt,)
        else:
            alternate_effective_path = path
            extra_options = ()

        key = self.key
        lazyloader = self.parent_property._get_strategy((("lazy", "select"),))
        for state, overwrite in states:
            dict_ = state.dict

            if overwrite or key not in dict_:
                value = lazyloader._load_for_state(
                    state,
                    flags,
                    extra_options=extra_options,
                    alternate_effective_path=alternate_effective_path,
                    execution_options=execution_options,
                )
                if value not in (
                    ATTR_WAS_SET,
                    LoaderCallableStatus.PASSIVE_NO_RESULT,
                ):
                    state.get_impl(key).set_committed_value(
                        state, dict_, value
                    )


@log.class_logger
@relationships.RelationshipProperty.strategy_for(lazy="subquery")
class _SubqueryLoader(_PostLoader):
    __slots__ = ("join_depth",)

    def __init__(self, parent, strategy_key):
        super().__init__(parent, strategy_key)
        self.join_depth = self.parent_property.join_depth

    def init_class_attribute(self, mapper):
        self.parent_property._get_strategy(
            (("lazy", "select"),)
        ).init_class_attribute(mapper)

    def _get_leftmost(
        self,
        orig_query_entity_index,
        subq_path,
        current_compile_state,
        is_root,
    ):
        given_subq_path = subq_path
        subq_path = subq_path.path
        subq_mapper = orm_util._class_to_mapper(subq_path[0])

        # determine attributes of the leftmost mapper
        if (
            self.parent.isa(subq_mapper)
            and self.parent_property is subq_path[1]
        ):
            leftmost_mapper, leftmost_prop = self.parent, self.parent_property
        else:
            leftmost_mapper, leftmost_prop = subq_mapper, subq_path[1]

        if is_root:
            # the subq_path is also coming from cached state, so when we start
            # building up this path, it has to also be converted to be in
            # terms of the current state. this is for the specific case where
            # the entity is an AliasedClass against a subquery that's not
            # otherwise going to adapt
            new_subq_path = current_compile_state._entities[
                orig_query_entity_index
            ].entity_zero._path_registry[leftmost_prop]
            additional = len(subq_path) - len(new_subq_path)
            if additional:
                new_subq_path += path_registry.PathRegistry.coerce(
                    subq_path[-additional:]
                )
        else:
            new_subq_path = given_subq_path

        leftmost_cols = leftmost_prop.local_columns

        leftmost_attr = [
            getattr(
                new_subq_path.path[0].entity,
                leftmost_mapper._columntoproperty[c].key,
            )
            for c in leftmost_cols
        ]

        return leftmost_mapper, leftmost_attr, leftmost_prop, new_subq_path

    def _generate_from_original_query(
        self,
        orig_compile_state,
        orig_query,
        leftmost_mapper,
        leftmost_attr,
        leftmost_relationship,
        orig_entity,
    ):
        # reformat the original query
        # to look only for significant columns
        q = orig_query._clone().correlate(None)

        # LEGACY: make a Query back from the select() !!
        # This suits at least two legacy cases:
        # 1. applications which expect before_compile() to be called
        #    below when we run .subquery() on this query (Keystone)
        # 2. applications which are doing subqueryload with complex
        #    from_self() queries, as query.subquery() / .statement
        #    has to do the full compile context for multiply-nested
        #    from_self() (Neutron) - see test_subqload_from_self
        #    for demo.
        q2 = query.Query.__new__(query.Query)
        q2.__dict__.update(q.__dict__)
        q = q2

        # set the query's "FROM" list explicitly to what the
        # FROM list would be in any case, as we will be limiting
        # the columns in the SELECT list which may no longer include
        # all entities mentioned in things like WHERE, JOIN, etc.
        if not q._from_obj:
            q._enable_assertions = False
            q.select_from.non_generative(
                q,
                *{
                    ent["entity"]
                    for ent in _column_descriptions(
                        orig_query, compile_state=orig_compile_state
                    )
                    if ent["entity"] is not None
                },
            )

        # select from the identity columns of the outer (specifically, these
        # are the 'local_cols' of the property). This will remove other
        # columns from the query that might suggest the right entity which is
        # why we do set select_from above. The attributes we have are
        # coerced and adapted using the original query's adapter, which is
        # needed only for the case of adapting a subclass column to
        # that of a polymorphic selectable, e.g. we have
        # Engineer.primary_language and the entity is Person. All other
        # adaptations, e.g. from_self, select_entity_from(), will occur
        # within the new query when it compiles, as the compile_state we are
        # using here is only a partial one. If the subqueryload is from a
        # with_polymorphic() or other aliased() object, left_attr will already
        # be the correct attributes so no adaptation is needed.
        target_cols = orig_compile_state._adapt_col_list(
            [
                sql.coercions.expect(sql.roles.ColumnsClauseRole, o)
                for o in leftmost_attr
            ],
            orig_compile_state._get_current_adapter(),
        )
        q._raw_columns = target_cols

        distinct_target_key = leftmost_relationship.distinct_target_key

        if distinct_target_key is True:
            q._distinct = True
        elif distinct_target_key is None:
            # if target_cols refer to a non-primary key or only
            # part of a composite primary key, set the q as distinct
            for t in {c.table for c in target_cols}:
                if not set(target_cols).issuperset(t.primary_key):
                    q._distinct = True
                    break

        # don't need ORDER BY if no limit/offset
        if not q._has_row_limiting_clause:
            q._order_by_clauses = ()

        if q._distinct is True and q._order_by_clauses:
            # the logic to automatically add the order by columns to the query
            # when distinct is True is deprecated in the query
            to_add = sql_util.expand_column_list_from_order_by(
                target_cols, q._order_by_clauses
            )
            if to_add:
                q._set_entities(target_cols + to_add)

        # the original query now becomes a subquery
        # which we'll join onto.
        # LEGACY: as "q" is a Query, the before_compile() event is invoked
        # here.
        embed_q = q.set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL).subquery()
        left_alias = orm_util.AliasedClass(
            leftmost_mapper, embed_q, use_mapper_path=True
        )
        return left_alias

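    # Rough shape of what the step above plus _apply_joins() produces, as an
    # illustrative sketch only (table and column names vary per mapping):
    #
    #   SELECT addresses.*, anon_1.users_id AS anon_1_users_id
    #   FROM (SELECT users.id AS users_id FROM users WHERE ...) AS anon_1
    #   JOIN addresses ON anon_1.users_id = addresses.user_id
    #
    # i.e. the original query, reduced to the parent's local columns, becomes
    # an anonymous subquery and the related rows are joined against it.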
    def _prep_for_joins(self, left_alias, subq_path):
        # figure out what's being joined. a.k.a. the fun part
        to_join = []
        pairs = list(subq_path.pairs())

        for i, (mapper, prop) in enumerate(pairs):
            if i > 0:
                # look at the previous mapper in the chain -
                # if it is as or more specific than this prop's
                # mapper, use that instead.
                # note we have an assumption here that
                # the non-first element is always going to be a mapper,
                # not an AliasedClass

                prev_mapper = pairs[i - 1][1].mapper
                to_append = prev_mapper if prev_mapper.isa(mapper) else mapper
            else:
                to_append = mapper

            to_join.append((to_append, prop.key))

        # determine the immediate parent class we are joining from,
        # which needs to be aliased.

        if len(to_join) < 2:
            # in the case of a one level eager load, this is the
            # leftmost "left_alias".
            parent_alias = left_alias
        else:
            info = inspect(to_join[-1][0])
            if info.is_aliased_class:
                parent_alias = info.entity
            else:
                # alias a plain mapper as we may be
                # joining multiple times
                parent_alias = orm_util.AliasedClass(
                    info.entity, use_mapper_path=True
                )

        local_cols = self.parent_property.local_columns

        local_attr = [
            getattr(parent_alias, self.parent._columntoproperty[c].key)
            for c in local_cols
        ]
        return to_join, local_attr, parent_alias

    def _apply_joins(
        self, q, to_join, left_alias, parent_alias, effective_entity
    ):
        ltj = len(to_join)
        if ltj == 1:
            to_join = [
                getattr(left_alias, to_join[0][1]).of_type(effective_entity)
            ]
        elif ltj == 2:
            to_join = [
                getattr(left_alias, to_join[0][1]).of_type(parent_alias),
                getattr(parent_alias, to_join[-1][1]).of_type(
                    effective_entity
                ),
            ]
        elif ltj > 2:
            middle = [
                (
                    (
                        orm_util.AliasedClass(item[0])
                        if not inspect(item[0]).is_aliased_class
                        else item[0].entity
                    ),
                    item[1],
                )
                for item in to_join[1:-1]
            ]
            inner = []

            while middle:
                item = middle.pop(0)
                attr = getattr(item[0], item[1])
                if middle:
                    attr = attr.of_type(middle[0][0])
                else:
                    attr = attr.of_type(parent_alias)

                inner.append(attr)

            to_join = (
                [getattr(left_alias, to_join[0][1]).of_type(inner[0].parent)]
                + inner
                + [
                    getattr(parent_alias, to_join[-1][1]).of_type(
                        effective_entity
                    )
                ]
            )

        for attr in to_join:
            q = q.join(attr)

        return q

    def _setup_options(
        self,
        context,
        q,
        subq_path,
        rewritten_path,
        orig_query,
        effective_entity,
        loadopt,
    ):
        # note that because the subqueryload object
        # does not re-use the cached query, instead always making
        # use of the current invoked query, while we have two queries
        # here (orig and context.query), they are both non-cached
        # queries and we can transfer the options as is without
        # adjusting for new criteria. Some work on #6881 / #6889
        # brought this into question.
        new_options = orig_query._with_options

        if loadopt and loadopt._extra_criteria:
            new_options += (
                orm_util.LoaderCriteriaOption(
                    self.entity,
                    loadopt._generate_extra_criteria(context),
                ),
            )

        # propagate loader options etc. to the new query.
        # these will fire relative to subq_path.
        q = q._with_current_path(rewritten_path)
        q = q.options(*new_options)

        return q

    def _setup_outermost_orderby(self, q):
        if self.parent_property.order_by:

            def _setup_outermost_orderby(compile_context):
                compile_context.eager_order_by += tuple(
                    util.to_list(self.parent_property.order_by)
                )

            q = q._add_compile_state_func(
                _setup_outermost_orderby, self.parent_property
            )

        return q

    class _SubqCollections:
        """Given a :class:`_query.Query` used to emit the "subquery load",
        provide a load interface that executes the query at the
        first moment a value is needed.

        """

        __slots__ = (
            "session",
            "execution_options",
            "load_options",
            "params",
            "subq",
            "_data",
        )

        def __init__(self, context, subq):
            # avoid a reference cycle by not storing the context itself,
            # even though having it would be preferable
            self.session = context.session
            self.execution_options = context.execution_options
            self.load_options = context.load_options
            self.params = context.params or {}
            self.subq = subq
            self._data = None

        def get(self, key, default):
            if self._data is None:
                self._load()
            return self._data.get(key, default)

        def _load(self):
            self._data = collections.defaultdict(list)

            q = self.subq
            assert q.session is None

            q = q.with_session(self.session)

            if self.load_options._populate_existing:
                q = q.populate_existing()
            # to work with baked query, the parameters may have been
            # updated since this query was created, so take these into account

            rows = list(q.params(self.params))
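            # each result row is (related entity, *parent key column values);
            # group consecutive rows on the trailing key tuple and accumulate
            # the entities per parent key (the defaultdict tolerates a key
            # appearing in more than one run)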
            for k, v in itertools.groupby(rows, lambda x: x[1:]):
                self._data[k].extend(vv[0] for vv in v)

        def loader(self, state, dict_, row):
            if self._data is None:
                self._load()

    def _setup_query_from_rowproc(
        self,
        context,
        query_entity,
        path,
        entity,
        loadopt,
        adapter,
    ):
        compile_state = context.compile_state
        if (
            not compile_state.compile_options._enable_eagerloads
            or compile_state.compile_options._for_refresh_state
        ):
            return

        orig_query_entity_index = compile_state._entities.index(query_entity)
        context.loaders_require_buffering = True

        path = path[self.parent_property]

        # build up a path indicating the path from the leftmost
        # entity to the thing we're subquery loading.
        with_poly_entity = path.get(
            compile_state.attributes, "path_with_polymorphic", None
        )
        if with_poly_entity is not None:
            effective_entity = with_poly_entity
        else:
            effective_entity = self.entity

        subq_path, rewritten_path = context.query._execution_options.get(
            ("subquery_paths", None),
            (orm_util.PathRegistry.root, orm_util.PathRegistry.root),
        )
        is_root = subq_path is orm_util.PathRegistry.root
        subq_path = subq_path + path
        rewritten_path = rewritten_path + path

        # use the current query being invoked, not the compile state
        # one. this is so that we get the current parameters. however,
        # it means we can't use the existing compile state, we have to make
        # a new one. other approaches include possibly using the
        # compiled query but swapping the params, seems only marginally
        # less time spent but more complicated
        orig_query = context.query._execution_options.get(
            ("orig_query", _SubqueryLoader), context.query
        )

        # make a new compile_state for the query that's probably cached, but
        # we're sort of undoing a bit of that caching :(
        compile_state_cls = _ORMCompileState._get_plugin_class_for_plugin(
            orig_query, "orm"
        )

        if orig_query._is_lambda_element:
            if context.load_options._lazy_loaded_from is None:
                util.warn(
                    'subqueryloader for "%s" must invoke lambda callable '
                    "at %r in "
                    "order to produce a new query, decreasing the efficiency "
                    "of caching for this statement. Consider using "
                    "selectinload() for more effective full-lambda caching"
                    % (self, orig_query)
                )
            orig_query = orig_query._resolved

        # this is the more "quick" version, however it's not clear how
        # much of this we need. in particular I can't get a test to
        # fail if the "set_base_alias" is missing and not sure why that is.
        orig_compile_state = compile_state_cls._create_entities_collection(
            orig_query, legacy=False
        )

        (
            leftmost_mapper,
            leftmost_attr,
            leftmost_relationship,
            rewritten_path,
        ) = self._get_leftmost(
            orig_query_entity_index,
            rewritten_path,
            orig_compile_state,
            is_root,
        )

        # generate a new Query from the original, then
        # produce a subquery from it.
        left_alias = self._generate_from_original_query(
            orig_compile_state,
            orig_query,
            leftmost_mapper,
            leftmost_attr,
            leftmost_relationship,
            entity,
        )

        # generate another Query that will join the
        # left alias to the target relationships.
        # basically doing a longhand
        # "from_self()". (from_self() itself not quite industrial
        # strength enough for all contingencies...but very close)

        q = query.Query(effective_entity)

        q._execution_options = context.query._execution_options.merge_with(
            context.execution_options,
            {
                ("orig_query", _SubqueryLoader): orig_query,
                ("subquery_paths", None): (subq_path, rewritten_path),
            },
        )

        q = q._set_enable_single_crit(False)
        to_join, local_attr, parent_alias = self._prep_for_joins(
            left_alias, subq_path
        )

        q = q.add_columns(*local_attr)
        q = self._apply_joins(
            q, to_join, left_alias, parent_alias, effective_entity
        )

        q = self._setup_options(
            context,
            q,
            subq_path,
            rewritten_path,
            orig_query,
            effective_entity,
            loadopt,
        )
        q = self._setup_outermost_orderby(q)

        return q

    def create_row_processor(
        self,
        context,
        query_entity,
        path,
        loadopt,
        mapper,
        result,
        adapter,
        populators,
    ):
        if (
            loadopt
            and context.compile_state.statement is not None
            and context.compile_state.statement.is_dml
        ):
            util.warn_deprecated(
                "The subqueryload loader option is not compatible with DML "
                "statements such as INSERT, UPDATE. Only SELECT may be "
                "used. This warning will become an exception in a future "
                "release.",
                "2.0",
            )

        if context.refresh_state:
            return self._immediateload_create_row_processor(
                context,
                query_entity,
                path,
                loadopt,
                mapper,
                result,
                adapter,
                populators,
            )

        _, run_loader, _, _ = self._setup_for_recursion(
            context, path, loadopt, self.join_depth
        )
        if not run_loader:
            return

        if not isinstance(context.compile_state, _ORMSelectCompileState):
            # issue 7505 - subqueryload() in 1.3 and previous would silently
            # degrade for from_statement() without warning. this behavior
            # is restored here
            return

        if not self.parent.class_manager[self.key].impl.supports_population:
            raise sa_exc.InvalidRequestError(
                "'%s' does not support object "
                "population - eager loading cannot be applied." % self
            )

        # a little dance here as the "path" is still something that only
        # semi-tracks the exact series of things we are loading, still not
        # telling us about with_polymorphic() and stuff like that when it's at
        # the root.. the initial MapperEntity is more accurate for this case.
        if len(path) == 1:
            if not orm_util._entity_isa(query_entity.entity_zero, self.parent):
                return
        elif not orm_util._entity_isa(path[-1], self.parent):
            return

        subq = self._setup_query_from_rowproc(
            context,
            query_entity,
            path,
            path[-1],
            loadopt,
            adapter,
        )

        if subq is None:
            return

        assert subq.session is None

        path = path[self.parent_property]

        local_cols = self.parent_property.local_columns

        # cache the loaded collections in the context
        # so that inheriting mappers don't re-load when they
        # call upon create_row_processor again
        collections = path.get(context.attributes, "collections")
        if collections is None:
            collections = self._SubqCollections(context, subq)
            path.set(context.attributes, "collections", collections)

        if adapter:
            local_cols = [adapter.columns[c] for c in local_cols]

        if self.uselist:
            self._create_collection_loader(
                context, result, collections, local_cols, populators
            )
        else:
            self._create_scalar_loader(
                context, result, collections, local_cols, populators
            )

    def _create_collection_loader(
        self, context, result, collections, local_cols, populators
    ):
        tuple_getter = result._tuple_getter(local_cols)
2074
2075 def load_collection_from_subq(state, dict_, row):
2076 collection = collections.get(tuple_getter(row), ())
2077 state.get_impl(self.key).set_committed_value(
2078 state, dict_, collection
2079 )
2080
2081 def load_collection_from_subq_existing_row(state, dict_, row):
2082 if self.key not in dict_:
2083 load_collection_from_subq(state, dict_, row)
2084
2085 populators["new"].append((self.key, load_collection_from_subq))
2086 populators["existing"].append(
2087 (self.key, load_collection_from_subq_existing_row)
2088 )
2089
2090 if context.invoke_all_eagers:
2091 populators["eager"].append((self.key, collections.loader))
2092
2093 def _create_scalar_loader(
2094 self, context, result, collections, local_cols, populators
2095 ):
2096 tuple_getter = result._tuple_getter(local_cols)
2097
2098 def load_scalar_from_subq(state, dict_, row):
2099 collection = collections.get(tuple_getter(row), (None,))
2100 if len(collection) > 1:
2101 util.warn(
2102 "Multiple rows returned with "
2103 "uselist=False for eagerly-loaded attribute '%s' " % self
2104 )
2105
2106 scalar = collection[0]
2107 state.get_impl(self.key).set_committed_value(state, dict_, scalar)
2108
2109 def load_scalar_from_subq_existing_row(state, dict_, row):
2110 if self.key not in dict_:
2111 load_scalar_from_subq(state, dict_, row)
2112
2113 populators["new"].append((self.key, load_scalar_from_subq))
2114 populators["existing"].append(
2115 (self.key, load_scalar_from_subq_existing_row)
2116 )
2117 if context.invoke_all_eagers:
2118 populators["eager"].append((self.key, collections.loader))
2119
2120
2121@log.class_logger
2122@relationships.RelationshipProperty.strategy_for(lazy="joined")
2123@relationships.RelationshipProperty.strategy_for(lazy=False)
2124class _JoinedLoader(_AbstractRelationshipLoader):
2125 """Provide loading behavior for a :class:`.Relationship`
2126 using joined eager loading.
2127
2128 """
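    # Typically selected per query via the joinedload() option, e.g.
    # (an illustrative sketch; "User" / "addresses" are placeholder
    # mappings, not part of this module):
    #
    #   stmt = select(User).options(joinedload(User.addresses))
    #
    # or configured mapper-wide with relationship(..., lazy="joined").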
2129
2130 __slots__ = "join_depth"
2131
2132 def __init__(self, parent, strategy_key):
2133 super().__init__(parent, strategy_key)
2134 self.join_depth = self.parent_property.join_depth
2135
2136 def init_class_attribute(self, mapper):
2137 self.parent_property._get_strategy(
2138 (("lazy", "select"),)
2139 ).init_class_attribute(mapper)
2140
2141 def setup_query(
2142 self,
2143 compile_state,
2144 query_entity,
2145 path,
2146 loadopt,
2147 adapter,
2148 column_collection=None,
2149 parentmapper=None,
2150 chained_from_outerjoin=False,
2151 **kwargs,
2152 ):
2153 """Add a left outer join to the statement that's being constructed."""
2154
2155 if not compile_state.compile_options._enable_eagerloads:
2156 return
2157 elif (
2158 loadopt
2159 and compile_state.statement is not None
2160 and compile_state.statement.is_dml
2161 ):
            util.warn_deprecated(
                "The joinedload loader option is not compatible with DML "
                "statements such as INSERT, UPDATE. Only SELECT may be used. "
                "This warning will become an exception in a future release.",
                "2.0",
            )
2168 elif self.uselist:
2169 compile_state.multi_row_eager_loaders = True
2170
2171 path = path[self.parent_property]
2172
2173 user_defined_adapter = (
2174 self._init_user_defined_eager_proc(
2175 loadopt, compile_state, compile_state.attributes
2176 )
2177 if loadopt
2178 else False
2179 )
2180
2181 if user_defined_adapter is not False:
            # set up an adapter but don't create any JOIN; assume it's
            # already in the query
2184 (
2185 clauses,
2186 adapter,
2187 add_to_collection,
2188 ) = self._setup_query_on_user_defined_adapter(
2189 compile_state,
2190 query_entity,
2191 path,
2192 adapter,
2193 user_defined_adapter,
2194 )
2195
2196 # don't do "wrap" for multi-row, we want to wrap
2197 # limited/distinct SELECT,
2198 # because we want to put the JOIN on the outside.
2199
2200 else:
2201 # if not via query option, check for
2202 # a cycle
2203 if not path.contains(compile_state.attributes, "loader"):
2204 if self.join_depth:
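                    # a path alternates (mapper, relationship) tokens, so
                    # half its length is the number of relationship hops
                    # traversed so far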
2205 if path.length / 2 > self.join_depth:
2206 return
2207 elif path.contains_mapper(self.mapper):
2208 return
2209
2210 # add the JOIN and create an adapter
2211 (
2212 clauses,
2213 adapter,
2214 add_to_collection,
2215 chained_from_outerjoin,
2216 ) = self._generate_row_adapter(
2217 compile_state,
2218 query_entity,
2219 path,
2220 loadopt,
2221 adapter,
2222 column_collection,
2223 parentmapper,
2224 chained_from_outerjoin,
2225 )
2226
2227 # for multi-row, we want to wrap limited/distinct SELECT,
2228 # because we want to put the JOIN on the outside.
2229 compile_state.eager_adding_joins = True
2230
2231 with_poly_entity = path.get(
2232 compile_state.attributes, "path_with_polymorphic", None
2233 )
2234 if with_poly_entity is not None:
2235 with_polymorphic = inspect(
2236 with_poly_entity
2237 ).with_polymorphic_mappers
2238 else:
2239 with_polymorphic = None
2240
2241 path = path[self.entity]
2242
2243 loading._setup_entity_query(
2244 compile_state,
2245 self.mapper,
2246 query_entity,
2247 path,
2248 clauses,
2249 add_to_collection,
2250 with_polymorphic=with_polymorphic,
2251 parentmapper=self.mapper,
2252 chained_from_outerjoin=chained_from_outerjoin,
2253 )
2254
2255 has_nones = util.NONE_SET.intersection(compile_state.secondary_columns)
2256
2257 if has_nones:
2258 if with_poly_entity is not None:
2259 raise sa_exc.InvalidRequestError(
2260 "Detected unaliased columns when generating joined "
2261 "load. Make sure to use aliased=True or flat=True "
2262 "when using joined loading with with_polymorphic()."
2263 )
2264 else:
2265 compile_state.secondary_columns = [
2266 c for c in compile_state.secondary_columns if c is not None
2267 ]
2268
2269 def _init_user_defined_eager_proc(
2270 self, loadopt, compile_state, target_attributes
2271 ):
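        # the "eager_from_alias" local opt is established by the
        # contains_eager() loader option, e.g. (an illustrative sketch;
        # "User", "addresses" and "Address" are placeholder mappings, not
        # part of this module):
        #
        #   adalias = aliased(Address)
        #   stmt = (
        #       select(User)
        #       .outerjoin(User.addresses.of_type(adalias))
        #       .options(contains_eager(User.addresses.of_type(adalias)))
        #   )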
2272 # check if the opt applies at all
2273 if "eager_from_alias" not in loadopt.local_opts:
2274 # nope
2275 return False
2276
2277 path = loadopt.path.parent
2278
2279 # the option applies. check if the "user_defined_eager_row_processor"
2280 # has been built up.
2281 adapter = path.get(
2282 compile_state.attributes, "user_defined_eager_row_processor", False
2283 )
2284 if adapter is not False:
2285 # just return it
2286 return adapter
2287
2288 # otherwise figure it out.
2289 alias = loadopt.local_opts["eager_from_alias"]
2290 root_mapper, prop = path[-2:]
2291
2292 if alias is not None:
2293 if isinstance(alias, str):
2294 alias = prop.target.alias(alias)
2295 adapter = orm_util.ORMAdapter(
2296 orm_util._TraceAdaptRole.JOINEDLOAD_USER_DEFINED_ALIAS,
2297 prop.mapper,
2298 selectable=alias,
2299 equivalents=prop.mapper._equivalent_columns,
2300 limit_on_entity=False,
2301 )
2302 else:
2303 if path.contains(
2304 compile_state.attributes, "path_with_polymorphic"
2305 ):
2306 with_poly_entity = path.get(
2307 compile_state.attributes, "path_with_polymorphic"
2308 )
2309 adapter = orm_util.ORMAdapter(
2310 orm_util._TraceAdaptRole.JOINEDLOAD_PATH_WITH_POLYMORPHIC,
2311 with_poly_entity,
2312 equivalents=prop.mapper._equivalent_columns,
2313 )
2314 else:
2315 adapter = compile_state._polymorphic_adapters.get(
2316 prop.mapper, None
2317 )
2318 path.set(
2319 target_attributes,
2320 "user_defined_eager_row_processor",
2321 adapter,
2322 )
2323
2324 return adapter
2325
2326 def _setup_query_on_user_defined_adapter(
2327 self, context, entity, path, adapter, user_defined_adapter
2328 ):
2329 # apply some more wrapping to the "user defined adapter"
2330 # if we are setting up the query for SQL render.
2331 adapter = entity._get_entity_clauses(context)
2332
2333 if adapter and user_defined_adapter:
2334 user_defined_adapter = user_defined_adapter.wrap(adapter)
2335 path.set(
2336 context.attributes,
2337 "user_defined_eager_row_processor",
2338 user_defined_adapter,
2339 )
2340 elif adapter:
2341 user_defined_adapter = adapter
2342 path.set(
2343 context.attributes,
2344 "user_defined_eager_row_processor",
2345 user_defined_adapter,
2346 )
2347
2348 add_to_collection = context.primary_columns
2349 return user_defined_adapter, adapter, add_to_collection
2350
2351 def _generate_row_adapter(
2352 self,
2353 compile_state,
2354 entity,
2355 path,
2356 loadopt,
2357 adapter,
2358 column_collection,
2359 parentmapper,
2360 chained_from_outerjoin,
2361 ):
2362 with_poly_entity = path.get(
2363 compile_state.attributes, "path_with_polymorphic", None
2364 )
2365 if with_poly_entity:
2366 to_adapt = with_poly_entity
2367 else:
2368 insp = inspect(self.entity)
2369 if insp.is_aliased_class:
2370 alt_selectable = insp.selectable
2371 else:
2372 alt_selectable = None
2373
2374 to_adapt = orm_util.AliasedClass(
2375 self.mapper,
2376 alias=(
2377 alt_selectable._anonymous_fromclause(flat=True)
2378 if alt_selectable is not None
2379 else None
2380 ),
2381 flat=True,
2382 use_mapper_path=True,
2383 )
2384
2385 to_adapt_insp = inspect(to_adapt)
2386
2387 clauses = to_adapt_insp._memo(
2388 ("joinedloader_ormadapter", self),
2389 orm_util.ORMAdapter,
2390 orm_util._TraceAdaptRole.JOINEDLOAD_MEMOIZED_ADAPTER,
2391 to_adapt_insp,
2392 equivalents=self.mapper._equivalent_columns,
2393 adapt_required=True,
2394 allow_label_resolve=False,
2395 anonymize_labels=True,
2396 )
2397
2398 assert clauses.is_aliased_class
2399
2400 innerjoin = (
2401 loadopt.local_opts.get("innerjoin", self.parent_property.innerjoin)
2402 if loadopt is not None
2403 else self.parent_property.innerjoin
2404 )
2405
2406 if not innerjoin:
2407 # if this is an outer join, all non-nested eager joins from
2408 # this path must also be outer joins
2409 chained_from_outerjoin = True
2410
2411 compile_state.create_eager_joins.append(
2412 (
2413 self._create_eager_join,
2414 entity,
2415 path,
2416 adapter,
2417 parentmapper,
2418 clauses,
2419 innerjoin,
2420 chained_from_outerjoin,
2421 loadopt._extra_criteria if loadopt else (),
2422 )
2423 )
2424
2425 add_to_collection = compile_state.secondary_columns
2426 path.set(compile_state.attributes, "eager_row_processor", clauses)
2427
2428 return clauses, adapter, add_to_collection, chained_from_outerjoin
2429
2430 def _create_eager_join(
2431 self,
2432 compile_state,
2433 query_entity,
2434 path,
2435 adapter,
2436 parentmapper,
2437 clauses,
2438 innerjoin,
2439 chained_from_outerjoin,
2440 extra_criteria,
2441 ):
2442 if parentmapper is None:
2443 localparent = query_entity.mapper
2444 else:
2445 localparent = parentmapper
2446
2447 # whether or not the Query will wrap the selectable in a subquery,
2448 # and then attach eager load joins to that (i.e., in the case of
2449 # LIMIT/OFFSET etc.)
2450 should_nest_selectable = compile_state._should_nest_selectable
2451
2452 query_entity_key = None
2453
2454 if (
2455 query_entity not in compile_state.eager_joins
2456 and not should_nest_selectable
2457 and compile_state.from_clauses
2458 ):
2459 indexes = sql_util.find_left_clause_that_matches_given(
2460 compile_state.from_clauses, query_entity.selectable
2461 )
2462
2463 if len(indexes) > 1:
2464 # for the eager load case, I can't reproduce this right
2465 # now. For query.join() I can.
                raise sa_exc.InvalidRequestError(
                    "Can't identify which query entity from which to "
                    "perform joined eager loading. Please use an exact "
                    "match when specifying the join path."
                )
2471
2472 if indexes:
2473 clause = compile_state.from_clauses[indexes[0]]
2474 # join to an existing FROM clause on the query.
2475 # key it to its list index in the eager_joins dict.
2476 # Query._compile_context will adapt as needed and
2477 # append to the FROM clause of the select().
2478 query_entity_key, default_towrap = indexes[0], clause
2479
2480 if query_entity_key is None:
2481 query_entity_key, default_towrap = (
2482 query_entity,
2483 query_entity.selectable,
2484 )
2485
2486 towrap = compile_state.eager_joins.setdefault(
2487 query_entity_key, default_towrap
2488 )
2489
2490 if adapter:
2491 if getattr(adapter, "is_aliased_class", False):
2492 # joining from an adapted entity. The adapted entity
2493 # might be a "with_polymorphic", so resolve that to our
2494 # specific mapper's entity before looking for our attribute
2495 # name on it.
2496 efm = adapter.aliased_insp._entity_for_mapper(
2497 localparent
2498 if localparent.isa(self.parent)
2499 else self.parent
2500 )
2501
2502 # look for our attribute on the adapted entity, else fall back
2503 # to our straight property
2504 onclause = getattr(efm.entity, self.key, self.parent_property)
2505 else:
2506 onclause = getattr(
2507 orm_util.AliasedClass(
2508 self.parent, adapter.selectable, use_mapper_path=True
2509 ),
2510 self.key,
2511 self.parent_property,
2512 )
2513
2514 else:
2515 onclause = self.parent_property
2516
2517 assert clauses.is_aliased_class
2518
2519 attach_on_outside = (
2520 not chained_from_outerjoin
2521 or not innerjoin
2522 or innerjoin == "unnested"
2523 or query_entity.entity_zero.represents_outer_join
2524 )
2525
2526 extra_join_criteria = extra_criteria
2527 additional_entity_criteria = compile_state.global_attributes.get(
2528 ("additional_entity_criteria", self.mapper), ()
2529 )
2530 if additional_entity_criteria:
2531 extra_join_criteria += tuple(
2532 ae._resolve_where_criteria(self.mapper)
2533 for ae in additional_entity_criteria
2534 if ae.propagate_to_loaders
2535 )
2536
2537 if attach_on_outside:
2538 # this is the "classic" eager join case.
2539 eagerjoin = orm_util._ORMJoin(
2540 towrap,
2541 clauses.aliased_insp,
2542 onclause,
2543 isouter=not innerjoin
2544 or query_entity.entity_zero.represents_outer_join
2545 or (chained_from_outerjoin and isinstance(towrap, sql.Join)),
2546 _left_memo=self.parent,
2547 _right_memo=path[self.mapper],
2548 _extra_criteria=extra_join_criteria,
2549 )
2550 else:
2551 # all other cases are innerjoin=='nested' approach
2552 eagerjoin = self._splice_nested_inner_join(
2553 path, path[-2], towrap, clauses, onclause, extra_join_criteria
2554 )
2555
2556 compile_state.eager_joins[query_entity_key] = eagerjoin
2557
2558 # send a hint to the Query as to where it may "splice" this join
2559 eagerjoin.stop_on = query_entity.selectable
2560
2561 if not parentmapper:
            # for the parent clause that is the non-eager end of the join,
            # ensure all the parent cols in the primaryjoin are actually in
            # the columns clause (i.e. are not deferred), so that aliasing
            # applied by the Query propagates those columns outward.  This
            # has the effect of "undefering" those columns.
2569 for col in sql_util._find_columns(
2570 self.parent_property.primaryjoin
2571 ):
2572 if localparent.persist_selectable.c.contains_column(col):
2573 if adapter:
2574 col = adapter.columns[col]
2575 compile_state._append_dedupe_col_collection(
2576 col, compile_state.primary_columns
2577 )
2578
2579 if self.parent_property.order_by:
2580 compile_state.eager_order_by += tuple(
2581 (eagerjoin._target_adapter.copy_and_process)(
2582 util.to_list(self.parent_property.order_by)
2583 )
2584 )
2585
2586 def _splice_nested_inner_join(
2587 self,
2588 path,
2589 entity_we_want_to_splice_onto,
2590 join_obj,
2591 clauses,
2592 onclause,
2593 extra_criteria,
2594 entity_inside_join_structure: Union[
2595 Mapper, None, Literal[False]
2596 ] = False,
2597 detected_existing_path: Optional[path_registry.PathRegistry] = None,
2598 ):
2599 # recursive fn to splice a nested join into an existing one.
2600 # entity_inside_join_structure=False means this is the outermost call,
2601 # and it should return a value. entity_inside_join_structure=<mapper>
2602 # indicates we've descended into a join and are looking at a FROM
2603 # clause representing this mapper; if this is not
2604 # entity_we_want_to_splice_onto then return None to end the recursive
2605 # branch
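        # e.g. (an illustrative sketch; "A" / "B" / "C" are placeholder
        # mappings, not part of this module):
        #
        #   select(A).options(
        #       joinedload(A.bs, innerjoin=False)
        #       .joinedload(B.cs, innerjoin="nested")
        #   )
        #
        # here the INNER JOIN to C is spliced into the right side of the
        # OUTER JOIN to B, rendering approximately
        # "A LEFT OUTER JOIN (B JOIN C ON ..) ON .."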
2606
2607 assert entity_we_want_to_splice_onto is path[-2]
2608
2609 if entity_inside_join_structure is False:
2610 assert isinstance(join_obj, orm_util._ORMJoin)
2611
2612 if isinstance(join_obj, sql.selectable.FromGrouping):
2613 # FromGrouping - continue descending into the structure
2614 return self._splice_nested_inner_join(
2615 path,
2616 entity_we_want_to_splice_onto,
2617 join_obj.element,
2618 clauses,
2619 onclause,
2620 extra_criteria,
2621 entity_inside_join_structure,
2622 )
2623 elif isinstance(join_obj, orm_util._ORMJoin):
2624 # _ORMJoin - continue descending into the structure
2625
2626 join_right_path = join_obj._right_memo
2627
2628 # see if right side of join is viable
2629 target_join = self._splice_nested_inner_join(
2630 path,
2631 entity_we_want_to_splice_onto,
2632 join_obj.right,
2633 clauses,
2634 onclause,
2635 extra_criteria,
2636 entity_inside_join_structure=(
2637 join_right_path[-1].mapper
2638 if join_right_path is not None
2639 else None
2640 ),
2641 )
2642
2643 if target_join is not None:
2644 # for a right splice, attempt to flatten out
2645 # a JOIN b JOIN c JOIN .. to avoid needless
2646 # parenthesis nesting
2647 if not join_obj.isouter and not target_join.isouter:
2648 eagerjoin = join_obj._splice_into_center(target_join)
2649 else:
2650 eagerjoin = orm_util._ORMJoin(
2651 join_obj.left,
2652 target_join,
2653 join_obj.onclause,
2654 isouter=join_obj.isouter,
2655 _left_memo=join_obj._left_memo,
2656 )
2657
2658 eagerjoin._target_adapter = target_join._target_adapter
2659 return eagerjoin
2660
2661 else:
2662 # see if left side of join is viable
2663 target_join = self._splice_nested_inner_join(
2664 path,
2665 entity_we_want_to_splice_onto,
2666 join_obj.left,
2667 clauses,
2668 onclause,
2669 extra_criteria,
2670 entity_inside_join_structure=join_obj._left_memo,
2671 detected_existing_path=join_right_path,
2672 )
2673
2674 if target_join is not None:
2675 eagerjoin = orm_util._ORMJoin(
2676 target_join,
2677 join_obj.right,
2678 join_obj.onclause,
2679 isouter=join_obj.isouter,
2680 _right_memo=join_obj._right_memo,
2681 )
2682 eagerjoin._target_adapter = target_join._target_adapter
2683 return eagerjoin
2684
2685 # neither side viable, return None, or fail if this was the top
2686 # most call
2687 if entity_inside_join_structure is False:
2688 assert (
2689 False
2690 ), "assertion failed attempting to produce joined eager loads"
2691 return None
2692
2693 # reached an endpoint (e.g. a table that's mapped, or an alias of that
2694 # table). determine if we can use this endpoint to splice onto
2695
2696 # is this the entity we want to splice onto in the first place?
2697 if not entity_we_want_to_splice_onto.isa(entity_inside_join_structure):
2698 return None
2699
2700 # path check. if we know the path how this join endpoint got here,
2701 # lets look at our path we are satisfying and see if we're in the
2702 # wrong place. This is specifically for when our entity may
2703 # appear more than once in the path, issue #11449
2704 # updated in issue #11965.
2705 if detected_existing_path and len(detected_existing_path) > 2:
            # this assertion is currently based on how this call is made,
            # where given a join_obj, the call will have these parameters as
            # entity_inside_join_structure=join_obj._left_memo and
            # entity_inside_join_structure=join_obj._right_memo[-1].mapper
2710 assert detected_existing_path[-3] is entity_inside_join_structure
2711
2712 # from that, see if the path we are targeting matches the
2713 # "existing" path of this join all the way up to the midpoint
2714 # of this join object (e.g. the relationship).
2715 # if not, then this is not our target
2716 #
2717 # a test condition where this test is false looks like:
2718 #
2719 # desired splice: Node->kind->Kind
2720 # path of desired splice: NodeGroup->nodes->Node->kind
2721 # path we've located: NodeGroup->nodes->Node->common_node->Node
2722 #
2723 # above, because we want to splice kind->Kind onto
2724 # NodeGroup->nodes->Node, this is not our path because it actually
2725 # goes more steps than we want into self-referential
2726 # ->common_node->Node
2727 #
2728 # a test condition where this test is true looks like:
2729 #
2730 # desired splice: B->c2s->C2
2731 # path of desired splice: A->bs->B->c2s
2732 # path we've located: A->bs->B->c1s->C1
2733 #
2734 # above, we want to splice c2s->C2 onto B, and the located path
2735 # shows that the join ends with B->c1s->C1. so we will
2736 # add another join onto that, which would create a "branch" that
2737 # we might represent in a pseudopath as:
2738 #
2739 # B->c1s->C1
2740 # ->c2s->C2
2741 #
2742 # i.e. A JOIN B ON <bs> JOIN C1 ON <c1s>
2743 # JOIN C2 ON <c2s>
2744 #
2745
2746 if detected_existing_path[0:-2] != path.path[0:-1]:
2747 return None
2748
2749 return orm_util._ORMJoin(
2750 join_obj,
2751 clauses.aliased_insp,
2752 onclause,
2753 isouter=False,
2754 _left_memo=entity_inside_join_structure,
2755 _right_memo=path[path[-1].mapper],
2756 _extra_criteria=extra_criteria,
2757 )
2758
2759 def _create_eager_adapter(self, context, result, adapter, path, loadopt):
2760 compile_state = context.compile_state
2761
2762 user_defined_adapter = (
2763 self._init_user_defined_eager_proc(
2764 loadopt, compile_state, context.attributes
2765 )
2766 if loadopt
2767 else False
2768 )
2769
2770 if user_defined_adapter is not False:
2771 decorator = user_defined_adapter
2772 # user defined eagerloads are part of the "primary"
2773 # portion of the load.
2774 # the adapters applied to the Query should be honored.
2775 if compile_state.compound_eager_adapter and decorator:
2776 decorator = decorator.wrap(
2777 compile_state.compound_eager_adapter
2778 )
2779 elif compile_state.compound_eager_adapter:
2780 decorator = compile_state.compound_eager_adapter
2781 else:
2782 decorator = path.get(
2783 compile_state.attributes, "eager_row_processor"
2784 )
2785 if decorator is None:
2786 return False
2787
2788 if self.mapper._result_has_identity_key(result, decorator):
2789 return decorator
2790 else:
2791 # no identity key - don't return a row
2792 # processor, will cause a degrade to lazy
2793 return False
2794
2795 def create_row_processor(
2796 self,
2797 context,
2798 query_entity,
2799 path,
2800 loadopt,
2801 mapper,
2802 result,
2803 adapter,
2804 populators,
2805 ):
2806
2807 if not context.compile_state.compile_options._enable_eagerloads:
2808 return
2809
2810 if not self.parent.class_manager[self.key].impl.supports_population:
2811 raise sa_exc.InvalidRequestError(
2812 "'%s' does not support object "
2813 "population - eager loading cannot be applied." % self
2814 )
2815
2816 if self.uselist:
2817 context.loaders_require_uniquing = True
2818
2819 our_path = path[self.parent_property]
2820
2821 eager_adapter = self._create_eager_adapter(
2822 context, result, adapter, our_path, loadopt
2823 )
2824
2825 if eager_adapter is not False:
2826 key = self.key
2827
2828 _instance = loading._instance_processor(
2829 query_entity,
2830 self.mapper,
2831 context,
2832 result,
2833 our_path[self.entity],
2834 eager_adapter,
2835 )
2836
2837 if not self.uselist:
2838 self._create_scalar_loader(context, key, _instance, populators)
2839 else:
2840 self._create_collection_loader(
2841 context, key, _instance, populators
2842 )
2843 else:
2844 self.parent_property._get_strategy(
2845 (("lazy", "select"),)
2846 ).create_row_processor(
2847 context,
2848 query_entity,
2849 path,
2850 loadopt,
2851 mapper,
2852 result,
2853 adapter,
2854 populators,
2855 )
2856
2857 def _create_collection_loader(self, context, key, _instance, populators):
2858 def load_collection_from_joined_new_row(state, dict_, row):
2859 # note this must unconditionally clear out any existing collection.
2860 # an existing collection would be present only in the case of
2861 # populate_existing().
2862 collection = attributes.init_state_collection(state, dict_, key)
2863 result_list = util.UniqueAppender(
2864 collection, "append_without_event"
2865 )
2866 context.attributes[(state, key)] = result_list
2867 inst = _instance(row)
2868 if inst is not None:
2869 result_list.append(inst)
2870
2871 def load_collection_from_joined_existing_row(state, dict_, row):
2872 if (state, key) in context.attributes:
2873 result_list = context.attributes[(state, key)]
2874 else:
2875 # appender_key can be absent from context.attributes
2876 # with isnew=False when self-referential eager loading
2877 # is used; the same instance may be present in two
2878 # distinct sets of result columns
2879 collection = attributes.init_state_collection(
2880 state, dict_, key
2881 )
2882 result_list = util.UniqueAppender(
2883 collection, "append_without_event"
2884 )
2885 context.attributes[(state, key)] = result_list
2886 inst = _instance(row)
2887 if inst is not None:
2888 result_list.append(inst)
2889
2890 def load_collection_from_joined_exec(state, dict_, row):
2891 _instance(row)
2892
2893 populators["new"].append(
2894 (self.key, load_collection_from_joined_new_row)
2895 )
2896 populators["existing"].append(
2897 (self.key, load_collection_from_joined_existing_row)
2898 )
2899 if context.invoke_all_eagers:
2900 populators["eager"].append(
2901 (self.key, load_collection_from_joined_exec)
2902 )
2903
2904 def _create_scalar_loader(self, context, key, _instance, populators):
2905 def load_scalar_from_joined_new_row(state, dict_, row):
2906 # set a scalar object instance directly on the parent
2907 # object, bypassing InstrumentedAttribute event handlers.
2908 dict_[key] = _instance(row)
2909
2910 def load_scalar_from_joined_existing_row(state, dict_, row):
2911 # call _instance on the row, even though the object has
2912 # been created, so that we further descend into properties
2913 existing = _instance(row)
2914
2915 # conflicting value already loaded, this shouldn't happen
2916 if key in dict_:
2917 if existing is not dict_[key]:
2918 util.warn(
2919 "Multiple rows returned with "
2920 "uselist=False for eagerly-loaded attribute '%s' "
2921 % self
2922 )
2923 else:
2924 # this case is when one row has multiple loads of the
2925 # same entity (e.g. via aliasing), one has an attribute
2926 # that the other doesn't.
2927 dict_[key] = existing
2928
2929 def load_scalar_from_joined_exec(state, dict_, row):
2930 _instance(row)
2931
2932 populators["new"].append((self.key, load_scalar_from_joined_new_row))
2933 populators["existing"].append(
2934 (self.key, load_scalar_from_joined_existing_row)
2935 )
2936 if context.invoke_all_eagers:
2937 populators["eager"].append(
2938 (self.key, load_scalar_from_joined_exec)
2939 )
2940
2941
2942@log.class_logger
2943@relationships.RelationshipProperty.strategy_for(lazy="selectin")
2944class _SelectInLoader(_PostLoader, util.MemoizedSlots):
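    """Provide loading behavior for a :class:`.Relationship`
    using "SELECT IN" eager loading.

    """

    # Typically selected per query via the selectinload() option, e.g.
    # (an illustrative sketch; "User" / "addresses" are placeholder
    # mappings, not part of this module):
    #
    #   stmt = select(User).options(selectinload(User.addresses))
    #
    # or configured mapper-wide with relationship(..., lazy="selectin").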
2945 __slots__ = (
2946 "join_depth",
2947 "omit_join",
2948 "_parent_alias",
2949 "_query_info",
2950 "_fallback_query_info",
2951 )
2952
2953 query_info = collections.namedtuple(
2954 "queryinfo",
2955 [
2956 "load_only_child",
2957 "load_with_join",
2958 "in_expr",
2959 "pk_cols",
2960 "zero_idx",
2961 "child_lookup_cols",
2962 ],
2963 )
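    # query_info fields:
    #   load_only_child   - True for the many-to-one "omit join" case, where
    #                       rows are looked up by the related entity's
    #                       primary key
    #   load_with_join    - True when the SELECT must JOIN from an alias of
    #                       the parent entity to the target relationship
    #   in_expr           - column expression that receives the IN criteria
    #                       for each chunk of key values
    #   pk_cols           - columns placed in the "pk" Bundle, keying each
    #                       result row back to the waiting parent states
    #   zero_idx          - True when the key is a single column, i.e. a
    #                       scalar rather than a tuple IN expression
    #   child_lookup_cols - parent-side columns used to compute the related
    #                       key, for the load_only_child case only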
2964
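    # number of key values bundled into a single IN expression; larger sets
    # of waiting states are loaded across multiple SELECT statements in
    # chunks of this size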
2965 _chunksize = 500
2966
2967 def __init__(self, parent, strategy_key):
2968 super().__init__(parent, strategy_key)
2969 self.join_depth = self.parent_property.join_depth
2970 is_m2o = self.parent_property.direction is interfaces.MANYTOONE
2971
2972 if self.parent_property.omit_join is not None:
2973 self.omit_join = self.parent_property.omit_join
2974 else:
2975 lazyloader = self.parent_property._get_strategy(
2976 (("lazy", "select"),)
2977 )
2978 if is_m2o:
2979 self.omit_join = lazyloader.use_get
2980 else:
2981 self.omit_join = self.parent._get_clause[0].compare(
2982 lazyloader._rev_lazywhere,
2983 use_proxies=True,
2984 compare_keys=False,
2985 equivalents=self.parent._equivalent_columns,
2986 )
2987
2988 if self.omit_join:
2989 if is_m2o:
2990 self._query_info = self._init_for_omit_join_m2o()
2991 self._fallback_query_info = self._init_for_join()
2992 else:
2993 self._query_info = self._init_for_omit_join()
2994 else:
2995 self._query_info = self._init_for_join()
2996
2997 def _init_for_omit_join(self):
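        # "omit join" mode: the related table's foreign key columns
        # correspond directly to the parent's primary key, so the SELECT can
        # target the related entity alone and filter those FK columns with
        # IN (..), with no JOIN back to the parent table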
2998 pk_to_fk = dict(
2999 self.parent_property._join_condition.local_remote_pairs
3000 )
3001 pk_to_fk.update(
3002 (equiv, pk_to_fk[k])
3003 for k in list(pk_to_fk)
3004 for equiv in self.parent._equivalent_columns.get(k, ())
3005 )
3006
3007 pk_cols = fk_cols = [
3008 pk_to_fk[col] for col in self.parent.primary_key if col in pk_to_fk
3009 ]
3010 if len(fk_cols) > 1:
3011 in_expr = sql.tuple_(*fk_cols)
3012 zero_idx = False
3013 else:
3014 in_expr = fk_cols[0]
3015 zero_idx = True
3016
3017 return self.query_info(False, False, in_expr, pk_cols, zero_idx, None)
3018
3019 def _init_for_omit_join_m2o(self):
3020 pk_cols = self.mapper.primary_key
3021 if len(pk_cols) > 1:
3022 in_expr = sql.tuple_(*pk_cols)
3023 zero_idx = False
3024 else:
3025 in_expr = pk_cols[0]
3026 zero_idx = True
3027
3028 lazyloader = self.parent_property._get_strategy((("lazy", "select"),))
3029 lookup_cols = [lazyloader._equated_columns[pk] for pk in pk_cols]
3030
3031 return self.query_info(
3032 True, False, in_expr, pk_cols, zero_idx, lookup_cols
3033 )
3034
3035 def _init_for_join(self):
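        # "join" mode: SELECT from an alias of the parent entity JOINed to
        # the target relationship, keyed on the aliased parent's primary
        # key columns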
3036 self._parent_alias = AliasedClass(self.parent.class_)
3037 pa_insp = inspect(self._parent_alias)
3038 pk_cols = [
3039 pa_insp._adapt_element(col) for col in self.parent.primary_key
3040 ]
3041 if len(pk_cols) > 1:
3042 in_expr = sql.tuple_(*pk_cols)
3043 zero_idx = False
3044 else:
3045 in_expr = pk_cols[0]
3046 zero_idx = True
3047 return self.query_info(False, True, in_expr, pk_cols, zero_idx, None)
3048
3049 def init_class_attribute(self, mapper):
3050 self.parent_property._get_strategy(
3051 (("lazy", "select"),)
3052 ).init_class_attribute(mapper)
3053
3054 def create_row_processor(
3055 self,
3056 context,
3057 query_entity,
3058 path,
3059 loadopt,
3060 mapper,
3061 result,
3062 adapter,
3063 populators,
3064 ):
3065 if context.refresh_state:
3066 return self._immediateload_create_row_processor(
3067 context,
3068 query_entity,
3069 path,
3070 loadopt,
3071 mapper,
3072 result,
3073 adapter,
3074 populators,
3075 )
3076
3077 (
3078 effective_path,
3079 run_loader,
3080 execution_options,
3081 recursion_depth,
3082 ) = self._setup_for_recursion(
3083 context, path, loadopt, join_depth=self.join_depth
3084 )
3085
3086 if not run_loader:
3087 return
3088
3089 if not context.compile_state.compile_options._enable_eagerloads:
3090 return
3091
3092 if not self.parent.class_manager[self.key].impl.supports_population:
3093 raise sa_exc.InvalidRequestError(
3094 "'%s' does not support object "
3095 "population - eager loading cannot be applied." % self
3096 )
3097
3098 # a little dance here as the "path" is still something that only
3099 # semi-tracks the exact series of things we are loading, still not
3100 # telling us about with_polymorphic() and stuff like that when it's at
3101 # the root.. the initial MapperEntity is more accurate for this case.
3102 if len(path) == 1:
3103 if not orm_util._entity_isa(query_entity.entity_zero, self.parent):
3104 return
3105 elif not orm_util._entity_isa(path[-1], self.parent):
3106 return
3107
3108 selectin_path = effective_path
3109
3110 path_w_prop = path[self.parent_property]
3111
3112 # build up a path indicating the path from the leftmost
3113 # entity to the thing we're subquery loading.
3114 with_poly_entity = path_w_prop.get(
3115 context.attributes, "path_with_polymorphic", None
3116 )
3117 if with_poly_entity is not None:
3118 effective_entity = inspect(with_poly_entity)
3119 else:
3120 effective_entity = self.entity
3121
3122 loading._PostLoad.callable_for_path(
3123 context,
3124 selectin_path,
3125 self.parent,
3126 self.parent_property,
3127 self._load_for_path,
3128 effective_entity,
3129 loadopt,
3130 recursion_depth,
3131 execution_options,
3132 )
3133
3134 def _load_for_path(
3135 self,
3136 context,
3137 path,
3138 states,
3139 load_only,
3140 effective_entity,
3141 loadopt,
3142 recursion_depth,
3143 execution_options,
3144 ):
3145 if load_only and self.key not in load_only:
3146 return
3147
3148 query_info = self._query_info
3149
3150 if query_info.load_only_child:
3151 our_states = collections.defaultdict(list)
3152 none_states = []
3153
3154 mapper = self.parent
3155
3156 for state, overwrite in states:
3157 state_dict = state.dict
3158 related_ident = tuple(
3159 mapper._get_state_attr_by_column(
3160 state,
3161 state_dict,
3162 lk,
3163 passive=attributes.PASSIVE_NO_FETCH,
3164 )
3165 for lk in query_info.child_lookup_cols
3166 )
3167 # if the loaded parent objects do not have the foreign key
3168 # to the related item loaded, then degrade into the joined
3169 # version of selectinload
3170 if LoaderCallableStatus.PASSIVE_NO_RESULT in related_ident:
3171 query_info = self._fallback_query_info
3172 break
3173
3174 # organize states into lists keyed to particular foreign
3175 # key values.
3176 if None not in related_ident:
3177 our_states[related_ident].append(
3178 (state, state_dict, overwrite)
3179 )
3180 else:
3181 # For FK values that have None, add them to a
3182 # separate collection that will be populated separately
3183 none_states.append((state, state_dict, overwrite))
3184
3185 # note the above conditional may have changed query_info
3186 if not query_info.load_only_child:
3187 our_states = [
3188 (state.key[1], state, state.dict, overwrite)
3189 for state, overwrite in states
3190 ]
3191
3192 pk_cols = query_info.pk_cols
3193 in_expr = query_info.in_expr
3194
3195 if not query_info.load_with_join:
3196 # in "omit join" mode, the primary key column and the
3197 # "in" expression are in terms of the related entity. So
3198 # if the related entity is polymorphic or otherwise aliased,
3199 # we need to adapt our "pk_cols" and "in_expr" to that
3200 # entity. in non-"omit join" mode, these are against the
            # parent entity and do not need adaptation.
3202 if effective_entity.is_aliased_class:
3203 pk_cols = [
3204 effective_entity._adapt_element(col) for col in pk_cols
3205 ]
3206 in_expr = effective_entity._adapt_element(in_expr)
3207
3208 bundle_ent = orm_util.Bundle("pk", *pk_cols)
3209 bundle_sql = bundle_ent.__clause_element__()
3210
3211 entity_sql = effective_entity.__clause_element__()
3212 q = Select._create_raw_select(
3213 _raw_columns=[bundle_sql, entity_sql],
3214 _compile_options=_ORMCompileState.default_compile_options,
3215 _propagate_attrs={
3216 "compile_state_plugin": "orm",
3217 "plugin_subject": effective_entity,
3218 },
3219 )
3220
3221 if not query_info.load_with_join:
3222 # the Bundle we have in the "omit_join" case is against raw, non
3223 # annotated columns, so to ensure the Query knows its primary
3224 # entity, we add it explicitly. If we made the Bundle against
3225 # annotated columns, we hit a performance issue in this specific
3226 # case, which is detailed in issue #4347.
3227 q = q.select_from(effective_entity)
3228 else:
3229 # in the non-omit_join case, the Bundle is against the annotated/
3230 # mapped column of the parent entity, but the #4347 issue does not
3231 # occur in this case.
3232 q = q.select_from(self._parent_alias).join(
3233 getattr(self._parent_alias, self.parent_property.key).of_type(
3234 effective_entity
3235 )
3236 )
3237
3238 q = q.filter(in_expr.in_(sql.bindparam("primary_keys")))
3239
3240 # a test which exercises what these comments talk about is
3241 # test_selectin_relations.py -> test_twolevel_selectin_w_polymorphic
3242 #
3243 # effective_entity above is given to us in terms of the cached
3244 # statement, namely this one:
3245 orig_query = context.compile_state.select_statement
3246
3247 # the actual statement that was requested is this one:
3248 # context_query = context.user_passed_query
3249 #
3250 # that's not the cached one, however. So while it is of the identical
3251 # structure, if it has entities like AliasedInsp, which we get from
3252 # aliased() or with_polymorphic(), the AliasedInsp will likely be a
3253 # different object identity each time, and will not match up
3254 # hashing-wise to the corresponding AliasedInsp that's in the
3255 # cached query, meaning it won't match on paths and loader lookups
3256 # and loaders like this one will be skipped if it is used in options.
3257 #
3258 # as it turns out, standard loader options like selectinload(),
3259 # lazyload() that have a path need
3260 # to come from the cached query so that the AliasedInsp etc. objects
3261 # that are in the query line up with the object that's in the path
        # of the strategy object.  however, other options like
        # with_loader_criteria() don't have a path (they have a fixed
        # entity) and need access to the latest closure state in order to
        # be correct; for those we need to use the uncached one.
3266 #
3267 # as of #8399 we let the loader option itself figure out what it
3268 # wants to do given cached and uncached version of itself.
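        #
        # e.g. (an illustrative sketch; "Address" and "flag" are
        # placeholders, not part of this module): an option such as
        #
        #   with_loader_criteria(Address, lambda cls: cls.active == flag)
        #
        # closes over "flag" in user code; the copy of that option carried
        # by the cached statement would see a stale closure, so the
        # uncached, user-passed option is the one that must be consulted.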
3269
3270 effective_path = path[self.parent_property]
3271
3272 if orig_query is context.user_passed_query:
3273 new_options = orig_query._with_options
3274 else:
3275 cached_options = orig_query._with_options
3276 uncached_options = context.user_passed_query._with_options
3277
3278 # propagate compile state options from the original query,
3279 # updating their "extra_criteria" as necessary.
3280 # note this will create a different cache key than
3281 # "orig" options if extra_criteria is present, because the copy
3282 # of extra_criteria will have different boundparam than that of
3283 # the QueryableAttribute in the path
3284 new_options = [
3285 orig_opt._adapt_cached_option_to_uncached_option(
3286 context, uncached_opt
3287 )
3288 for orig_opt, uncached_opt in zip(
3289 cached_options, uncached_options
3290 )
3291 ]
3292
3293 if loadopt and loadopt._extra_criteria:
3294 new_options += (
3295 orm_util.LoaderCriteriaOption(
3296 effective_entity,
3297 loadopt._generate_extra_criteria(context),
3298 ),
3299 )
3300
3301 if recursion_depth is not None:
3302 effective_path = effective_path._truncate_recursive()
3303
3304 q = q.options(*new_options)
3305
3306 q = q._update_compile_options({"_current_path": effective_path})
3307 if context.populate_existing:
3308 q = q.execution_options(populate_existing=True)
3309
3310 if self.parent_property.order_by:
3311 if not query_info.load_with_join:
3312 eager_order_by = self.parent_property.order_by
3313 if effective_entity.is_aliased_class:
3314 eager_order_by = [
3315 effective_entity._adapt_element(elem)
3316 for elem in eager_order_by
3317 ]
3318 q = q.order_by(*eager_order_by)
3319 else:
3320
3321 def _setup_outermost_orderby(compile_context):
3322 compile_context.eager_order_by += tuple(
3323 util.to_list(self.parent_property.order_by)
3324 )
3325
3326 q = q._add_compile_state_func(
3327 _setup_outermost_orderby, self.parent_property
3328 )
3329
3330 if query_info.load_only_child:
3331 self._load_via_child(
3332 our_states,
3333 none_states,
3334 query_info,
3335 q,
3336 context,
3337 execution_options,
3338 )
3339 else:
3340 self._load_via_parent(
3341 our_states, query_info, q, context, execution_options
3342 )
3343
3344 def _load_via_child(
3345 self,
3346 our_states,
3347 none_states,
3348 query_info,
3349 q,
3350 context,
3351 execution_options,
3352 ):
3353 uselist = self.uselist
3354
3355 # this sort is really for the benefit of the unit tests
3356 our_keys = sorted(our_states)
3357 while our_keys:
3358 chunk = our_keys[0 : self._chunksize]
3359 our_keys = our_keys[self._chunksize :]
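            # each row is a (primary key, related instance) pair produced by
            # the "pk" Bundle plus the entity column; build a lookup of the
            # related instances keyed by primary key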
3360 data = {
3361 k: v
3362 for k, v in context.session.execute(
3363 q,
3364 params={
3365 "primary_keys": [
3366 key[0] if query_info.zero_idx else key
3367 for key in chunk
3368 ]
3369 },
3370 execution_options=execution_options,
3371 ).unique()
3372 }
3373
3374 for key in chunk:
3375 # for a real foreign key and no concurrent changes to the
3376 # DB while running this method, "key" is always present in
3377 # data. However, for primaryjoins without real foreign keys
3378 # a non-None primaryjoin condition may still refer to no
3379 # related object.
3380 related_obj = data.get(key, None)
3381 for state, dict_, overwrite in our_states[key]:
3382 if not overwrite and self.key in dict_:
3383 continue
3384
3385 state.get_impl(self.key).set_committed_value(
3386 state,
3387 dict_,
3388 related_obj if not uselist else [related_obj],
3389 )
3390 # populate none states with empty value / collection
3391 for state, dict_, overwrite in none_states:
3392 if not overwrite and self.key in dict_:
3393 continue
3394
3395 # note it's OK if this is a uselist=True attribute, the empty
3396 # collection will be populated
3397 state.get_impl(self.key).set_committed_value(state, dict_, None)
3398
3399 def _load_via_parent(
3400 self, our_states, query_info, q, context, execution_options
3401 ):
3402 uselist = self.uselist
3403 _empty_result = () if uselist else None
3404
3405 while our_states:
3406 chunk = our_states[0 : self._chunksize]
3407 our_states = our_states[self._chunksize :]
3408
3409 primary_keys = [
3410 key[0] if query_info.zero_idx else key
3411 for key, state, state_dict, overwrite in chunk
3412 ]
3413
3414 data = collections.defaultdict(list)
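            # each row is a (parent key, related instance) pair produced by
            # the "pk" Bundle plus the entity column; collect the related
            # instances into lists keyed by parent key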
3415 for k, v in itertools.groupby(
3416 context.session.execute(
3417 q,
3418 params={"primary_keys": primary_keys},
3419 execution_options=execution_options,
3420 ).unique(),
3421 lambda x: x[0],
3422 ):
3423 data[k].extend(vv[1] for vv in v)
3424
3425 for key, state, state_dict, overwrite in chunk:
3426 if not overwrite and self.key in state_dict:
3427 continue
3428
3429 collection = data.get(key, _empty_result)
3430
3431 if not uselist and collection:
3432 if len(collection) > 1:
3433 util.warn(
3434 "Multiple rows returned with "
3435 "uselist=False for eagerly-loaded "
3436 "attribute '%s' " % self
3437 )
3438 state.get_impl(self.key).set_committed_value(
3439 state, state_dict, collection[0]
3440 )
3441 else:
3442 # note that empty tuple set on uselist=False sets the
3443 # value to None
3444 state.get_impl(self.key).set_committed_value(
3445 state, state_dict, collection
3446 )
3447
3448
3449def _single_parent_validator(desc, prop):
3450 def _do_check(state, value, oldvalue, initiator):
3451 if value is not None and initiator.key == prop.key:
3452 hasparent = initiator.hasparent(attributes.instance_state(value))
3453 if hasparent and oldvalue is not value:
3454 raise sa_exc.InvalidRequestError(
3455 "Instance %s is already associated with an instance "
3456 "of %s via its %s attribute, and is only allowed a "
3457 "single parent."
3458 % (orm_util.instance_str(value), state.class_, prop),
3459 code="bbf1",
3460 )
3461 return value
3462
3463 def append(state, value, initiator):
3464 return _do_check(state, value, None, initiator)
3465
3466 def set_(state, value, oldvalue, initiator):
3467 return _do_check(state, value, oldvalue, initiator)
3468
3469 event.listen(
3470 desc, "append", append, raw=True, retval=True, active_history=True
3471 )
3472 event.listen(desc, "set", set_, raw=True, retval=True, active_history=True)