1# orm/strategies.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: ignore-errors
8
9
10"""sqlalchemy.orm.interfaces.LoaderStrategy
11implementations, and related MapperOptions."""
12
13from __future__ import annotations
14
15import collections
16from typing import Any
17from typing import Dict
18from typing import Literal
19from typing import Optional
20from typing import Tuple
21from typing import TYPE_CHECKING
22from typing import Union
23
24from . import attributes
25from . import exc as orm_exc
26from . import interfaces
27from . import loading
28from . import path_registry
29from . import properties
30from . import query
31from . import relationships
32from . import unitofwork
33from . import util as orm_util
34from .base import _DEFER_FOR_STATE
35from .base import _RAISE_FOR_STATE
36from .base import _SET_DEFERRED_EXPIRED
37from .base import ATTR_WAS_SET
38from .base import LoaderCallableStatus
39from .base import PASSIVE_OFF
40from .base import PassiveFlag
41from .context import _column_descriptions
42from .context import _ORMCompileState
43from .context import _ORMSelectCompileState
44from .context import QueryContext
45from .interfaces import LoaderStrategy
46from .interfaces import StrategizedProperty
47from .session import _state_session
48from .state import InstanceState
49from .strategy_options import Load
50from .util import _none_only_set
51from .util import AliasedClass
52from .. import event
53from .. import exc as sa_exc
54from .. import inspect
55from .. import log
56from .. import sql
57from .. import util
58from ..sql import util as sql_util
59from ..sql import visitors
60from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL
61from ..sql.selectable import Select
62
63if TYPE_CHECKING:
64 from .mapper import Mapper
65 from .relationships import RelationshipProperty
66 from ..sql.elements import ColumnElement
67
68
69def _register_attribute(
70 prop,
71 mapper,
72 useobject,
73 compare_function=None,
74 typecallable=None,
75 callable_=None,
76 proxy_property=None,
77 active_history=False,
78 impl_class=None,
79 default_scalar_value=None,
80 **kw,
81):
82 listen_hooks = []
83
84 uselist = useobject and prop.uselist
85
86 if useobject and prop.single_parent:
87 listen_hooks.append(_single_parent_validator)
88
89 if prop.key in prop.parent.validators:
90 fn, opts = prop.parent.validators[prop.key]
91 listen_hooks.append(
92 lambda desc, prop: orm_util._validator_events(
93 desc, prop.key, fn, **opts
94 )
95 )
96
97 if useobject:
98 listen_hooks.append(unitofwork._track_cascade_events)
99
100 # need to assemble backref listeners
101 # after the singleparentvalidator, mapper validator
102 if useobject:
103 backref = prop.back_populates
104 if backref and prop._effective_sync_backref:
105 listen_hooks.append(
106 lambda desc, prop: attributes._backref_listeners(
107 desc, backref, uselist
108 )
109 )
110
111 # a single MapperProperty is shared down a class inheritance
112 # hierarchy, so we set up attribute instrumentation and backref event
113 # for each mapper down the hierarchy.
114
115 # typically, "mapper" is the same as prop.parent, due to the way
116 # the configure_mappers() process runs, however this is not strongly
117 # enforced, and in the case of a second configure_mappers() run the
118 # mapper here might not be prop.parent; also, a subclass mapper may
119 # be called here before a superclass mapper. That is, can't depend
120 # on mappers not already being set up so we have to check each one.
121
122 for m in mapper.self_and_descendants:
123 if prop is m._props.get(
124 prop.key
125 ) and not m.class_manager._attr_has_impl(prop.key):
126 desc = attributes._register_attribute_impl(
127 m.class_,
128 prop.key,
129 parent_token=prop,
130 uselist=uselist,
131 compare_function=compare_function,
132 useobject=useobject,
133 trackparent=useobject
134 and (
135 prop.single_parent
136 or prop.direction is interfaces.ONETOMANY
137 ),
138 typecallable=typecallable,
139 callable_=callable_,
140 active_history=active_history,
141 default_scalar_value=default_scalar_value,
142 impl_class=impl_class,
143 send_modified_events=not useobject or not prop.viewonly,
144 doc=prop.doc,
145 **kw,
146 )
147
148 for hook in listen_hooks:
149 hook(desc, prop)
150
151
152@properties.ColumnProperty.strategy_for(instrument=False, deferred=False)
153class _UninstrumentedColumnLoader(LoaderStrategy):
154 """Represent a non-instrumented MapperProperty.
155
156 The polymorphic_on argument of mapper() often results in this,
157 if the argument is against the with_polymorphic selectable.
158
159 """
160
161 __slots__ = ("columns",)
162
163 def __init__(self, parent, strategy_key):
164 super().__init__(parent, strategy_key)
165 self.columns = self.parent_property.columns
166
167 def setup_query(
168 self,
169 compile_state,
170 query_entity,
171 path,
172 loadopt,
173 adapter,
174 column_collection=None,
175 **kwargs,
176 ):
177 for c in self.columns:
178 if adapter:
179 c = adapter.columns[c]
180 compile_state._append_dedupe_col_collection(c, column_collection)
181
182 def create_row_processor(
183 self,
184 context,
185 query_entity,
186 path,
187 loadopt,
188 mapper,
189 result,
190 adapter,
191 populators,
192 ):
193 pass
194
195
196@log.class_logger
197@properties.ColumnProperty.strategy_for(instrument=True, deferred=False)
198class _ColumnLoader(LoaderStrategy):
199 """Provide loading behavior for a :class:`.ColumnProperty`."""
200
201 __slots__ = "columns", "is_composite"
202
203 def __init__(self, parent, strategy_key):
204 super().__init__(parent, strategy_key)
205 self.columns = self.parent_property.columns
206 self.is_composite = hasattr(self.parent_property, "composite_class")
207
208 def setup_query(
209 self,
210 compile_state,
211 query_entity,
212 path,
213 loadopt,
214 adapter,
215 column_collection,
216 memoized_populators,
217 check_for_adapt=False,
218 **kwargs,
219 ):
220 for c in self.columns:
221 if adapter:
222 if check_for_adapt:
223 c = adapter.adapt_check_present(c)
224 if c is None:
225 return
226 else:
227 c = adapter.columns[c]
228
229 compile_state._append_dedupe_col_collection(c, column_collection)
230
231 fetch = self.columns[0]
232 if adapter:
233 fetch = adapter.columns[fetch]
234 if fetch is None:
235 # None happens here only for dml bulk_persistence cases
236 # when context.DMLReturningColFilter is used
237 return
238
239 memoized_populators[self.parent_property] = fetch
240
241 def init_class_attribute(self, mapper):
242 self.is_class_level = True
243 coltype = self.columns[0].type
244 # TODO: check all columns ? check for foreign key as well?
245 active_history = (
246 self.parent_property.active_history
247 or self.columns[0].primary_key
248 or (
249 mapper.version_id_col is not None
250 and mapper._columntoproperty.get(mapper.version_id_col, None)
251 is self.parent_property
252 )
253 )
254
255 _register_attribute(
256 self.parent_property,
257 mapper,
258 useobject=False,
259 compare_function=coltype.compare_values,
260 active_history=active_history,
261 default_scalar_value=self.parent_property._default_scalar_value,
262 )
263
264 def create_row_processor(
265 self,
266 context,
267 query_entity,
268 path,
269 loadopt,
270 mapper,
271 result,
272 adapter,
273 populators,
274 ):
275 # look through list of columns represented here
276 # to see which, if any, is present in the row.
277
278 for col in self.columns:
279 if adapter:
280 col = adapter.columns[col]
281 getter = result._getter(col, False)
282 if getter:
283 populators["quick"].append((self.key, getter))
284 break
285 else:
286 populators["expire"].append((self.key, True))
287
288
289@log.class_logger
290@properties.ColumnProperty.strategy_for(query_expression=True)
291class _ExpressionColumnLoader(_ColumnLoader):
292 def __init__(self, parent, strategy_key):
293 super().__init__(parent, strategy_key)
294
295 # compare to the "default" expression that is mapped in
296 # the column. If it's sql.null, we don't need to render
297 # unless an expr is passed in the options.
298 null = sql.null().label(None)
299 self._have_default_expression = any(
300 not c.compare(null) for c in self.parent_property.columns
301 )
302
303 def setup_query(
304 self,
305 compile_state,
306 query_entity,
307 path,
308 loadopt,
309 adapter,
310 column_collection,
311 memoized_populators,
312 **kwargs,
313 ):
314 columns = None
315 if loadopt and loadopt._extra_criteria:
316 columns = loadopt._extra_criteria
317
318 elif self._have_default_expression:
319 columns = self.parent_property.columns
320
321 if columns is None:
322 return
323
324 for c in columns:
325 if adapter:
326 c = adapter.columns[c]
327 compile_state._append_dedupe_col_collection(c, column_collection)
328
329 fetch = columns[0]
330 if adapter:
331 fetch = adapter.columns[fetch]
332 if fetch is None:
333 # None is not expected to be the result of any
334 # adapter implementation here, however there may be theoretical
335 # usages of returning() with context.DMLReturningColFilter
336 return
337
338 memoized_populators[self.parent_property] = fetch
339
340 # if the column being loaded is the polymorphic discriminator,
341 # and we have a with_expression() providing the actual column,
342 # update the query_entity to use the actual column instead of
343 # the default expression
344 if (
345 query_entity._polymorphic_discriminator is self.columns[0]
346 and loadopt
347 and loadopt._extra_criteria
348 ):
349 query_entity._polymorphic_discriminator = columns[0]
350
351 def create_row_processor(
352 self,
353 context,
354 query_entity,
355 path,
356 loadopt,
357 mapper,
358 result,
359 adapter,
360 populators,
361 ):
362 # look through list of columns represented here
363 # to see which, if any, is present in the row.
364 if loadopt and loadopt._extra_criteria:
365 columns = loadopt._extra_criteria
366
367 for col in columns:
368 if adapter:
369 col = adapter.columns[col]
370 getter = result._getter(col, False)
371 if getter:
372 populators["quick"].append((self.key, getter))
373 break
374 else:
375 populators["expire"].append((self.key, True))
376
377 def init_class_attribute(self, mapper):
378 self.is_class_level = True
379
380 _register_attribute(
381 self.parent_property,
382 mapper,
383 useobject=False,
384 compare_function=self.columns[0].type.compare_values,
385 accepts_scalar_loader=False,
386 default_scalar_value=self.parent_property._default_scalar_value,
387 )
388
389
390@log.class_logger
391@properties.ColumnProperty.strategy_for(deferred=True, instrument=True)
392@properties.ColumnProperty.strategy_for(
393 deferred=True, instrument=True, raiseload=True
394)
395@properties.ColumnProperty.strategy_for(do_nothing=True)
396class _DeferredColumnLoader(LoaderStrategy):
397 """Provide loading behavior for a deferred :class:`.ColumnProperty`."""
398
399 __slots__ = "columns", "group", "raiseload"
400
401 def __init__(self, parent, strategy_key):
402 super().__init__(parent, strategy_key)
403 if hasattr(self.parent_property, "composite_class"):
404 raise NotImplementedError(
405 "Deferred loading for composite types not implemented yet"
406 )
407 self.raiseload = self.strategy_opts.get("raiseload", False)
408 self.columns = self.parent_property.columns
409 self.group = self.parent_property.group
410
411 def create_row_processor(
412 self,
413 context,
414 query_entity,
415 path,
416 loadopt,
417 mapper,
418 result,
419 adapter,
420 populators,
421 ):
422 # for a DeferredColumnLoader, this method is only used during a
423 # "row processor only" query; see test_deferred.py ->
424 # tests with "rowproc_only" in their name. As of the 1.0 series,
425 # loading._instance_processor doesn't use a "row processing" function
426 # to populate columns, instead it uses data in the "populators"
427 # dictionary. Normally, the DeferredColumnLoader.setup_query()
428 # sets up that data in the "memoized_populators" dictionary
429 # and "create_row_processor()" here is never invoked.
430
431 if (
432 context.refresh_state
433 and context.query._compile_options._only_load_props
434 and self.key in context.query._compile_options._only_load_props
435 ):
436 self.parent_property._get_strategy(
437 (("deferred", False), ("instrument", True))
438 ).create_row_processor(
439 context,
440 query_entity,
441 path,
442 loadopt,
443 mapper,
444 result,
445 adapter,
446 populators,
447 )
448
449 elif not self.is_class_level:
450 if self.raiseload:
451 set_deferred_for_local_state = (
452 self.parent_property._raise_column_loader
453 )
454 else:
455 set_deferred_for_local_state = (
456 self.parent_property._deferred_column_loader
457 )
458 populators["new"].append((self.key, set_deferred_for_local_state))
459 else:
460 populators["expire"].append((self.key, False))
461
462 def init_class_attribute(self, mapper):
463 self.is_class_level = True
464
465 _register_attribute(
466 self.parent_property,
467 mapper,
468 useobject=False,
469 compare_function=self.columns[0].type.compare_values,
470 callable_=self._load_for_state,
471 load_on_unexpire=False,
472 default_scalar_value=self.parent_property._default_scalar_value,
473 )
474
475 def setup_query(
476 self,
477 compile_state,
478 query_entity,
479 path,
480 loadopt,
481 adapter,
482 column_collection,
483 memoized_populators,
484 only_load_props=None,
485 **kw,
486 ):
487 if (
488 (
489 compile_state.compile_options._render_for_subquery
490 and self.parent_property._renders_in_subqueries
491 )
492 or (
493 loadopt
494 and set(self.columns).intersection(
495 self.parent._should_undefer_in_wildcard
496 )
497 )
498 or (
499 loadopt
500 and self.group
501 and loadopt.local_opts.get(
502 "undefer_group_%s" % self.group, False
503 )
504 )
505 or (only_load_props and self.key in only_load_props)
506 ):
507 self.parent_property._get_strategy(
508 (("deferred", False), ("instrument", True))
509 ).setup_query(
510 compile_state,
511 query_entity,
512 path,
513 loadopt,
514 adapter,
515 column_collection,
516 memoized_populators,
517 **kw,
518 )
519 elif self.is_class_level:
520 memoized_populators[self.parent_property] = _SET_DEFERRED_EXPIRED
521 elif not self.raiseload:
522 memoized_populators[self.parent_property] = _DEFER_FOR_STATE
523 else:
524 memoized_populators[self.parent_property] = _RAISE_FOR_STATE
525
526 def _load_for_state(self, state, passive):
527 if not state.key:
528 return LoaderCallableStatus.ATTR_EMPTY
529
530 if not passive & PassiveFlag.SQL_OK:
531 return LoaderCallableStatus.PASSIVE_NO_RESULT
532
533 localparent = state.manager.mapper
534
535 if self.group:
536 toload = [
537 p.key
538 for p in localparent.iterate_properties
539 if isinstance(p, StrategizedProperty)
540 and isinstance(p.strategy, _DeferredColumnLoader)
541 and p.group == self.group
542 ]
543 else:
544 toload = [self.key]
545
546 # narrow the keys down to just those which have no history
547 group = [k for k in toload if k in state.unmodified]
548
549 session = _state_session(state)
550 if session is None:
551 raise orm_exc.DetachedInstanceError(
552 "Parent instance %s is not bound to a Session; "
553 "deferred load operation of attribute '%s' cannot proceed"
554 % (orm_util.state_str(state), self.key)
555 )
556
557 if self.raiseload:
558 self._invoke_raise_load(state, passive, "raise")
559
560 loading._load_scalar_attributes(
561 state.mapper, state, set(group), PASSIVE_OFF
562 )
563
564 return LoaderCallableStatus.ATTR_WAS_SET
565
566 def _invoke_raise_load(self, state, passive, lazy):
567 raise sa_exc.InvalidRequestError(
568 "'%s' is not available due to raiseload=True" % (self,)
569 )
570
571
572class _LoadDeferredColumns:
573 """serializable loader object used by DeferredColumnLoader"""
574
575 def __init__(self, key: str, raiseload: bool = False):
576 self.key = key
577 self.raiseload = raiseload
578
579 def __call__(self, state, passive=attributes.PASSIVE_OFF):
580 key = self.key
581
582 localparent = state.manager.mapper
583 prop = localparent._props[key]
584 if self.raiseload:
585 strategy_key = (
586 ("deferred", True),
587 ("instrument", True),
588 ("raiseload", True),
589 )
590 else:
591 strategy_key = (("deferred", True), ("instrument", True))
592 strategy = prop._get_strategy(strategy_key)
593 return strategy._load_for_state(state, passive)
594
595
596class _AbstractRelationshipLoader(LoaderStrategy):
597 """LoaderStratgies which deal with related objects."""
598
599 __slots__ = "mapper", "target", "uselist", "entity"
600
601 def __init__(self, parent, strategy_key):
602 super().__init__(parent, strategy_key)
603 self.mapper = self.parent_property.mapper
604 self.entity = self.parent_property.entity
605 self.target = self.parent_property.target
606 self.uselist = self.parent_property.uselist
607
608 def _immediateload_create_row_processor(
609 self,
610 context,
611 query_entity,
612 path,
613 loadopt,
614 mapper,
615 result,
616 adapter,
617 populators,
618 ):
619 return self.parent_property._get_strategy(
620 (("lazy", "immediate"),)
621 ).create_row_processor(
622 context,
623 query_entity,
624 path,
625 loadopt,
626 mapper,
627 result,
628 adapter,
629 populators,
630 )
631
632
633@log.class_logger
634@relationships.RelationshipProperty.strategy_for(do_nothing=True)
635class _DoNothingLoader(LoaderStrategy):
636 """Relationship loader that makes no change to the object's state.
637
638 Compared to NoLoader, this loader does not initialize the
639 collection/attribute to empty/none; the usual default LazyLoader will
640 take effect.
641
642 """
643
644
645@log.class_logger
646@relationships.RelationshipProperty.strategy_for(lazy="noload")
647@relationships.RelationshipProperty.strategy_for(lazy=None)
648class _NoLoader(_AbstractRelationshipLoader):
649 """Provide loading behavior for a :class:`.Relationship`
650 with "lazy=None".
651
652 """
653
654 __slots__ = ()
655
656 @util.deprecated(
657 "2.1",
658 "The ``noload`` loader strategy is deprecated and will be removed "
659 "in a future release. This option "
660 "produces incorrect results by returning ``None`` for related "
661 "items.",
662 )
663 def init_class_attribute(self, mapper):
664 self.is_class_level = True
665
666 _register_attribute(
667 self.parent_property,
668 mapper,
669 useobject=True,
670 typecallable=self.parent_property.collection_class,
671 )
672
673 def create_row_processor(
674 self,
675 context,
676 query_entity,
677 path,
678 loadopt,
679 mapper,
680 result,
681 adapter,
682 populators,
683 ):
684 def invoke_no_load(state, dict_, row):
685 if self.uselist:
686 attributes.init_state_collection(state, dict_, self.key)
687 else:
688 dict_[self.key] = None
689
690 populators["new"].append((self.key, invoke_no_load))
691
692
693@log.class_logger
694@relationships.RelationshipProperty.strategy_for(lazy=True)
695@relationships.RelationshipProperty.strategy_for(lazy="select")
696@relationships.RelationshipProperty.strategy_for(lazy="raise")
697@relationships.RelationshipProperty.strategy_for(lazy="raise_on_sql")
698@relationships.RelationshipProperty.strategy_for(lazy="baked_select")
699class _LazyLoader(
700 _AbstractRelationshipLoader, util.MemoizedSlots, log.Identified
701):
702 """Provide loading behavior for a :class:`.Relationship`
703 with "lazy=True", that is loads when first accessed.
704
705 """
706
707 __slots__ = (
708 "_lazywhere",
709 "_rev_lazywhere",
710 "_lazyload_reverse_option",
711 "_order_by",
712 "use_get",
713 "is_aliased_class",
714 "_bind_to_col",
715 "_equated_columns",
716 "_rev_bind_to_col",
717 "_rev_equated_columns",
718 "_simple_lazy_clause",
719 "_raise_always",
720 "_raise_on_sql",
721 )
722
723 _lazywhere: ColumnElement[bool]
724 _bind_to_col: Dict[str, ColumnElement[Any]]
725 _rev_lazywhere: ColumnElement[bool]
726 _rev_bind_to_col: Dict[str, ColumnElement[Any]]
727
728 parent_property: RelationshipProperty[Any]
729
730 def __init__(
731 self, parent: RelationshipProperty[Any], strategy_key: Tuple[Any, ...]
732 ):
733 super().__init__(parent, strategy_key)
734 self._raise_always = self.strategy_opts["lazy"] == "raise"
735 self._raise_on_sql = self.strategy_opts["lazy"] == "raise_on_sql"
736
737 self.is_aliased_class = inspect(self.entity).is_aliased_class
738
739 join_condition = self.parent_property._join_condition
740 (
741 self._lazywhere,
742 self._bind_to_col,
743 self._equated_columns,
744 ) = join_condition.create_lazy_clause()
745
746 (
747 self._rev_lazywhere,
748 self._rev_bind_to_col,
749 self._rev_equated_columns,
750 ) = join_condition.create_lazy_clause(reverse_direction=True)
751
752 if self.parent_property.order_by:
753 self._order_by = util.to_list(self.parent_property.order_by)
754 else:
755 self._order_by = None
756
757 self.logger.info("%s lazy loading clause %s", self, self._lazywhere)
758
759 # determine if our "lazywhere" clause is the same as the mapper's
760 # get() clause. then we can just use mapper.get()
761 #
762 # TODO: the "not self.uselist" can be taken out entirely; a m2o
763 # load that populates for a list (very unusual, but is possible with
764 # the API) can still set for "None" and the attribute system will
765 # populate as an empty list.
766 self.use_get = (
767 not self.is_aliased_class
768 and not self.uselist
769 and self.entity._get_clause[0].compare(
770 self._lazywhere,
771 use_proxies=True,
772 compare_keys=False,
773 equivalents=self.mapper._equivalent_columns,
774 )
775 )
776
777 if self.use_get:
778 for col in list(self._equated_columns):
779 if col in self.mapper._equivalent_columns:
780 for c in self.mapper._equivalent_columns[col]:
781 self._equated_columns[c] = self._equated_columns[col]
782
783 self.logger.info(
784 "%s will use Session.get() to optimize instance loads", self
785 )
786
787 def init_class_attribute(self, mapper):
788 self.is_class_level = True
789
790 _legacy_inactive_history_style = (
791 self.parent_property._legacy_inactive_history_style
792 )
793
794 if self.parent_property.active_history:
795 active_history = True
796 _deferred_history = False
797
798 elif (
799 self.parent_property.direction is not interfaces.MANYTOONE
800 or not self.use_get
801 ):
802 if _legacy_inactive_history_style:
803 active_history = True
804 _deferred_history = False
805 else:
806 active_history = False
807 _deferred_history = True
808 else:
809 active_history = _deferred_history = False
810
811 _register_attribute(
812 self.parent_property,
813 mapper,
814 useobject=True,
815 callable_=self._load_for_state,
816 typecallable=self.parent_property.collection_class,
817 active_history=active_history,
818 _deferred_history=_deferred_history,
819 )
820
821 def _memoized_attr__simple_lazy_clause(self):
822 lazywhere = self._lazywhere
823
824 criterion, bind_to_col = (lazywhere, self._bind_to_col)
825
826 params = []
827
828 def visit_bindparam(bindparam):
829 bindparam.unique = False
830
831 visitors.traverse(criterion, {}, {"bindparam": visit_bindparam})
832
833 def visit_bindparam(bindparam):
834 if bindparam._identifying_key in bind_to_col:
835 params.append(
836 (
837 bindparam.key,
838 bind_to_col[bindparam._identifying_key],
839 None,
840 )
841 )
842 elif bindparam.callable is None:
843 params.append((bindparam.key, None, bindparam.value))
844
845 criterion = visitors.cloned_traverse(
846 criterion, {}, {"bindparam": visit_bindparam}
847 )
848
849 return criterion, params
850
851 def _generate_lazy_clause(self, state, passive):
852 criterion, param_keys = self._simple_lazy_clause
853
854 if state is None:
855 return sql_util.adapt_criterion_to_null(
856 criterion, [key for key, ident, value in param_keys]
857 )
858
859 mapper = self.parent_property.parent
860
861 o = state.obj() # strong ref
862 dict_ = attributes.instance_dict(o)
863
864 if passive & PassiveFlag.INIT_OK:
865 passive ^= PassiveFlag.INIT_OK
866
867 params = {}
868 for key, ident, value in param_keys:
869 if ident is not None:
870 if passive and passive & PassiveFlag.LOAD_AGAINST_COMMITTED:
871 value = mapper._get_committed_state_attr_by_column(
872 state, dict_, ident, passive
873 )
874 else:
875 value = mapper._get_state_attr_by_column(
876 state, dict_, ident, passive
877 )
878
879 params[key] = value
880
881 return criterion, params
882
883 def _invoke_raise_load(self, state, passive, lazy):
884 raise sa_exc.InvalidRequestError(
885 "'%s' is not available due to lazy='%s'" % (self, lazy)
886 )
887
888 def _load_for_state(
889 self,
890 state,
891 passive,
892 loadopt=None,
893 extra_criteria=(),
894 extra_options=(),
895 alternate_effective_path=None,
896 execution_options=util.EMPTY_DICT,
897 ):
898 if not state.key and (
899 (
900 not self.parent_property.load_on_pending
901 and not state._load_pending
902 )
903 or not state.session_id
904 ):
905 return LoaderCallableStatus.ATTR_EMPTY
906
907 pending = not state.key
908 primary_key_identity = None
909
910 use_get = self.use_get and (not loadopt or not loadopt._extra_criteria)
911
912 if (not passive & PassiveFlag.SQL_OK and not use_get) or (
913 not passive & attributes.NON_PERSISTENT_OK and pending
914 ):
915 return LoaderCallableStatus.PASSIVE_NO_RESULT
916
917 if (
918 # we were given lazy="raise"
919 self._raise_always
920 # the no_raise history-related flag was not passed
921 and not passive & PassiveFlag.NO_RAISE
922 and (
923 # if we are use_get and related_object_ok is disabled,
924 # which means we are at most looking in the identity map
925 # for history purposes or otherwise returning
926 # PASSIVE_NO_RESULT, don't raise. This is also a
927 # history-related flag
928 not use_get
929 or passive & PassiveFlag.RELATED_OBJECT_OK
930 )
931 ):
932 self._invoke_raise_load(state, passive, "raise")
933
934 session = _state_session(state)
935 if not session:
936 if passive & PassiveFlag.NO_RAISE:
937 return LoaderCallableStatus.PASSIVE_NO_RESULT
938
939 raise orm_exc.DetachedInstanceError(
940 "Parent instance %s is not bound to a Session; "
941 "lazy load operation of attribute '%s' cannot proceed"
942 % (orm_util.state_str(state), self.key)
943 )
944
945 # if we have a simple primary key load, check the
946 # identity map without generating a Query at all
947 if use_get:
948 primary_key_identity = self._get_ident_for_use_get(
949 session, state, passive
950 )
951 if LoaderCallableStatus.PASSIVE_NO_RESULT in primary_key_identity:
952 return LoaderCallableStatus.PASSIVE_NO_RESULT
953 elif LoaderCallableStatus.NEVER_SET in primary_key_identity:
954 return LoaderCallableStatus.NEVER_SET
955
956 # test for None alone in primary_key_identity based on
957 # allow_partial_pks preference. PASSIVE_NO_RESULT and NEVER_SET
958 # have already been tested above
959 if not self.mapper.allow_partial_pks:
960 if _none_only_set.intersection(primary_key_identity):
961 return None
962 else:
963 if _none_only_set.issuperset(primary_key_identity):
964 return None
965
966 if (
967 self.key in state.dict
968 and not passive & PassiveFlag.DEFERRED_HISTORY_LOAD
969 ):
970 return LoaderCallableStatus.ATTR_WAS_SET
971
972 # look for this identity in the identity map. Delegate to the
973 # Query class in use, as it may have special rules for how it
974 # does this, including how it decides what the correct
975 # identity_token would be for this identity.
976
977 instance = session._identity_lookup(
978 self.entity,
979 primary_key_identity,
980 passive=passive,
981 lazy_loaded_from=state,
982 )
983
984 if instance is not None:
985 if instance is LoaderCallableStatus.PASSIVE_CLASS_MISMATCH:
986 return None
987 else:
988 return instance
989 elif (
990 not passive & PassiveFlag.SQL_OK
991 or not passive & PassiveFlag.RELATED_OBJECT_OK
992 ):
993 return LoaderCallableStatus.PASSIVE_NO_RESULT
994
995 return self._emit_lazyload(
996 session,
997 state,
998 primary_key_identity,
999 passive,
1000 loadopt,
1001 extra_criteria,
1002 extra_options,
1003 alternate_effective_path,
1004 execution_options,
1005 )
1006
1007 def _get_ident_for_use_get(self, session, state, passive):
1008 instance_mapper = state.manager.mapper
1009
1010 if passive & PassiveFlag.LOAD_AGAINST_COMMITTED:
1011 get_attr = instance_mapper._get_committed_state_attr_by_column
1012 else:
1013 get_attr = instance_mapper._get_state_attr_by_column
1014
1015 dict_ = state.dict
1016
1017 return [
1018 get_attr(state, dict_, self._equated_columns[pk], passive=passive)
1019 for pk in self.mapper.primary_key
1020 ]
1021
1022 @util.preload_module("sqlalchemy.orm.strategy_options")
1023 def _emit_lazyload(
1024 self,
1025 session,
1026 state,
1027 primary_key_identity,
1028 passive,
1029 loadopt,
1030 extra_criteria,
1031 extra_options,
1032 alternate_effective_path,
1033 execution_options,
1034 ):
1035 strategy_options = util.preloaded.orm_strategy_options
1036
1037 clauseelement = self.entity.__clause_element__()
1038 stmt = Select._create_raw_select(
1039 _raw_columns=[clauseelement],
1040 _propagate_attrs=clauseelement._propagate_attrs,
1041 _compile_options=_ORMCompileState.default_compile_options,
1042 )
1043 load_options = QueryContext.default_load_options
1044
1045 load_options += {
1046 "_invoke_all_eagers": False,
1047 "_lazy_loaded_from": state,
1048 }
1049
1050 if self.parent_property.secondary is not None:
1051 stmt = stmt.select_from(
1052 self.mapper, self.parent_property.secondary
1053 )
1054
1055 pending = not state.key
1056
1057 # don't autoflush on pending
1058 if pending or passive & attributes.NO_AUTOFLUSH:
1059 stmt._execution_options = util.immutabledict({"autoflush": False})
1060
1061 use_get = self.use_get
1062
1063 if state.load_options or (loadopt and loadopt._extra_criteria):
1064 if alternate_effective_path is None:
1065 effective_path = state.load_path[self.parent_property]
1066 else:
1067 effective_path = alternate_effective_path[self.parent_property]
1068
1069 opts = state.load_options
1070
1071 if loadopt and loadopt._extra_criteria:
1072 use_get = False
1073 opts += (
1074 orm_util.LoaderCriteriaOption(self.entity, extra_criteria),
1075 )
1076
1077 stmt._with_options = opts
1078 elif alternate_effective_path is None:
1079 # this path is used if there are not already any options
1080 # in the query, but an event may want to add them
1081 effective_path = state.mapper._path_registry[self.parent_property]
1082 else:
1083 # added by immediateloader
1084 effective_path = alternate_effective_path[self.parent_property]
1085
1086 if extra_options:
1087 stmt._with_options += extra_options
1088
1089 stmt._compile_options += {"_current_path": effective_path}
1090
1091 if use_get:
1092 if self._raise_on_sql and not passive & PassiveFlag.NO_RAISE:
1093 self._invoke_raise_load(state, passive, "raise_on_sql")
1094
1095 return loading._load_on_pk_identity(
1096 session,
1097 stmt,
1098 primary_key_identity,
1099 load_options=load_options,
1100 execution_options=execution_options,
1101 )
1102
1103 if self._order_by:
1104 stmt._order_by_clauses = self._order_by
1105
1106 def _lazyload_reverse(compile_context):
1107 for rev in self.parent_property._reverse_property:
1108 # reverse props that are MANYTOONE are loading *this*
1109 # object from get(), so don't need to eager out to those.
1110 if (
1111 rev.direction is interfaces.MANYTOONE
1112 and rev._use_get
1113 and not isinstance(rev.strategy, _LazyLoader)
1114 ):
1115 strategy_options.Load._construct_for_existing_path(
1116 compile_context.compile_options._current_path[
1117 rev.parent
1118 ]
1119 ).lazyload(rev).process_compile_state(compile_context)
1120
1121 stmt = stmt._add_compile_state_func(
1122 _lazyload_reverse, self.parent_property
1123 )
1124
1125 lazy_clause, params = self._generate_lazy_clause(state, passive)
1126
1127 if execution_options:
1128 execution_options = util.EMPTY_DICT.merge_with(
1129 execution_options, {"_sa_orm_load_options": load_options}
1130 )
1131 else:
1132 execution_options = {
1133 "_sa_orm_load_options": load_options,
1134 }
1135
1136 if (
1137 self.key in state.dict
1138 and not passive & PassiveFlag.DEFERRED_HISTORY_LOAD
1139 ):
1140 return LoaderCallableStatus.ATTR_WAS_SET
1141
1142 if pending:
1143 if util.has_intersection(orm_util._none_set, params.values()):
1144 return None
1145
1146 elif util.has_intersection(orm_util._never_set, params.values()):
1147 return None
1148
1149 if self._raise_on_sql and not passive & PassiveFlag.NO_RAISE:
1150 self._invoke_raise_load(state, passive, "raise_on_sql")
1151
1152 stmt._where_criteria = (lazy_clause,)
1153
1154 result = session.execute(
1155 stmt, params, execution_options=execution_options
1156 )
1157
1158 result = result.unique().scalars().all()
1159
1160 if self.uselist:
1161 return result
1162 else:
1163 l = len(result)
1164 if l:
1165 if l > 1:
1166 util.warn(
1167 "Multiple rows returned with "
1168 "uselist=False for lazily-loaded attribute '%s' "
1169 % self.parent_property
1170 )
1171
1172 return result[0]
1173 else:
1174 return None
1175
1176 def create_row_processor(
1177 self,
1178 context,
1179 query_entity,
1180 path,
1181 loadopt,
1182 mapper,
1183 result,
1184 adapter,
1185 populators,
1186 ):
1187 key = self.key
1188
1189 if (
1190 context.load_options._is_user_refresh
1191 and context.query._compile_options._only_load_props
1192 and self.key in context.query._compile_options._only_load_props
1193 ):
1194 return self._immediateload_create_row_processor(
1195 context,
1196 query_entity,
1197 path,
1198 loadopt,
1199 mapper,
1200 result,
1201 adapter,
1202 populators,
1203 )
1204
1205 if not self.is_class_level or (loadopt and loadopt._extra_criteria):
1206 # we are not the primary manager for this attribute
1207 # on this class - set up a
1208 # per-instance lazyloader, which will override the
1209 # class-level behavior.
1210 # this currently only happens when using a
1211 # "lazyload" option on a "no load"
1212 # attribute - "eager" attributes always have a
1213 # class-level lazyloader installed.
1214 set_lazy_callable = (
1215 InstanceState._instance_level_callable_processor
1216 )(
1217 mapper.class_manager,
1218 _LoadLazyAttribute(
1219 key,
1220 self,
1221 loadopt,
1222 (
1223 loadopt._generate_extra_criteria(context)
1224 if loadopt._extra_criteria
1225 else None
1226 ),
1227 ),
1228 key,
1229 )
1230
1231 populators["new"].append((self.key, set_lazy_callable))
1232 elif context.populate_existing or mapper.always_refresh:
1233
1234 def reset_for_lazy_callable(state, dict_, row):
1235 # we are the primary manager for this attribute on
1236 # this class - reset its
1237 # per-instance attribute state, so that the class-level
1238 # lazy loader is
1239 # executed when next referenced on this instance.
1240 # this is needed in
1241 # populate_existing() types of scenarios to reset
1242 # any existing state.
1243 state._reset(dict_, key)
1244
1245 populators["new"].append((self.key, reset_for_lazy_callable))
1246
1247
1248class _LoadLazyAttribute:
1249 """semi-serializable loader object used by LazyLoader
1250
1251 Historically, this object would be carried along with instances that
1252 needed to run lazyloaders, so it had to be serializable to support
1253 cached instances.
1254
1255 this is no longer a general requirement, and the case where this object
1256 is used is exactly the case where we can't really serialize easily,
1257 which is when extra criteria in the loader option is present.
1258
1259 We can't reliably serialize that as it refers to mapped entities and
1260 AliasedClass objects that are local to the current process, which would
1261 need to be matched up on deserialize e.g. the sqlalchemy.ext.serializer
1262 approach.
1263
1264 """
1265
1266 def __init__(self, key, initiating_strategy, loadopt, extra_criteria):
1267 self.key = key
1268 self.strategy_key = initiating_strategy.strategy_key
1269 self.loadopt = loadopt
1270 self.extra_criteria = extra_criteria
1271
1272 def __getstate__(self):
1273 if self.extra_criteria is not None:
1274 util.warn(
1275 "Can't reliably serialize a lazyload() option that "
1276 "contains additional criteria; please use eager loading "
1277 "for this case"
1278 )
1279 return {
1280 "key": self.key,
1281 "strategy_key": self.strategy_key,
1282 "loadopt": self.loadopt,
1283 "extra_criteria": (),
1284 }
1285
1286 def __call__(self, state, passive=attributes.PASSIVE_OFF):
1287 key = self.key
1288 instance_mapper = state.manager.mapper
1289 prop = instance_mapper._props[key]
1290 strategy = prop._strategies[self.strategy_key]
1291
1292 return strategy._load_for_state(
1293 state,
1294 passive,
1295 loadopt=self.loadopt,
1296 extra_criteria=self.extra_criteria,
1297 )
1298
1299
1300class _PostLoader(_AbstractRelationshipLoader):
1301 """A relationship loader that emits a second SELECT statement."""
1302
1303 __slots__ = ()
1304
1305 def _setup_for_recursion(self, context, path, loadopt, join_depth=None):
1306 effective_path = (
1307 context.compile_state.current_path or orm_util.PathRegistry.root
1308 ) + path
1309
1310 top_level_context = context._get_top_level_context()
1311 execution_options = util.immutabledict(
1312 {"sa_top_level_orm_context": top_level_context}
1313 )
1314
1315 if loadopt:
1316 recursion_depth = loadopt.local_opts.get("recursion_depth", None)
1317 unlimited_recursion = recursion_depth == -1
1318 else:
1319 recursion_depth = None
1320 unlimited_recursion = False
1321
1322 if recursion_depth is not None:
1323 if not self.parent_property._is_self_referential:
1324 raise sa_exc.InvalidRequestError(
1325 f"recursion_depth option on relationship "
1326 f"{self.parent_property} not valid for "
1327 "non-self-referential relationship"
1328 )
1329 recursion_depth = context.execution_options.get(
1330 f"_recursion_depth_{id(self)}", recursion_depth
1331 )
1332
1333 if not unlimited_recursion and recursion_depth < 0:
1334 return (
1335 effective_path,
1336 False,
1337 execution_options,
1338 recursion_depth,
1339 )
1340
1341 if not unlimited_recursion:
1342 execution_options = execution_options.union(
1343 {
1344 f"_recursion_depth_{id(self)}": recursion_depth - 1,
1345 }
1346 )
1347
1348 if loading._PostLoad.path_exists(
1349 context, effective_path, self.parent_property
1350 ):
1351 return effective_path, False, execution_options, recursion_depth
1352
1353 path_w_prop = path[self.parent_property]
1354 effective_path_w_prop = effective_path[self.parent_property]
1355
1356 if not path_w_prop.contains(context.attributes, "loader"):
1357 if join_depth:
1358 if effective_path_w_prop.length / 2 > join_depth:
1359 return (
1360 effective_path,
1361 False,
1362 execution_options,
1363 recursion_depth,
1364 )
1365 elif effective_path_w_prop.contains_mapper(self.mapper):
1366 return (
1367 effective_path,
1368 False,
1369 execution_options,
1370 recursion_depth,
1371 )
1372
1373 return effective_path, True, execution_options, recursion_depth
1374
1375
1376@relationships.RelationshipProperty.strategy_for(lazy="immediate")
1377class _ImmediateLoader(_PostLoader):
1378 __slots__ = ("join_depth",)
1379
1380 def __init__(self, parent, strategy_key):
1381 super().__init__(parent, strategy_key)
1382 self.join_depth = self.parent_property.join_depth
1383
1384 def init_class_attribute(self, mapper):
1385 self.parent_property._get_strategy(
1386 (("lazy", "select"),)
1387 ).init_class_attribute(mapper)
1388
1389 def create_row_processor(
1390 self,
1391 context,
1392 query_entity,
1393 path,
1394 loadopt,
1395 mapper,
1396 result,
1397 adapter,
1398 populators,
1399 ):
1400 if not context.compile_state.compile_options._enable_eagerloads:
1401 return
1402
1403 (
1404 effective_path,
1405 run_loader,
1406 execution_options,
1407 recursion_depth,
1408 ) = self._setup_for_recursion(context, path, loadopt, self.join_depth)
1409
1410 if not run_loader:
1411 # this will not emit SQL and will only emit for a many-to-one
1412 # "use get" load. the "_RELATED" part means it may return
1413 # instance even if its expired, since this is a mutually-recursive
1414 # load operation.
1415 flags = attributes.PASSIVE_NO_FETCH_RELATED | PassiveFlag.NO_RAISE
1416 else:
1417 flags = attributes.PASSIVE_OFF | PassiveFlag.NO_RAISE
1418
1419 loading._PostLoad.callable_for_path(
1420 context,
1421 effective_path,
1422 self.parent,
1423 self.parent_property,
1424 self._load_for_path,
1425 loadopt,
1426 flags,
1427 recursion_depth,
1428 execution_options,
1429 )
1430
1431 def _load_for_path(
1432 self,
1433 context,
1434 path,
1435 states,
1436 load_only,
1437 loadopt,
1438 flags,
1439 recursion_depth,
1440 execution_options,
1441 ):
1442 if recursion_depth:
1443 new_opt = Load(loadopt.path.entity)
1444 new_opt.context = (
1445 loadopt,
1446 loadopt._recurse(),
1447 )
1448 alternate_effective_path = path._truncate_recursive()
1449 extra_options = (new_opt,)
1450 else:
1451 alternate_effective_path = path
1452 extra_options = ()
1453
1454 key = self.key
1455 lazyloader = self.parent_property._get_strategy((("lazy", "select"),))
1456 for state, overwrite in states:
1457 dict_ = state.dict
1458
1459 if overwrite or key not in dict_:
1460 value = lazyloader._load_for_state(
1461 state,
1462 flags,
1463 extra_options=extra_options,
1464 alternate_effective_path=alternate_effective_path,
1465 execution_options=execution_options,
1466 )
1467 if value not in (
1468 ATTR_WAS_SET,
1469 LoaderCallableStatus.PASSIVE_NO_RESULT,
1470 ):
1471 state.get_impl(key).set_committed_value(
1472 state, dict_, value
1473 )
1474
1475
1476@log.class_logger
1477@relationships.RelationshipProperty.strategy_for(lazy="subquery")
1478class _SubqueryLoader(_PostLoader):
1479 __slots__ = ("join_depth",)
1480
1481 def __init__(self, parent, strategy_key):
1482 super().__init__(parent, strategy_key)
1483 self.join_depth = self.parent_property.join_depth
1484
1485 def init_class_attribute(self, mapper):
1486 self.parent_property._get_strategy(
1487 (("lazy", "select"),)
1488 ).init_class_attribute(mapper)
1489
1490 def _get_leftmost(
1491 self,
1492 orig_query_entity_index,
1493 subq_path,
1494 current_compile_state,
1495 is_root,
1496 ):
1497 given_subq_path = subq_path
1498 subq_path = subq_path.path
1499 subq_mapper = orm_util._class_to_mapper(subq_path[0])
1500
1501 # determine attributes of the leftmost mapper
1502 if (
1503 self.parent.isa(subq_mapper)
1504 and self.parent_property is subq_path[1]
1505 ):
1506 leftmost_mapper, leftmost_prop = self.parent, self.parent_property
1507 else:
1508 leftmost_mapper, leftmost_prop = subq_mapper, subq_path[1]
1509
1510 if is_root:
1511 # the subq_path is also coming from cached state, so when we start
1512 # building up this path, it has to also be converted to be in terms
1513 # of the current state. this is for the specific case of the entity
1514 # is an AliasedClass against a subquery that's not otherwise going
1515 # to adapt
1516 new_subq_path = current_compile_state._entities[
1517 orig_query_entity_index
1518 ].entity_zero._path_registry[leftmost_prop]
1519 additional = len(subq_path) - len(new_subq_path)
1520 if additional:
1521 new_subq_path += path_registry.PathRegistry.coerce(
1522 subq_path[-additional:]
1523 )
1524 else:
1525 new_subq_path = given_subq_path
1526
1527 leftmost_cols = leftmost_prop.local_columns
1528
1529 leftmost_attr = [
1530 getattr(
1531 new_subq_path.path[0].entity,
1532 leftmost_mapper._columntoproperty[c].key,
1533 )
1534 for c in leftmost_cols
1535 ]
1536
1537 return leftmost_mapper, leftmost_attr, leftmost_prop, new_subq_path
1538
1539 def _generate_from_original_query(
1540 self,
1541 orig_compile_state,
1542 orig_query,
1543 leftmost_mapper,
1544 leftmost_attr,
1545 leftmost_relationship,
1546 orig_entity,
1547 ):
1548 # reformat the original query
1549 # to look only for significant columns
1550 q = orig_query._clone().correlate(None)
1551
1552 # LEGACY: make a Query back from the select() !!
1553 # This suits at least two legacy cases:
1554 # 1. applications which expect before_compile() to be called
1555 # below when we run .subquery() on this query (Keystone)
1556 # 2. applications which are doing subqueryload with complex
1557 # from_self() queries, as query.subquery() / .statement
1558 # has to do the full compile context for multiply-nested
1559 # from_self() (Neutron) - see test_subqload_from_self
1560 # for demo.
1561 q2 = query.Query.__new__(query.Query)
1562 q2.__dict__.update(q.__dict__)
1563 q = q2
1564
1565 # set the query's "FROM" list explicitly to what the
1566 # FROM list would be in any case, as we will be limiting
1567 # the columns in the SELECT list which may no longer include
1568 # all entities mentioned in things like WHERE, JOIN, etc.
1569 if not q._from_obj:
1570 q._enable_assertions = False
1571 q.select_from.non_generative(
1572 q,
1573 *{
1574 ent["entity"]
1575 for ent in _column_descriptions(
1576 orig_query, compile_state=orig_compile_state
1577 )
1578 if ent["entity"] is not None
1579 },
1580 )
1581
1582 # select from the identity columns of the outer (specifically, these
1583 # are the 'local_cols' of the property). This will remove other
1584 # columns from the query that might suggest the right entity which is
1585 # why we do set select_from above. The attributes we have are
1586 # coerced and adapted using the original query's adapter, which is
1587 # needed only for the case of adapting a subclass column to
1588 # that of a polymorphic selectable, e.g. we have
1589 # Engineer.primary_language and the entity is Person. All other
1590 # adaptations, e.g. from_self, select_entity_from(), will occur
1591 # within the new query when it compiles, as the compile_state we are
1592 # using here is only a partial one. If the subqueryload is from a
1593 # with_polymorphic() or other aliased() object, left_attr will already
1594 # be the correct attributes so no adaptation is needed.
1595 target_cols = orig_compile_state._adapt_col_list(
1596 [
1597 sql.coercions.expect(sql.roles.ColumnsClauseRole, o)
1598 for o in leftmost_attr
1599 ],
1600 orig_compile_state._get_current_adapter(),
1601 )
1602 q._raw_columns = target_cols
1603
1604 distinct_target_key = leftmost_relationship.distinct_target_key
1605
1606 if distinct_target_key is True:
1607 q._distinct = True
1608 elif distinct_target_key is None:
1609 # if target_cols refer to a non-primary key or only
1610 # part of a composite primary key, set the q as distinct
1611 for t in {c.table for c in target_cols}:
1612 if not set(target_cols).issuperset(t.primary_key):
1613 q._distinct = True
1614 break
1615
1616 # don't need ORDER BY if no limit/offset
1617 if not q._has_row_limiting_clause:
1618 q._order_by_clauses = ()
1619
1620 if q._distinct is True and q._order_by_clauses:
1621 # the logic to automatically add the order by columns to the query
1622 # when distinct is True is deprecated in the query
1623 to_add = sql_util.expand_column_list_from_order_by(
1624 target_cols, q._order_by_clauses
1625 )
1626 if to_add:
1627 q._set_entities(target_cols + to_add)
1628
1629 # the original query now becomes a subquery
1630 # which we'll join onto.
1631 # LEGACY: as "q" is a Query, the before_compile() event is invoked
1632 # here.
1633 embed_q = q.set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL).subquery()
1634 left_alias = orm_util.AliasedClass(
1635 leftmost_mapper, embed_q, use_mapper_path=True
1636 )
1637 return left_alias
1638
1639 def _prep_for_joins(self, left_alias, subq_path):
1640 # figure out what's being joined. a.k.a. the fun part
1641 to_join = []
1642 pairs = list(subq_path.pairs())
1643
1644 for i, (mapper, prop) in enumerate(pairs):
1645 if i > 0:
1646 # look at the previous mapper in the chain -
1647 # if it is as or more specific than this prop's
1648 # mapper, use that instead.
1649 # note we have an assumption here that
1650 # the non-first element is always going to be a mapper,
1651 # not an AliasedClass
1652
1653 prev_mapper = pairs[i - 1][1].mapper
1654 to_append = prev_mapper if prev_mapper.isa(mapper) else mapper
1655 else:
1656 to_append = mapper
1657
1658 to_join.append((to_append, prop.key))
1659
1660 # determine the immediate parent class we are joining from,
1661 # which needs to be aliased.
1662
1663 if len(to_join) < 2:
1664 # in the case of a one level eager load, this is the
1665 # leftmost "left_alias".
1666 parent_alias = left_alias
1667 else:
1668 info = inspect(to_join[-1][0])
1669 if info.is_aliased_class:
1670 parent_alias = info.entity
1671 else:
1672 # alias a plain mapper as we may be
1673 # joining multiple times
1674 parent_alias = orm_util.AliasedClass(
1675 info.entity, use_mapper_path=True
1676 )
1677
1678 local_cols = self.parent_property.local_columns
1679
1680 local_attr = [
1681 getattr(parent_alias, self.parent._columntoproperty[c].key)
1682 for c in local_cols
1683 ]
1684 return to_join, local_attr, parent_alias
1685
1686 def _apply_joins(
1687 self, q, to_join, left_alias, parent_alias, effective_entity
1688 ):
1689 ltj = len(to_join)
1690 if ltj == 1:
1691 to_join = [
1692 getattr(left_alias, to_join[0][1]).of_type(effective_entity)
1693 ]
1694 elif ltj == 2:
1695 to_join = [
1696 getattr(left_alias, to_join[0][1]).of_type(parent_alias),
1697 getattr(parent_alias, to_join[-1][1]).of_type(
1698 effective_entity
1699 ),
1700 ]
1701 elif ltj > 2:
1702 middle = [
1703 (
1704 (
1705 orm_util.AliasedClass(item[0])
1706 if not inspect(item[0]).is_aliased_class
1707 else item[0].entity
1708 ),
1709 item[1],
1710 )
1711 for item in to_join[1:-1]
1712 ]
1713 inner = []
1714
1715 while middle:
1716 item = middle.pop(0)
1717 attr = getattr(item[0], item[1])
1718 if middle:
1719 attr = attr.of_type(middle[0][0])
1720 else:
1721 attr = attr.of_type(parent_alias)
1722
1723 inner.append(attr)
1724
1725 to_join = (
1726 [getattr(left_alias, to_join[0][1]).of_type(inner[0].parent)]
1727 + inner
1728 + [
1729 getattr(parent_alias, to_join[-1][1]).of_type(
1730 effective_entity
1731 )
1732 ]
1733 )
1734
1735 for attr in to_join:
1736 q = q.join(attr)
1737
1738 return q
1739
1740 def _setup_options(
1741 self,
1742 context,
1743 q,
1744 subq_path,
1745 rewritten_path,
1746 orig_query,
1747 effective_entity,
1748 loadopt,
1749 ):
1750 # note that because the subqueryload object
1751 # does not reuse the cached query, instead always making
1752 # use of the current invoked query, while we have two queries
1753 # here (orig and context.query), they are both non-cached
1754 # queries and we can transfer the options as is without
1755 # adjusting for new criteria. Some work on #6881 / #6889
1756 # brought this into question.
1757 new_options = orig_query._with_options
1758
1759 if loadopt and loadopt._extra_criteria:
1760 new_options += (
1761 orm_util.LoaderCriteriaOption(
1762 effective_entity,
1763 loadopt._generate_extra_criteria(context),
1764 ),
1765 )
1766
1767 # propagate loader options etc. to the new query.
1768 # these will fire relative to subq_path.
1769 q = q._with_current_path(rewritten_path)
1770 q = q.options(*new_options)
1771
1772 return q
1773
1774 def _setup_outermost_orderby(self, q):
1775 if self.parent_property.order_by:
1776
1777 def _setup_outermost_orderby(compile_context):
1778 compile_context.eager_order_by += tuple(
1779 util.to_list(self.parent_property.order_by)
1780 )
1781
1782 q = q._add_compile_state_func(
1783 _setup_outermost_orderby, self.parent_property
1784 )
1785
1786 return q
1787
1788 class _SubqCollections:
1789 """Given a :class:`_query.Query` used to emit the "subquery load",
1790 provide a load interface that executes the query at the
1791 first moment a value is needed.
1792
1793 """
1794
1795 __slots__ = (
1796 "session",
1797 "execution_options",
1798 "load_options",
1799 "params",
1800 "subq",
1801 "_data",
1802 )
1803
1804 def __init__(self, context, subq):
1805 # avoid creating a cycle by storing context
1806 # even though that's preferable
1807 self.session = context.session
1808 self.execution_options = context.execution_options
1809 self.load_options = context.load_options
1810 self.params = context.params or {}
1811 self.subq = subq
1812 self._data = None
1813
1814 def get(self, key, default):
1815 if self._data is None:
1816 self._load()
1817 return self._data.get(key, default)
1818
1819 def _load(self):
1820 self._data = collections.defaultdict(list)
1821
1822 q = self.subq
1823 assert q.session is None
1824
1825 q = q.with_session(self.session)
1826
1827 if self.load_options._populate_existing:
1828 q = q.populate_existing()
1829 # to work with baked query, the parameters may have been
1830 # updated since this query was created, so take these into account
1831
1832 data = self._data
1833 for row in q.params(self.params):
1834 # group plain tuples rather than Row slices, which would
1835 # incur Row construction and Row.__eq__ per row
1836 tup = row._to_tuple_instance()
1837 data[tup[1:]].append(tup[0])
1838
1839 def loader(self, state, dict_, row):
1840 if self._data is None:
1841 self._load()
1842
1843 def _setup_query_from_rowproc(
1844 self,
1845 context,
1846 query_entity,
1847 path,
1848 entity,
1849 loadopt,
1850 adapter,
1851 ):
1852 compile_state = context.compile_state
1853 if (
1854 not compile_state.compile_options._enable_eagerloads
1855 or compile_state.compile_options._for_refresh_state
1856 ):
1857 return
1858
1859 orig_query_entity_index = compile_state._entities.index(query_entity)
1860 context.loaders_require_buffering = True
1861
1862 path = path[self.parent_property]
1863
1864 # build up a path indicating the path from the leftmost
1865 # entity to the thing we're subquery loading.
1866 with_poly_entity = path.get(
1867 compile_state.attributes, "path_with_polymorphic", None
1868 )
1869 if with_poly_entity is not None:
1870 effective_entity = with_poly_entity
1871 else:
1872 effective_entity = self.entity
1873
1874 subq_path, rewritten_path = context.query._execution_options.get(
1875 ("subquery_paths", None),
1876 (orm_util.PathRegistry.root, orm_util.PathRegistry.root),
1877 )
1878 is_root = subq_path is orm_util.PathRegistry.root
1879 subq_path = subq_path + path
1880 rewritten_path = rewritten_path + path
1881
1882 # use the current query being invoked, not the compile state
1883 # one. this is so that we get the current parameters. however,
1884 # it means we can't use the existing compile state, we have to make
1885 # a new one. other approaches include possibly using the
1886 # compiled query but swapping the params, seems only marginally
1887 # less time spent but more complicated
1888 orig_query = context.query._execution_options.get(
1889 ("orig_query", _SubqueryLoader), context.query
1890 )
1891
1892 # make a new compile_state for the query that's probably cached, but
1893 # we're sort of undoing a bit of that caching :(
1894 compile_state_cls = _ORMCompileState._get_plugin_class_for_plugin(
1895 orig_query, "orm"
1896 )
1897
1898 if orig_query._is_lambda_element:
1899 if context.load_options._lazy_loaded_from is None:
1900 util.warn(
1901 'subqueryloader for "%s" must invoke lambda callable '
1902 "at %r in "
1903 "order to produce a new query, decreasing the efficiency "
1904 "of caching for this statement. Consider using "
1905 "selectinload() for more effective full-lambda caching"
1906 % (self, orig_query)
1907 )
1908 orig_query = orig_query._resolved
1909
1910 # this is the more "quick" version, however it's not clear how
1911 # much of this we need. in particular I can't get a test to
1912 # fail if the "set_base_alias" is missing and not sure why that is.
1913 orig_compile_state = compile_state_cls._create_entities_collection(
1914 orig_query, legacy=False
1915 )
1916
1917 (
1918 leftmost_mapper,
1919 leftmost_attr,
1920 leftmost_relationship,
1921 rewritten_path,
1922 ) = self._get_leftmost(
1923 orig_query_entity_index,
1924 rewritten_path,
1925 orig_compile_state,
1926 is_root,
1927 )
1928
1929 # generate a new Query from the original, then
1930 # produce a subquery from it.
1931 left_alias = self._generate_from_original_query(
1932 orig_compile_state,
1933 orig_query,
1934 leftmost_mapper,
1935 leftmost_attr,
1936 leftmost_relationship,
1937 entity,
1938 )
1939
1940 # generate another Query that will join the
1941 # left alias to the target relationships.
1942 # basically doing a longhand
1943 # "from_self()". (from_self() itself not quite industrial
1944 # strength enough for all contingencies...but very close)
1945
1946 q = query.Query(effective_entity)
1947
1948 q._execution_options = context.query._execution_options.merge_with(
1949 context.execution_options,
1950 {
1951 ("orig_query", _SubqueryLoader): orig_query,
1952 ("subquery_paths", None): (subq_path, rewritten_path),
1953 },
1954 )
1955
1956 q = q._set_enable_single_crit(False)
1957 to_join, local_attr, parent_alias = self._prep_for_joins(
1958 left_alias, subq_path
1959 )
1960
1961 q = q.add_columns(*local_attr)
1962 q = self._apply_joins(
1963 q, to_join, left_alias, parent_alias, effective_entity
1964 )
1965
1966 q = self._setup_options(
1967 context,
1968 q,
1969 subq_path,
1970 rewritten_path,
1971 orig_query,
1972 effective_entity,
1973 loadopt,
1974 )
1975 q = self._setup_outermost_orderby(q)
1976
1977 return q
1978
1979 def create_row_processor(
1980 self,
1981 context,
1982 query_entity,
1983 path,
1984 loadopt,
1985 mapper,
1986 result,
1987 adapter,
1988 populators,
1989 ):
1990 if (
1991 loadopt
1992 and context.compile_state.statement is not None
1993 and context.compile_state.statement.is_dml
1994 ):
1995 util.warn_deprecated(
1996 "The subqueryload loader option is not compatible with DML "
1997 "statements such as INSERT, UPDATE. Only SELECT may be used."
1998 "This warning will become an exception in a future release.",
1999 "2.0",
2000 )
2001
2002 if context.refresh_state:
2003 return self._immediateload_create_row_processor(
2004 context,
2005 query_entity,
2006 path,
2007 loadopt,
2008 mapper,
2009 result,
2010 adapter,
2011 populators,
2012 )
2013
2014 _, run_loader, _, _ = self._setup_for_recursion(
2015 context, path, loadopt, self.join_depth
2016 )
2017 if not run_loader:
2018 return
2019
2020 if not isinstance(context.compile_state, _ORMSelectCompileState):
2021 # issue 7505 - subqueryload() in 1.3 and previous would silently
2022 # degrade for from_statement() without warning. this behavior
2023 # is restored here
2024 return
2025
2026 if not self.parent.class_manager[self.key].impl.supports_population:
2027 raise sa_exc.InvalidRequestError(
2028 "'%s' does not support object "
2029 "population - eager loading cannot be applied." % self
2030 )
2031
2032 # a little dance here as the "path" is still something that only
2033 # semi-tracks the exact series of things we are loading, still not
2034 # telling us about with_polymorphic() and stuff like that when it's at
2035 # the root.. the initial MapperEntity is more accurate for this case.
2036 if len(path) == 1:
2037 if not orm_util._entity_isa(query_entity.entity_zero, self.parent):
2038 return
2039 elif not orm_util._entity_isa(
2040 path[-1], self.parent
2041 ) and not self.parent.isa(path[-1].mapper):
2042 # second check accommodates a polymorphic entity where
2043 # the path has been normalized to the base mapper but
2044 # self.parent is a subclass mapper. Fixes #13209.
2045 return
2046
2047 subq = self._setup_query_from_rowproc(
2048 context,
2049 query_entity,
2050 path,
2051 path[-1],
2052 loadopt,
2053 adapter,
2054 )
2055
2056 if subq is None:
2057 return
2058
2059 assert subq.session is None
2060
2061 path = path[self.parent_property]
2062
2063 local_cols = self.parent_property.local_columns
2064
2065 # cache the loaded collections in the context
2066 # so that inheriting mappers don't re-load when they
2067 # call upon create_row_processor again
2068 collections = path.get(context.attributes, "collections")
2069 if collections is None:
2070 collections = self._SubqCollections(context, subq)
2071 path.set(context.attributes, "collections", collections)
2072
2073 if adapter:
2074 local_cols = [adapter.columns[c] for c in local_cols]
2075
2076 if self.uselist:
2077 self._create_collection_loader(
2078 context, result, collections, local_cols, populators
2079 )
2080 else:
2081 self._create_scalar_loader(
2082 context, result, collections, local_cols, populators
2083 )
2084
2085 def _create_collection_loader(
2086 self, context, result, collections, local_cols, populators
2087 ):
2088 tuple_getter = result._tuple_getter(local_cols)
2089
2090 def load_collection_from_subq(state, dict_, row):
2091 collection = collections.get(tuple_getter(row), ())
2092 state.get_impl(self.key).set_committed_value(
2093 state, dict_, collection
2094 )
2095
2096 def load_collection_from_subq_existing_row(state, dict_, row):
2097 if self.key not in dict_:
2098 load_collection_from_subq(state, dict_, row)
2099
2100 populators["new"].append((self.key, load_collection_from_subq))
2101 populators["existing"].append(
2102 (self.key, load_collection_from_subq_existing_row)
2103 )
2104
2105 if context.invoke_all_eagers:
2106 populators["eager"].append((self.key, collections.loader))
2107
2108 def _create_scalar_loader(
2109 self, context, result, collections, local_cols, populators
2110 ):
2111 tuple_getter = result._tuple_getter(local_cols)
2112
2113 def load_scalar_from_subq(state, dict_, row):
2114 collection = collections.get(tuple_getter(row), (None,))
2115 if len(collection) > 1:
2116 util.warn(
2117 "Multiple rows returned with "
2118 "uselist=False for eagerly-loaded attribute '%s' " % self
2119 )
2120
2121 scalar = collection[0]
2122 state.get_impl(self.key).set_committed_value(state, dict_, scalar)
2123
2124 def load_scalar_from_subq_existing_row(state, dict_, row):
2125 if self.key not in dict_:
2126 load_scalar_from_subq(state, dict_, row)
2127
2128 populators["new"].append((self.key, load_scalar_from_subq))
2129 populators["existing"].append(
2130 (self.key, load_scalar_from_subq_existing_row)
2131 )
2132 if context.invoke_all_eagers:
2133 populators["eager"].append((self.key, collections.loader))
2134
2135
2136@log.class_logger
2137@relationships.RelationshipProperty.strategy_for(lazy="joined")
2138@relationships.RelationshipProperty.strategy_for(lazy=False)
2139class _JoinedLoader(_AbstractRelationshipLoader):
2140 """Provide loading behavior for a :class:`.Relationship`
2141 using joined eager loading.
2142
2143 """
2144
2145 __slots__ = "join_depth"
2146
2147 def __init__(self, parent, strategy_key):
2148 super().__init__(parent, strategy_key)
2149 self.join_depth = self.parent_property.join_depth
2150
2151 def init_class_attribute(self, mapper):
2152 self.parent_property._get_strategy(
2153 (("lazy", "select"),)
2154 ).init_class_attribute(mapper)
2155
2156 def setup_query(
2157 self,
2158 compile_state,
2159 query_entity,
2160 path,
2161 loadopt,
2162 adapter,
2163 column_collection=None,
2164 parentmapper=None,
2165 chained_from_outerjoin=False,
2166 **kwargs,
2167 ):
2168 """Add a left outer join to the statement that's being constructed."""
2169
2170 if not compile_state.compile_options._enable_eagerloads:
2171 return
2172 elif (
2173 loadopt
2174 and compile_state.statement is not None
2175 and compile_state.statement.is_dml
2176 ):
2177 util.warn_deprecated(
2178 "The joinedload loader option is not compatible with DML "
2179 "statements such as INSERT, UPDATE. Only SELECT may be used."
2180 "This warning will become an exception in a future release.",
2181 "2.0",
2182 )
2183 elif self.uselist:
2184 compile_state.multi_row_eager_loaders = True
2185
2186 path = path[self.parent_property]
2187
2188 user_defined_adapter = (
2189 self._init_user_defined_eager_proc(
2190 loadopt, compile_state, compile_state.attributes
2191 )
2192 if loadopt
2193 else False
2194 )
2195
2196 if user_defined_adapter is not False:
2197 # setup an adapter but dont create any JOIN, assume it's already
2198 # in the query
2199 (
2200 clauses,
2201 adapter,
2202 add_to_collection,
2203 ) = self._setup_query_on_user_defined_adapter(
2204 compile_state,
2205 query_entity,
2206 path,
2207 adapter,
2208 user_defined_adapter,
2209 )
2210
2211 # don't do "wrap" for multi-row, we want to wrap
2212 # limited/distinct SELECT,
2213 # because we want to put the JOIN on the outside.
2214
2215 else:
2216 # if not via query option, check for
2217 # a cycle
2218 if not path.contains(compile_state.attributes, "loader"):
2219 if self.join_depth:
2220 if path.length / 2 > self.join_depth:
2221 return
2222 elif path.contains_mapper(self.mapper):
2223 return
2224
2225 # add the JOIN and create an adapter
2226 (
2227 clauses,
2228 adapter,
2229 add_to_collection,
2230 chained_from_outerjoin,
2231 ) = self._generate_row_adapter(
2232 compile_state,
2233 query_entity,
2234 path,
2235 loadopt,
2236 adapter,
2237 column_collection,
2238 parentmapper,
2239 chained_from_outerjoin,
2240 )
2241
2242 # for multi-row, we want to wrap limited/distinct SELECT,
2243 # because we want to put the JOIN on the outside.
2244 compile_state.eager_adding_joins = True
2245
2246 with_poly_entity = path.get(
2247 compile_state.attributes, "path_with_polymorphic", None
2248 )
2249 if with_poly_entity is not None:
2250 with_polymorphic = inspect(
2251 with_poly_entity
2252 ).with_polymorphic_mappers
2253 else:
2254 with_polymorphic = None
2255
2256 path = path[self.entity]
2257
2258 loading._setup_entity_query(
2259 compile_state,
2260 self.mapper,
2261 query_entity,
2262 path,
2263 clauses,
2264 add_to_collection,
2265 with_polymorphic=with_polymorphic,
2266 parentmapper=self.mapper,
2267 chained_from_outerjoin=chained_from_outerjoin,
2268 )
2269
2270 has_nones = util.NONE_SET.intersection(compile_state.secondary_columns)
2271
2272 if has_nones:
2273 if with_poly_entity is not None:
2274 raise sa_exc.InvalidRequestError(
2275 "Detected unaliased columns when generating joined "
2276 "load. Make sure to use aliased=True or flat=True "
2277 "when using joined loading with with_polymorphic()."
2278 )
2279 else:
2280 compile_state.secondary_columns = [
2281 c for c in compile_state.secondary_columns if c is not None
2282 ]
2283
2284 def _init_user_defined_eager_proc(
2285 self, loadopt, compile_state, target_attributes
2286 ):
2287 # check if the opt applies at all
2288 if "eager_from_alias" not in loadopt.local_opts:
2289 # nope
2290 return False
2291
2292 path = loadopt.path.parent
2293
2294 # the option applies. check if the "user_defined_eager_row_processor"
2295 # has been built up.
2296 adapter = path.get(
2297 compile_state.attributes, "user_defined_eager_row_processor", False
2298 )
2299 if adapter is not False:
2300 # just return it
2301 return adapter
2302
2303 # otherwise figure it out.
2304 alias = loadopt.local_opts["eager_from_alias"]
2305 root_mapper, prop = path[-2:]
2306
2307 if alias is not None:
2308 if isinstance(alias, str):
2309 alias = prop.target.alias(alias)
2310 adapter = orm_util.ORMAdapter(
2311 orm_util._TraceAdaptRole.JOINEDLOAD_USER_DEFINED_ALIAS,
2312 prop.mapper,
2313 selectable=alias,
2314 equivalents=prop.mapper._equivalent_columns,
2315 limit_on_entity=False,
2316 )
2317 else:
2318 if path.contains(
2319 compile_state.attributes, "path_with_polymorphic"
2320 ):
2321 with_poly_entity = path.get(
2322 compile_state.attributes, "path_with_polymorphic"
2323 )
2324 adapter = orm_util.ORMAdapter(
2325 orm_util._TraceAdaptRole.JOINEDLOAD_PATH_WITH_POLYMORPHIC,
2326 with_poly_entity,
2327 equivalents=prop.mapper._equivalent_columns,
2328 )
2329 else:
2330 adapter = compile_state._polymorphic_adapters.get(
2331 prop.mapper, None
2332 )
2333 path.set(
2334 target_attributes,
2335 "user_defined_eager_row_processor",
2336 adapter,
2337 )
2338
2339 return adapter
2340
2341 def _setup_query_on_user_defined_adapter(
2342 self, context, entity, path, adapter, user_defined_adapter
2343 ):
2344 # apply some more wrapping to the "user defined adapter"
2345 # if we are setting up the query for SQL render.
2346 adapter = entity._get_entity_clauses(context)
2347
2348 if adapter and user_defined_adapter:
2349 user_defined_adapter = user_defined_adapter.wrap(adapter)
2350 path.set(
2351 context.attributes,
2352 "user_defined_eager_row_processor",
2353 user_defined_adapter,
2354 )
2355 elif adapter:
2356 user_defined_adapter = adapter
2357 path.set(
2358 context.attributes,
2359 "user_defined_eager_row_processor",
2360 user_defined_adapter,
2361 )
2362
2363 add_to_collection = context.primary_columns
2364 return user_defined_adapter, adapter, add_to_collection
2365
2366 def _generate_row_adapter(
2367 self,
2368 compile_state,
2369 entity,
2370 path,
2371 loadopt,
2372 adapter,
2373 column_collection,
2374 parentmapper,
2375 chained_from_outerjoin,
2376 ):
2377 with_poly_entity = path.get(
2378 compile_state.attributes, "path_with_polymorphic", None
2379 )
2380 if with_poly_entity:
2381 to_adapt = with_poly_entity
2382 else:
2383 insp = inspect(self.entity)
2384 if insp.is_aliased_class:
2385 alt_selectable = insp.selectable
2386 else:
2387 alt_selectable = None
2388
2389 to_adapt = orm_util.AliasedClass(
2390 self.mapper,
2391 alias=(
2392 alt_selectable._anonymous_fromclause(flat=True)
2393 if alt_selectable is not None
2394 else None
2395 ),
2396 flat=True,
2397 use_mapper_path=True,
2398 )
2399
2400 to_adapt_insp = inspect(to_adapt)
2401
2402 clauses = to_adapt_insp._memo(
2403 ("joinedloader_ormadapter", self),
2404 orm_util.ORMAdapter,
2405 orm_util._TraceAdaptRole.JOINEDLOAD_MEMOIZED_ADAPTER,
2406 to_adapt_insp,
2407 equivalents=self.mapper._equivalent_columns,
2408 adapt_required=True,
2409 allow_label_resolve=False,
2410 anonymize_labels=True,
2411 )
2412
2413 assert clauses.is_aliased_class
2414
2415 innerjoin = (
2416 loadopt.local_opts.get("innerjoin", self.parent_property.innerjoin)
2417 if loadopt is not None
2418 else self.parent_property.innerjoin
2419 )
2420
2421 if not innerjoin:
2422 # if this is an outer join, all non-nested eager joins from
2423 # this path must also be outer joins
2424 chained_from_outerjoin = True
2425
2426 compile_state.create_eager_joins.append(
2427 (
2428 self._create_eager_join,
2429 entity,
2430 path,
2431 adapter,
2432 parentmapper,
2433 clauses,
2434 innerjoin,
2435 chained_from_outerjoin,
2436 loadopt._extra_criteria if loadopt else (),
2437 )
2438 )
2439
2440 add_to_collection = compile_state.secondary_columns
2441 path.set(compile_state.attributes, "eager_row_processor", clauses)
2442
2443 return clauses, adapter, add_to_collection, chained_from_outerjoin
2444
2445 def _create_eager_join(
2446 self,
2447 compile_state,
2448 query_entity,
2449 path,
2450 adapter,
2451 parentmapper,
2452 clauses,
2453 innerjoin,
2454 chained_from_outerjoin,
2455 extra_criteria,
2456 ):
2457 if parentmapper is None:
2458 localparent = query_entity.mapper
2459 else:
2460 localparent = parentmapper
2461
2462 # whether or not the Query will wrap the selectable in a subquery,
2463 # and then attach eager load joins to that (i.e., in the case of
2464 # LIMIT/OFFSET etc.)
2465 should_nest_selectable = compile_state._should_nest_selectable
2466
2467 query_entity_key = None
2468
2469 if (
2470 query_entity not in compile_state.eager_joins
2471 and not should_nest_selectable
2472 and compile_state.from_clauses
2473 ):
2474 indexes = sql_util.find_left_clause_that_matches_given(
2475 compile_state.from_clauses, query_entity.selectable
2476 )
2477
2478 if len(indexes) > 1:
2479 # for the eager load case, I can't reproduce this right
2480 # now. For query.join() I can.
2481 raise sa_exc.InvalidRequestError(
2482 "Can't identify which query entity in which to joined "
2483 "eager load from. Please use an exact match when "
2484 "specifying the join path."
2485 )
2486
2487 if indexes:
2488 clause = compile_state.from_clauses[indexes[0]]
2489 # join to an existing FROM clause on the query.
2490 # key it to its list index in the eager_joins dict.
2491 # Query._compile_context will adapt as needed and
2492 # append to the FROM clause of the select().
2493 query_entity_key, default_towrap = indexes[0], clause
2494
2495 if query_entity_key is None:
2496 query_entity_key, default_towrap = (
2497 query_entity,
2498 query_entity.selectable,
2499 )
2500
2501 towrap = compile_state.eager_joins.setdefault(
2502 query_entity_key, default_towrap
2503 )
2504
2505 if adapter:
2506 if getattr(adapter, "is_aliased_class", False):
2507 # joining from an adapted entity. The adapted entity
2508 # might be a "with_polymorphic", so resolve that to our
2509 # specific mapper's entity before looking for our attribute
2510 # name on it.
2511 efm = adapter.aliased_insp._entity_for_mapper(
2512 localparent
2513 if localparent.isa(self.parent)
2514 else self.parent
2515 )
2516
2517 # look for our attribute on the adapted entity, else fall back
2518 # to our straight property
2519 onclause = getattr(efm.entity, self.key, self.parent_property)
2520 else:
2521 onclause = getattr(
2522 orm_util.AliasedClass(
2523 self.parent, adapter.selectable, use_mapper_path=True
2524 ),
2525 self.key,
2526 self.parent_property,
2527 )
2528
2529 else:
2530 onclause = self.parent_property
2531
2532 assert clauses.is_aliased_class
2533
2534 attach_on_outside = (
2535 not chained_from_outerjoin
2536 or not innerjoin
2537 or innerjoin == "unnested"
2538 or query_entity.entity_zero.represents_outer_join
2539 )
2540
2541 extra_join_criteria = extra_criteria
2542 additional_entity_criteria = compile_state.global_attributes.get(
2543 ("additional_entity_criteria", self.mapper), ()
2544 )
2545 if additional_entity_criteria:
2546 extra_join_criteria += tuple(
2547 ae._resolve_where_criteria(self.mapper)
2548 for ae in additional_entity_criteria
2549 if ae.propagate_to_loaders
2550 )
2551
2552 if attach_on_outside:
2553 # this is the "classic" eager join case.
2554 eagerjoin = orm_util._ORMJoin(
2555 towrap,
2556 clauses.aliased_insp,
2557 onclause,
2558 isouter=not innerjoin
2559 or query_entity.entity_zero.represents_outer_join
2560 or (chained_from_outerjoin and isinstance(towrap, sql.Join)),
2561 _left_memo=self.parent,
2562 _right_memo=path[self.mapper],
2563 _extra_criteria=extra_join_criteria,
2564 )
2565 else:
2566 # all other cases are innerjoin=='nested' approach
2567 eagerjoin = self._splice_nested_inner_join(
2568 path, path[-2], towrap, clauses, onclause, extra_join_criteria
2569 )
2570
2571 compile_state.eager_joins[query_entity_key] = eagerjoin
2572
2573 # send a hint to the Query as to where it may "splice" this join
2574 eagerjoin.stop_on = query_entity.selectable
2575
2576 if not parentmapper:
2577 # for parentclause that is the non-eager end of the join,
2578 # ensure all the parent cols in the primaryjoin are actually
2579 # in the
2580 # columns clause (i.e. are not deferred), so that aliasing applied
2581 # by the Query propagates those columns outward.
2582 # This has the effect
2583 # of "undefering" those columns.
2584 for col in sql_util._find_columns(
2585 self.parent_property.primaryjoin
2586 ):
2587 if localparent.persist_selectable.c.contains_column(col):
2588 if adapter:
2589 col = adapter.columns[col]
2590 compile_state._append_dedupe_col_collection(
2591 col, compile_state.primary_columns
2592 )
2593
2594 if self.parent_property.order_by:
2595 compile_state.eager_order_by += tuple(
2596 (eagerjoin._target_adapter.copy_and_process)(
2597 util.to_list(self.parent_property.order_by)
2598 )
2599 )
2600
2601 def _splice_nested_inner_join(
2602 self,
2603 path,
2604 entity_we_want_to_splice_onto,
2605 join_obj,
2606 clauses,
2607 onclause,
2608 extra_criteria,
2609 entity_inside_join_structure: Union[
2610 Mapper, None, Literal[False]
2611 ] = False,
2612 detected_existing_path: Optional[path_registry.PathRegistry] = None,
2613 ):
2614 # recursive fn to splice a nested join into an existing one.
2615 # entity_inside_join_structure=False means this is the outermost call,
2616 # and it should return a value. entity_inside_join_structure=<mapper>
2617 # indicates we've descended into a join and are looking at a FROM
2618 # clause representing this mapper; if this is not
2619 # entity_we_want_to_splice_onto then return None to end the recursive
2620 # branch
2621
2622 assert entity_we_want_to_splice_onto is path[-2]
2623
2624 if entity_inside_join_structure is False:
2625 assert isinstance(join_obj, orm_util._ORMJoin)
2626
2627 if isinstance(join_obj, sql.selectable.FromGrouping):
2628 # FromGrouping - continue descending into the structure
2629 return self._splice_nested_inner_join(
2630 path,
2631 entity_we_want_to_splice_onto,
2632 join_obj.element,
2633 clauses,
2634 onclause,
2635 extra_criteria,
2636 entity_inside_join_structure,
2637 )
2638 elif isinstance(join_obj, orm_util._ORMJoin):
2639 # _ORMJoin - continue descending into the structure
2640
2641 join_right_path = join_obj._right_memo
2642
2643 # see if right side of join is viable
2644 target_join = self._splice_nested_inner_join(
2645 path,
2646 entity_we_want_to_splice_onto,
2647 join_obj.right,
2648 clauses,
2649 onclause,
2650 extra_criteria,
2651 entity_inside_join_structure=(
2652 join_right_path[-1].mapper
2653 if join_right_path is not None
2654 else None
2655 ),
2656 )
2657
2658 if target_join is not None:
2659 # for a right splice, attempt to flatten out
2660 # a JOIN b JOIN c JOIN .. to avoid needless
2661 # parenthesis nesting
2662 if not join_obj.isouter and not target_join.isouter:
2663 eagerjoin = join_obj._splice_into_center(target_join)
2664 else:
2665 eagerjoin = orm_util._ORMJoin(
2666 join_obj.left,
2667 target_join,
2668 join_obj.onclause,
2669 isouter=join_obj.isouter,
2670 _left_memo=join_obj._left_memo,
2671 )
2672
2673 eagerjoin._target_adapter = target_join._target_adapter
2674 return eagerjoin
2675
2676 else:
2677 # see if left side of join is viable
2678 target_join = self._splice_nested_inner_join(
2679 path,
2680 entity_we_want_to_splice_onto,
2681 join_obj.left,
2682 clauses,
2683 onclause,
2684 extra_criteria,
2685 entity_inside_join_structure=join_obj._left_memo,
2686 detected_existing_path=join_right_path,
2687 )
2688
2689 if target_join is not None:
2690 eagerjoin = orm_util._ORMJoin(
2691 target_join,
2692 join_obj.right,
2693 join_obj.onclause,
2694 isouter=join_obj.isouter,
2695 _right_memo=join_obj._right_memo,
2696 )
2697 eagerjoin._target_adapter = target_join._target_adapter
2698 return eagerjoin
2699
2700 # neither side viable, return None, or fail if this was the top
2701 # most call
2702 if entity_inside_join_structure is False:
2703 assert (
2704 False
2705 ), "assertion failed attempting to produce joined eager loads"
2706 return None
2707
2708 # reached an endpoint (e.g. a table that's mapped, or an alias of that
2709 # table). determine if we can use this endpoint to splice onto
2710
2711 # is this the entity we want to splice onto in the first place?
2712 if not entity_we_want_to_splice_onto.isa(entity_inside_join_structure):
2713 return None
2714
2715 # path check. if we know the path how this join endpoint got here,
2716 # lets look at our path we are satisfying and see if we're in the
2717 # wrong place. This is specifically for when our entity may
2718 # appear more than once in the path, issue #11449
2719 # updated in issue #11965.
2720 if detected_existing_path and len(detected_existing_path) > 2:
2721 # this assertion is currently based on how this call is made,
2722 # where given a join_obj, the call will have these parameters as
2723 # entity_inside_join_structure=join_obj._left_memo
2724 # and entity_inside_join_structure=join_obj._right_memo.mapper
2725 assert detected_existing_path[-3] is entity_inside_join_structure
2726
2727 # from that, see if the path we are targeting matches the
2728 # "existing" path of this join all the way up to the midpoint
2729 # of this join object (e.g. the relationship).
2730 # if not, then this is not our target
2731 #
2732 # a test condition where this test is false looks like:
2733 #
2734 # desired splice: Node->kind->Kind
2735 # path of desired splice: NodeGroup->nodes->Node->kind
2736 # path we've located: NodeGroup->nodes->Node->common_node->Node
2737 #
2738 # above, because we want to splice kind->Kind onto
2739 # NodeGroup->nodes->Node, this is not our path because it actually
2740 # goes more steps than we want into self-referential
2741 # ->common_node->Node
2742 #
2743 # a test condition where this test is true looks like:
2744 #
2745 # desired splice: B->c2s->C2
2746 # path of desired splice: A->bs->B->c2s
2747 # path we've located: A->bs->B->c1s->C1
2748 #
2749 # above, we want to splice c2s->C2 onto B, and the located path
2750 # shows that the join ends with B->c1s->C1. so we will
2751 # add another join onto that, which would create a "branch" that
2752 # we might represent in a pseudopath as:
2753 #
2754 # B->c1s->C1
2755 # ->c2s->C2
2756 #
2757 # i.e. A JOIN B ON <bs> JOIN C1 ON <c1s>
2758 # JOIN C2 ON <c2s>
2759 #
2760
2761 if detected_existing_path[0:-2] != path.path[0:-1]:
2762 return None
2763
2764 return orm_util._ORMJoin(
2765 join_obj,
2766 clauses.aliased_insp,
2767 onclause,
2768 isouter=False,
2769 _left_memo=entity_inside_join_structure,
2770 _right_memo=path[path[-1].mapper],
2771 _extra_criteria=extra_criteria,
2772 )
2773
2774 def _create_eager_adapter(self, context, result, adapter, path, loadopt):
2775 compile_state = context.compile_state
2776
2777 user_defined_adapter = (
2778 self._init_user_defined_eager_proc(
2779 loadopt, compile_state, context.attributes
2780 )
2781 if loadopt
2782 else False
2783 )
2784
2785 if user_defined_adapter is not False:
2786 decorator = user_defined_adapter
2787 # user defined eagerloads are part of the "primary"
2788 # portion of the load.
2789 # the adapters applied to the Query should be honored.
2790 if compile_state.compound_eager_adapter and decorator:
2791 decorator = decorator.wrap(
2792 compile_state.compound_eager_adapter
2793 )
2794 elif compile_state.compound_eager_adapter:
2795 decorator = compile_state.compound_eager_adapter
2796 else:
2797 decorator = path.get(
2798 compile_state.attributes, "eager_row_processor"
2799 )
2800 if decorator is None:
2801 return False
2802
2803 if self.mapper._result_has_identity_key(result, decorator):
2804 return decorator
2805 else:
2806 # no identity key - don't return a row
2807 # processor, will cause a degrade to lazy
2808 return False
2809
2810 def create_row_processor(
2811 self,
2812 context,
2813 query_entity,
2814 path,
2815 loadopt,
2816 mapper,
2817 result,
2818 adapter,
2819 populators,
2820 ):
2821
2822 if not context.compile_state.compile_options._enable_eagerloads:
2823 return
2824
2825 if not self.parent.class_manager[self.key].impl.supports_population:
2826 raise sa_exc.InvalidRequestError(
2827 "'%s' does not support object "
2828 "population - eager loading cannot be applied." % self
2829 )
2830
2831 if self.uselist:
2832 context.loaders_require_uniquing = True
2833
2834 our_path = path[self.parent_property]
2835
2836 eager_adapter = self._create_eager_adapter(
2837 context, result, adapter, our_path, loadopt
2838 )
2839
2840 if eager_adapter is not False:
2841 key = self.key
2842
2843 _instance = loading._instance_processor(
2844 query_entity,
2845 self.mapper,
2846 context,
2847 result,
2848 our_path[self.entity],
2849 eager_adapter,
2850 )
2851
2852 if not self.uselist:
2853 self._create_scalar_loader(context, key, _instance, populators)
2854 else:
2855 self._create_collection_loader(
2856 context, key, _instance, populators
2857 )
2858 else:
2859 self.parent_property._get_strategy(
2860 (("lazy", "select"),)
2861 ).create_row_processor(
2862 context,
2863 query_entity,
2864 path,
2865 loadopt,
2866 mapper,
2867 result,
2868 adapter,
2869 populators,
2870 )
2871
2872 def _create_collection_loader(self, context, key, _instance, populators):
2873 def load_collection_from_joined_new_row(state, dict_, row):
2874 # note this must unconditionally clear out any existing collection.
2875 # an existing collection would be present only in the case of
2876 # populate_existing().
2877 collection = attributes.init_state_collection(state, dict_, key)
2878 result_list = util.UniqueAppender(
2879 collection, "append_without_event"
2880 )
2881 context.attributes[(state, key)] = result_list
2882 inst = _instance(row)
2883 if inst is not None:
2884 result_list.append(inst)
2885
2886 def load_collection_from_joined_existing_row(state, dict_, row):
2887 if (state, key) in context.attributes:
2888 result_list = context.attributes[(state, key)]
2889 else:
2890 # appender_key can be absent from context.attributes
2891 # with isnew=False when self-referential eager loading
2892 # is used; the same instance may be present in two
2893 # distinct sets of result columns
2894 collection = attributes.init_state_collection(
2895 state, dict_, key
2896 )
2897 result_list = util.UniqueAppender(
2898 collection, "append_without_event"
2899 )
2900 context.attributes[(state, key)] = result_list
2901 inst = _instance(row)
2902 if inst is not None:
2903 result_list.append(inst)
2904
2905 def load_collection_from_joined_exec(state, dict_, row):
2906 _instance(row)
2907
2908 populators["new"].append(
2909 (self.key, load_collection_from_joined_new_row)
2910 )
2911 populators["existing"].append(
2912 (self.key, load_collection_from_joined_existing_row)
2913 )
2914 if context.invoke_all_eagers:
2915 populators["eager"].append(
2916 (self.key, load_collection_from_joined_exec)
2917 )
2918
2919 def _create_scalar_loader(self, context, key, _instance, populators):
2920 def load_scalar_from_joined_new_row(state, dict_, row):
2921 # set a scalar object instance directly on the parent
2922 # object, bypassing InstrumentedAttribute event handlers.
2923 dict_[key] = _instance(row)
2924
2925 def load_scalar_from_joined_existing_row(state, dict_, row):
2926 # call _instance on the row, even though the object has
2927 # been created, so that we further descend into properties
2928 existing = _instance(row)
2929
2930 # conflicting value already loaded, this shouldn't happen
2931 if key in dict_:
2932 if existing is not dict_[key]:
2933 util.warn(
2934 "Multiple rows returned with "
2935 "uselist=False for eagerly-loaded attribute '%s' "
2936 % self
2937 )
2938 else:
2939 # this case is when one row has multiple loads of the
2940 # same entity (e.g. via aliasing), one has an attribute
2941 # that the other doesn't.
2942 dict_[key] = existing
2943
2944 def load_scalar_from_joined_exec(state, dict_, row):
2945 _instance(row)
2946
2947 populators["new"].append((self.key, load_scalar_from_joined_new_row))
2948 populators["existing"].append(
2949 (self.key, load_scalar_from_joined_existing_row)
2950 )
2951 if context.invoke_all_eagers:
2952 populators["eager"].append(
2953 (self.key, load_scalar_from_joined_exec)
2954 )
2955
2956
2957@log.class_logger
2958@relationships.RelationshipProperty.strategy_for(lazy="selectin")
2959class _SelectInLoader(_PostLoader, util.MemoizedSlots):
2960 __slots__ = (
2961 "join_depth",
2962 "omit_join",
2963 "_parent_alias",
2964 "_query_info",
2965 "_fallback_query_info",
2966 )
2967
2968 query_info = collections.namedtuple(
2969 "queryinfo",
2970 [
2971 "load_only_child",
2972 "load_with_join",
2973 "in_expr",
2974 "pk_cols",
2975 "zero_idx",
2976 "n_pk",
2977 "child_lookup_cols",
2978 ],
2979 )
2980
2981 _chunksize = 500
2982
2983 @classmethod
2984 def _set_chunksize(cls, loadopt) -> int:
2985 if loadopt is None or hasattr(loadopt, "local_opts") is None:
2986 return cls._chunksize
2987
2988 user_input = loadopt.local_opts.get("chunksize", None)
2989 if user_input is None:
2990 return cls._chunksize
2991 elif not isinstance(user_input, int) or user_input < 1:
2992 raise sa_exc.ArgumentError(
2993 f"'chunksize={user_input}' is not an appropriate input, "
2994 f"please use a positive non-zero integer."
2995 )
2996 return user_input
2997
2998 def __init__(self, parent, strategy_key):
2999 super().__init__(parent, strategy_key)
3000 self.join_depth = self.parent_property.join_depth
3001 is_m2o = self.parent_property.direction is interfaces.MANYTOONE
3002 is_m2m = self.parent_property.direction is interfaces.MANYTOMANY
3003
3004 if self.parent_property.omit_join is not None:
3005 self.omit_join = self.parent_property.omit_join
3006 else:
3007 lazyloader = self.parent_property._get_strategy(
3008 (("lazy", "select"),)
3009 )
3010 if is_m2o:
3011 self.omit_join = lazyloader.use_get
3012 elif is_m2m and not self.parent_property._is_self_referential:
3013 join_cond = self.parent_property._join_condition
3014 self.omit_join = join_cond.secondary_covers_parent_primary_key
3015 else:
3016 self.omit_join = self.parent._get_clause[0].compare(
3017 lazyloader._rev_lazywhere,
3018 use_proxies=True,
3019 compare_keys=False,
3020 equivalents=self.parent._equivalent_columns,
3021 )
3022
3023 if self.omit_join:
3024 if is_m2o:
3025 self._query_info = self._init_for_omit_join_m2o()
3026 self._fallback_query_info = self._init_for_join()
3027 else:
3028 self._query_info = self._init_for_omit_join()
3029 else:
3030 self._query_info = self._init_for_join()
3031
3032 def _init_for_omit_join(self):
3033 pk_to_fk = dict(
3034 self.parent_property._join_condition.local_remote_pairs
3035 )
3036 pk_to_fk.update(
3037 (equiv, pk_to_fk[k])
3038 for k in list(pk_to_fk)
3039 for equiv in self.parent._equivalent_columns.get(k, ())
3040 )
3041
3042 pk_cols = fk_cols = [
3043 pk_to_fk[col] for col in self.parent.primary_key if col in pk_to_fk
3044 ]
3045 if len(fk_cols) > 1:
3046 in_expr = sql.tuple_(*fk_cols)
3047 zero_idx = False
3048 else:
3049 in_expr = fk_cols[0]
3050 zero_idx = True
3051
3052 return self.query_info(
3053 False, False, in_expr, pk_cols, zero_idx, len(pk_cols), None
3054 )
3055
3056 def _init_for_omit_join_m2o(self):
3057 pk_cols = self.mapper.primary_key
3058 if len(pk_cols) > 1:
3059 in_expr = sql.tuple_(*pk_cols)
3060 zero_idx = False
3061 else:
3062 in_expr = pk_cols[0]
3063 zero_idx = True
3064
3065 lazyloader = self.parent_property._get_strategy((("lazy", "select"),))
3066 lookup_cols = [lazyloader._equated_columns[pk] for pk in pk_cols]
3067
3068 return self.query_info(
3069 True, False, in_expr, pk_cols, zero_idx, len(pk_cols), lookup_cols
3070 )
3071
3072 def _init_for_join(self):
3073 self._parent_alias = AliasedClass(self.parent.class_)
3074 pa_insp = inspect(self._parent_alias)
3075 pk_cols = [
3076 pa_insp._adapt_element(col) for col in self.parent.primary_key
3077 ]
3078 if len(pk_cols) > 1:
3079 in_expr = sql.tuple_(*pk_cols)
3080 zero_idx = False
3081 else:
3082 in_expr = pk_cols[0]
3083 zero_idx = True
3084 return self.query_info(
3085 False, True, in_expr, pk_cols, zero_idx, len(pk_cols), None
3086 )
3087
3088 def init_class_attribute(self, mapper):
3089 self.parent_property._get_strategy(
3090 (("lazy", "select"),)
3091 ).init_class_attribute(mapper)
3092
3093 def create_row_processor(
3094 self,
3095 context,
3096 query_entity,
3097 path,
3098 loadopt,
3099 mapper,
3100 result,
3101 adapter,
3102 populators,
3103 ):
3104 if context.refresh_state:
3105 return self._immediateload_create_row_processor(
3106 context,
3107 query_entity,
3108 path,
3109 loadopt,
3110 mapper,
3111 result,
3112 adapter,
3113 populators,
3114 )
3115
3116 (
3117 effective_path,
3118 run_loader,
3119 execution_options,
3120 recursion_depth,
3121 ) = self._setup_for_recursion(
3122 context, path, loadopt, join_depth=self.join_depth
3123 )
3124
3125 if not run_loader:
3126 return
3127
3128 if not context.compile_state.compile_options._enable_eagerloads:
3129 return
3130
3131 if not self.parent.class_manager[self.key].impl.supports_population:
3132 raise sa_exc.InvalidRequestError(
3133 "'%s' does not support object "
3134 "population - eager loading cannot be applied." % self
3135 )
3136
3137 # a little dance here as the "path" is still something that only
3138 # semi-tracks the exact series of things we are loading, still not
3139 # telling us about with_polymorphic() and stuff like that when it's at
3140 # the root.. the initial MapperEntity is more accurate for this case.
3141 if len(path) == 1:
3142 if not orm_util._entity_isa(query_entity.entity_zero, self.parent):
3143 return
3144 elif not orm_util._entity_isa(
3145 path[-1], self.parent
3146 ) and not self.parent.isa(path[-1].mapper):
3147 # second check accommodates a polymorphic entity where
3148 # the path has been normalized to the base mapper but
3149 # self.parent is a subclass mapper, e.g.
3150 # joinedload(A.b.of_type(poly)).selectinload(poly.Sub.rel)
3151 # Fixes #13209.
3152 return
3153
3154 selectin_path = effective_path
3155
3156 path_w_prop = path[self.parent_property]
3157
3158 # build up a path indicating the path from the leftmost
3159 # entity to the thing we're subquery loading.
3160 with_poly_entity = path_w_prop.get(
3161 context.attributes, "path_with_polymorphic", None
3162 )
3163 if with_poly_entity is not None:
3164 effective_entity = inspect(with_poly_entity)
3165 else:
3166 effective_entity = self.entity
3167
3168 loading._PostLoad.callable_for_path(
3169 context,
3170 selectin_path,
3171 self.parent,
3172 self.parent_property,
3173 self._load_for_path,
3174 effective_entity,
3175 loadopt,
3176 recursion_depth,
3177 execution_options,
3178 )
3179
3180 def _load_for_path(
3181 self,
3182 context,
3183 path,
3184 states,
3185 load_only,
3186 effective_entity,
3187 loadopt,
3188 recursion_depth,
3189 execution_options,
3190 ):
3191 if load_only and self.key not in load_only:
3192 return
3193
3194 query_info = self._query_info
3195
3196 if query_info.load_only_child:
3197 our_states = collections.defaultdict(list)
3198 none_states = []
3199
3200 mapper = self.parent
3201
3202 # attribute keys for the lookup columns; when these are
3203 # present in a state's dict, reading them directly is
3204 # equivalent to the PASSIVE_NO_FETCH attribute lookup below.
3205 # whether or not a key is present can vary per state, e.g.
3206 # individual instances may have the attribute expired or
3207 # deferred, so this is determined state-by-state
3208 get_related_ident = mapper._state_ident_getter(
3209 query_info.child_lookup_cols,
3210 passive=attributes.PASSIVE_NO_FETCH,
3211 )
3212
3213 for state, overwrite in states:
3214 state_dict = state.dict
3215 related_ident = get_related_ident(state, state_dict)
3216 # if the loaded parent objects do not have the foreign key
3217 # to the related item loaded, then degrade into the joined
3218 # version of selectinload
3219 if LoaderCallableStatus.PASSIVE_NO_RESULT in related_ident:
3220 query_info = self._fallback_query_info
3221 break
3222
3223 # organize states into lists keyed to particular foreign
3224 # key values.
3225 if None not in related_ident:
3226 our_states[related_ident].append(
3227 (state, state_dict, overwrite)
3228 )
3229 else:
3230 # For FK values that have None, add them to a
3231 # separate collection that will be populated separately
3232 none_states.append((state, state_dict, overwrite))
3233
3234 # note the above conditional may have changed query_info
3235 if not query_info.load_only_child:
3236 our_states = [
3237 (state.key[1], state, state.dict, overwrite)
3238 for state, overwrite in states
3239 ]
3240
3241 pk_cols = query_info.pk_cols
3242 in_expr = query_info.in_expr
3243
3244 if not query_info.load_with_join:
3245 # in "omit join" mode, the primary key column and the
3246 # "in" expression are in terms of the related entity. So
3247 # if the related entity is polymorphic or otherwise aliased,
3248 # we need to adapt our "pk_cols" and "in_expr" to that
3249 # entity. in non-"omit join" mode, these are against the
3250 # parent entity and do not need adaption.
3251 if effective_entity.is_aliased_class:
3252 pk_cols = [
3253 effective_entity._adapt_element(col) for col in pk_cols
3254 ]
3255 in_expr = effective_entity._adapt_element(in_expr)
3256
3257 entity_sql = effective_entity.__clause_element__()
3258 q = Select._create_raw_select(
3259 _raw_columns=[*pk_cols, entity_sql],
3260 _compile_options=_ORMCompileState.default_compile_options,
3261 _propagate_attrs={
3262 "compile_state_plugin": "orm",
3263 "plugin_subject": effective_entity,
3264 },
3265 )
3266
3267 if (
3268 self.parent_property.secondary is not None
3269 and self.omit_join is True
3270 ):
3271 # The secondaryjoin condition is used to connect the
3272 # secondary table to the related entity,
3273 # and is required for composite foreign keys where SQLAlchemy
3274 # cannot determine the join condition.
3275 q = q.select_from(self.parent_property.secondary).join(
3276 entity_sql, self.parent_property._join_condition.secondaryjoin
3277 )
3278 elif not query_info.load_with_join:
3279 # the pk columns in the "omit_join" case are raw, non-annotated
3280 # columns, so to ensure the Query knows its primary entity, we
3281 # add it explicitly. Using annotated columns here would hit a
3282 # performance issue detailed in issue #4347.
3283 q = q.select_from(effective_entity)
3284 else:
3285 # in the non-omit_join case, the pk columns are against the
3286 # annotated/mapped column of the parent entity, but the #4347
3287 # issue does not occur in this case.
3288 q = q.select_from(self._parent_alias).join(
3289 getattr(self._parent_alias, self.parent_property.key).of_type(
3290 effective_entity
3291 )
3292 )
3293
3294 q = q.filter(in_expr.in_(sql.bindparam("primary_keys")))
3295
3296 # a test which exercises what these comments talk about is
3297 # test_selectin_relations.py -> test_twolevel_selectin_w_polymorphic
3298 #
3299 # effective_entity above is given to us in terms of the cached
3300 # statement, namely this one:
3301 orig_query = context.compile_state.select_statement
3302
3303 # the actual statement that was requested is this one:
3304 # context_query = context.user_passed_query
3305 #
3306 # that's not the cached one, however. So while it is of the identical
3307 # structure, if it has entities like AliasedInsp, which we get from
3308 # aliased() or with_polymorphic(), the AliasedInsp will likely be a
3309 # different object identity each time, and will not match up
3310 # hashing-wise to the corresponding AliasedInsp that's in the
3311 # cached query, meaning it won't match on paths and loader lookups
3312 # and loaders like this one will be skipped if it is used in options.
3313 #
3314 # as it turns out, standard loader options like selectinload(),
3315 # lazyload() that have a path need
3316 # to come from the cached query so that the AliasedInsp etc. objects
3317 # that are in the query line up with the object that's in the path
3318 # of the strategy object. however other options like
3319 # with_loader_criteria() that doesn't have a path (has a fixed entity)
3320 # and needs to have access to the latest closure state in order to
3321 # be correct, we need to use the uncached one.
3322 #
3323 # as of #8399 we let the loader option itself figure out what it
3324 # wants to do given cached and uncached version of itself.
3325
3326 effective_path = path[self.parent_property]
3327
3328 if orig_query is context.user_passed_query:
3329 new_options = orig_query._with_options
3330 else:
3331 cached_options = orig_query._with_options
3332 uncached_options = context.user_passed_query._with_options
3333
3334 # propagate compile state options from the original query,
3335 # updating their "extra_criteria" as necessary.
3336 # note this will create a different cache key than
3337 # "orig" options if extra_criteria is present, because the copy
3338 # of extra_criteria will have different boundparam than that of
3339 # the QueryableAttribute in the path
3340 new_options = [
3341 orig_opt._adapt_cached_option_to_uncached_option(
3342 context, uncached_opt
3343 )
3344 for orig_opt, uncached_opt in zip(
3345 cached_options, uncached_options
3346 )
3347 ]
3348
3349 if loadopt and loadopt._extra_criteria:
3350 new_options += (
3351 orm_util.LoaderCriteriaOption(
3352 effective_entity,
3353 loadopt._generate_extra_criteria(context),
3354 ),
3355 )
3356
3357 if recursion_depth is not None:
3358 effective_path = effective_path._truncate_recursive()
3359
3360 q = q.options(*new_options)
3361
3362 q = q._update_compile_options({"_current_path": effective_path})
3363 if context.populate_existing:
3364 q = q.execution_options(populate_existing=True)
3365
3366 if self.parent_property.order_by:
3367 if not query_info.load_with_join:
3368 eager_order_by = self.parent_property.order_by
3369 if effective_entity.is_aliased_class:
3370 eager_order_by = [
3371 effective_entity._adapt_element(elem)
3372 for elem in eager_order_by
3373 ]
3374 q = q.order_by(*eager_order_by)
3375 else:
3376
3377 def _setup_outermost_orderby(compile_context):
3378 compile_context.eager_order_by += tuple(
3379 util.to_list(self.parent_property.order_by)
3380 )
3381
3382 q = q._add_compile_state_func(
3383 _setup_outermost_orderby, self.parent_property
3384 )
3385
3386 chunksize = self._set_chunksize(loadopt)
3387
3388 if query_info.load_only_child:
3389 self._load_via_child(
3390 our_states,
3391 none_states,
3392 query_info,
3393 q,
3394 context,
3395 execution_options,
3396 chunksize,
3397 )
3398 else:
3399 self._load_via_parent(
3400 our_states,
3401 query_info,
3402 q,
3403 context,
3404 execution_options,
3405 chunksize,
3406 )
3407
3408 def _load_via_child(
3409 self,
3410 our_states,
3411 none_states,
3412 query_info,
3413 q,
3414 context,
3415 execution_options,
3416 chunksize,
3417 ):
3418 uselist = self.uselist
3419 n_pk = query_info.n_pk
3420
3421 # this sort is really for the benefit of the unit tests
3422 our_keys = sorted(our_states)
3423 while our_keys:
3424 chunk = our_keys[0:chunksize]
3425 our_keys = our_keys[chunksize:]
3426 primary_keys = [
3427 key[0] if query_info.zero_idx else key for key in chunk
3428 ]
3429 result = context.session.execute(
3430 q,
3431 params={"primary_keys": primary_keys},
3432 execution_options=execution_options,
3433 )
3434 if result.context is not None and result.context.requires_uniquing:
3435 rows = result.unique()
3436 else:
3437 rows = result._raw_all_tuples()
3438 data = {row[:n_pk]: row[n_pk] for row in rows}
3439
3440 for key in chunk:
3441 # for a real foreign key and no concurrent changes to the
3442 # DB while running this method, "key" is always present in
3443 # data. However, for primaryjoins without real foreign keys
3444 # a non-None primaryjoin condition may still refer to no
3445 # related object.
3446 related_obj = data.get(key, None)
3447 for state, dict_, overwrite in our_states[key]:
3448 if not overwrite and self.key in dict_:
3449 continue
3450
3451 state.get_impl(self.key).set_committed_value(
3452 state,
3453 dict_,
3454 related_obj if not uselist else [related_obj],
3455 )
3456 # populate none states with empty value / collection
3457 for state, dict_, overwrite in none_states:
3458 if not overwrite and self.key in dict_:
3459 continue
3460
3461 # note it's OK if this is a uselist=True attribute, the empty
3462 # collection will be populated
3463 state.get_impl(self.key).set_committed_value(state, dict_, None)
3464
3465 def _load_via_parent(
3466 self, our_states, query_info, q, context, execution_options, chunksize
3467 ):
3468 uselist = self.uselist
3469 n_pk = query_info.n_pk
3470 _empty_result = () if uselist else None
3471
3472 while our_states:
3473 chunk = our_states[0:chunksize]
3474 our_states = our_states[chunksize:]
3475
3476 primary_keys = [
3477 key[0] if query_info.zero_idx else key
3478 for key, state, state_dict, overwrite in chunk
3479 ]
3480
3481 result = context.session.execute(
3482 q,
3483 params={"primary_keys": primary_keys},
3484 execution_options=execution_options,
3485 )
3486 if result.context is not None and result.context.requires_uniquing:
3487 rows = result.unique()
3488 else:
3489 rows = result._raw_all_tuples()
3490 data = collections.defaultdict(list)
3491 for row in rows:
3492 data[row[:n_pk]].append(row[n_pk])
3493
3494 for key, state, state_dict, overwrite in chunk:
3495 if not overwrite and self.key in state_dict:
3496 continue
3497
3498 collection = data.get(key, _empty_result)
3499
3500 if not uselist and collection:
3501 if len(collection) > 1:
3502 util.warn(
3503 "Multiple rows returned with "
3504 "uselist=False for eagerly-loaded "
3505 "attribute '%s' " % self
3506 )
3507 state.get_impl(self.key).set_committed_value(
3508 state, state_dict, collection[0]
3509 )
3510 else:
3511 # note that empty tuple set on uselist=False sets the
3512 # value to None
3513 state.get_impl(self.key).set_committed_value(
3514 state, state_dict, collection
3515 )
3516
3517
3518def _single_parent_validator(desc, prop):
3519 def _do_check(state, value, oldvalue, initiator):
3520 if value is not None and initiator.key == prop.key:
3521 hasparent = initiator.hasparent(attributes.instance_state(value))
3522 if hasparent and oldvalue is not value:
3523 raise sa_exc.InvalidRequestError(
3524 "Instance %s is already associated with an instance "
3525 "of %s via its %s attribute, and is only allowed a "
3526 "single parent."
3527 % (orm_util.instance_str(value), state.class_, prop),
3528 code="bbf1",
3529 )
3530 return value
3531
3532 def append(state, value, initiator):
3533 return _do_check(state, value, None, initiator)
3534
3535 def set_(state, value, oldvalue, initiator):
3536 return _do_check(state, value, oldvalue, initiator)
3537
3538 event.listen(
3539 desc, "append", append, raw=True, retval=True, active_history=True
3540 )
3541 event.listen(desc, "set", set_, raw=True, retval=True, active_history=True)