1# orm/strategies.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: ignore-errors
8
9
10"""sqlalchemy.orm.interfaces.LoaderStrategy
11implementations, and related MapperOptions."""
12
13from __future__ import annotations
14
15import collections
16import itertools
17from typing import Any
18from typing import Dict
19from typing import Literal
20from typing import Optional
21from typing import Tuple
22from typing import TYPE_CHECKING
23from typing import Union
24
25from . import attributes
26from . import exc as orm_exc
27from . import interfaces
28from . import loading
29from . import path_registry
30from . import properties
31from . import query
32from . import relationships
33from . import unitofwork
34from . import util as orm_util
35from .base import _DEFER_FOR_STATE
36from .base import _RAISE_FOR_STATE
37from .base import _SET_DEFERRED_EXPIRED
38from .base import ATTR_WAS_SET
39from .base import LoaderCallableStatus
40from .base import PASSIVE_OFF
41from .base import PassiveFlag
42from .context import _column_descriptions
43from .context import _ORMCompileState
44from .context import _ORMSelectCompileState
45from .context import QueryContext
46from .interfaces import LoaderStrategy
47from .interfaces import StrategizedProperty
48from .session import _state_session
49from .state import InstanceState
50from .strategy_options import Load
51from .util import _none_only_set
52from .util import AliasedClass
53from .. import event
54from .. import exc as sa_exc
55from .. import inspect
56from .. import log
57from .. import sql
58from .. import util
59from ..sql import util as sql_util
60from ..sql import visitors
61from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL
62from ..sql.selectable import Select
63
64if TYPE_CHECKING:
65 from .mapper import Mapper
66 from .relationships import RelationshipProperty
67 from ..sql.elements import ColumnElement
68
69
70def _register_attribute(
71 prop,
72 mapper,
73 useobject,
74 compare_function=None,
75 typecallable=None,
76 callable_=None,
77 proxy_property=None,
78 active_history=False,
79 impl_class=None,
80 default_scalar_value=None,
81 **kw,
82):
83 listen_hooks = []
84
85 uselist = useobject and prop.uselist
86
87 if useobject and prop.single_parent:
88 listen_hooks.append(_single_parent_validator)
89
90 if prop.key in prop.parent.validators:
91 fn, opts = prop.parent.validators[prop.key]
92 listen_hooks.append(
93 lambda desc, prop: orm_util._validator_events(
94 desc, prop.key, fn, **opts
95 )
96 )
97
98 if useobject:
99 listen_hooks.append(unitofwork._track_cascade_events)
100
101 # need to assemble backref listeners
102 # after the singleparentvalidator, mapper validator
103 if useobject:
104 backref = prop.back_populates
105 if backref and prop._effective_sync_backref:
106 listen_hooks.append(
107 lambda desc, prop: attributes._backref_listeners(
108 desc, backref, uselist
109 )
110 )
111
112 # a single MapperProperty is shared down a class inheritance
113 # hierarchy, so we set up attribute instrumentation and backref event
114 # for each mapper down the hierarchy.
115
116 # typically, "mapper" is the same as prop.parent, due to the way
117 # the configure_mappers() process runs, however this is not strongly
118 # enforced, and in the case of a second configure_mappers() run the
119 # mapper here might not be prop.parent; also, a subclass mapper may
120 # be called here before a superclass mapper. That is, can't depend
121 # on mappers not already being set up so we have to check each one.
122
123 for m in mapper.self_and_descendants:
124 if prop is m._props.get(
125 prop.key
126 ) and not m.class_manager._attr_has_impl(prop.key):
127 desc = attributes._register_attribute_impl(
128 m.class_,
129 prop.key,
130 parent_token=prop,
131 uselist=uselist,
132 compare_function=compare_function,
133 useobject=useobject,
134 trackparent=useobject
135 and (
136 prop.single_parent
137 or prop.direction is interfaces.ONETOMANY
138 ),
139 typecallable=typecallable,
140 callable_=callable_,
141 active_history=active_history,
142 default_scalar_value=default_scalar_value,
143 impl_class=impl_class,
144 send_modified_events=not useobject or not prop.viewonly,
145 doc=prop.doc,
146 **kw,
147 )
148
149 for hook in listen_hooks:
150 hook(desc, prop)
151
152
153@properties.ColumnProperty.strategy_for(instrument=False, deferred=False)
154class _UninstrumentedColumnLoader(LoaderStrategy):
155 """Represent a non-instrumented MapperProperty.
156
157 The polymorphic_on argument of mapper() often results in this,
158 if the argument is against the with_polymorphic selectable.
159
160 """
161
162 __slots__ = ("columns",)
163
164 def __init__(self, parent, strategy_key):
165 super().__init__(parent, strategy_key)
166 self.columns = self.parent_property.columns
167
168 def setup_query(
169 self,
170 compile_state,
171 query_entity,
172 path,
173 loadopt,
174 adapter,
175 column_collection=None,
176 **kwargs,
177 ):
178 for c in self.columns:
179 if adapter:
180 c = adapter.columns[c]
181 compile_state._append_dedupe_col_collection(c, column_collection)
182
183 def create_row_processor(
184 self,
185 context,
186 query_entity,
187 path,
188 loadopt,
189 mapper,
190 result,
191 adapter,
192 populators,
193 ):
194 pass
195
196
197@log.class_logger
198@properties.ColumnProperty.strategy_for(instrument=True, deferred=False)
199class _ColumnLoader(LoaderStrategy):
200 """Provide loading behavior for a :class:`.ColumnProperty`."""
201
202 __slots__ = "columns", "is_composite"
203
204 def __init__(self, parent, strategy_key):
205 super().__init__(parent, strategy_key)
206 self.columns = self.parent_property.columns
207 self.is_composite = hasattr(self.parent_property, "composite_class")
208
209 def setup_query(
210 self,
211 compile_state,
212 query_entity,
213 path,
214 loadopt,
215 adapter,
216 column_collection,
217 memoized_populators,
218 check_for_adapt=False,
219 **kwargs,
220 ):
221 for c in self.columns:
222 if adapter:
223 if check_for_adapt:
224 c = adapter.adapt_check_present(c)
225 if c is None:
226 return
227 else:
228 c = adapter.columns[c]
229
230 compile_state._append_dedupe_col_collection(c, column_collection)
231
232 fetch = self.columns[0]
233 if adapter:
234 fetch = adapter.columns[fetch]
235 if fetch is None:
236 # None happens here only for dml bulk_persistence cases
237 # when context.DMLReturningColFilter is used
238 return
239
240 memoized_populators[self.parent_property] = fetch
241
242 def init_class_attribute(self, mapper):
243 self.is_class_level = True
244 coltype = self.columns[0].type
245 # TODO: check all columns ? check for foreign key as well?
246 active_history = (
247 self.parent_property.active_history
248 or self.columns[0].primary_key
249 or (
250 mapper.version_id_col is not None
251 and mapper._columntoproperty.get(mapper.version_id_col, None)
252 is self.parent_property
253 )
254 )
255
256 _register_attribute(
257 self.parent_property,
258 mapper,
259 useobject=False,
260 compare_function=coltype.compare_values,
261 active_history=active_history,
262 default_scalar_value=self.parent_property._default_scalar_value,
263 )
264
265 def create_row_processor(
266 self,
267 context,
268 query_entity,
269 path,
270 loadopt,
271 mapper,
272 result,
273 adapter,
274 populators,
275 ):
276 # look through list of columns represented here
277 # to see which, if any, is present in the row.
278
279 for col in self.columns:
280 if adapter:
281 col = adapter.columns[col]
282 getter = result._getter(col, False)
283 if getter:
284 populators["quick"].append((self.key, getter))
285 break
286 else:
287 populators["expire"].append((self.key, True))
288
289
290@log.class_logger
291@properties.ColumnProperty.strategy_for(query_expression=True)
292class _ExpressionColumnLoader(_ColumnLoader):
293 def __init__(self, parent, strategy_key):
294 super().__init__(parent, strategy_key)
295
296 # compare to the "default" expression that is mapped in
297 # the column. If it's sql.null, we don't need to render
298 # unless an expr is passed in the options.
299 null = sql.null().label(None)
300 self._have_default_expression = any(
301 not c.compare(null) for c in self.parent_property.columns
302 )
303
304 def setup_query(
305 self,
306 compile_state,
307 query_entity,
308 path,
309 loadopt,
310 adapter,
311 column_collection,
312 memoized_populators,
313 **kwargs,
314 ):
315 columns = None
316 if loadopt and loadopt._extra_criteria:
317 columns = loadopt._extra_criteria
318
319 elif self._have_default_expression:
320 columns = self.parent_property.columns
321
322 if columns is None:
323 return
324
325 for c in columns:
326 if adapter:
327 c = adapter.columns[c]
328 compile_state._append_dedupe_col_collection(c, column_collection)
329
330 fetch = columns[0]
331 if adapter:
332 fetch = adapter.columns[fetch]
333 if fetch is None:
334 # None is not expected to be the result of any
335 # adapter implementation here, however there may be theoretical
336 # usages of returning() with context.DMLReturningColFilter
337 return
338
339 memoized_populators[self.parent_property] = fetch
340
341 # if the column being loaded is the polymorphic discriminator,
342 # and we have a with_expression() providing the actual column,
343 # update the query_entity to use the actual column instead of
344 # the default expression
345 if (
346 query_entity._polymorphic_discriminator is self.columns[0]
347 and loadopt
348 and loadopt._extra_criteria
349 ):
350 query_entity._polymorphic_discriminator = columns[0]
351
352 def create_row_processor(
353 self,
354 context,
355 query_entity,
356 path,
357 loadopt,
358 mapper,
359 result,
360 adapter,
361 populators,
362 ):
363 # look through list of columns represented here
364 # to see which, if any, is present in the row.
365 if loadopt and loadopt._extra_criteria:
366 columns = loadopt._extra_criteria
367
368 for col in columns:
369 if adapter:
370 col = adapter.columns[col]
371 getter = result._getter(col, False)
372 if getter:
373 populators["quick"].append((self.key, getter))
374 break
375 else:
376 populators["expire"].append((self.key, True))
377
378 def init_class_attribute(self, mapper):
379 self.is_class_level = True
380
381 _register_attribute(
382 self.parent_property,
383 mapper,
384 useobject=False,
385 compare_function=self.columns[0].type.compare_values,
386 accepts_scalar_loader=False,
387 default_scalar_value=self.parent_property._default_scalar_value,
388 )
389
390
391@log.class_logger
392@properties.ColumnProperty.strategy_for(deferred=True, instrument=True)
393@properties.ColumnProperty.strategy_for(
394 deferred=True, instrument=True, raiseload=True
395)
396@properties.ColumnProperty.strategy_for(do_nothing=True)
397class _DeferredColumnLoader(LoaderStrategy):
398 """Provide loading behavior for a deferred :class:`.ColumnProperty`."""
399
400 __slots__ = "columns", "group", "raiseload"
401
402 def __init__(self, parent, strategy_key):
403 super().__init__(parent, strategy_key)
404 if hasattr(self.parent_property, "composite_class"):
405 raise NotImplementedError(
406 "Deferred loading for composite types not implemented yet"
407 )
408 self.raiseload = self.strategy_opts.get("raiseload", False)
409 self.columns = self.parent_property.columns
410 self.group = self.parent_property.group
411
412 def create_row_processor(
413 self,
414 context,
415 query_entity,
416 path,
417 loadopt,
418 mapper,
419 result,
420 adapter,
421 populators,
422 ):
423 # for a DeferredColumnLoader, this method is only used during a
424 # "row processor only" query; see test_deferred.py ->
425 # tests with "rowproc_only" in their name. As of the 1.0 series,
426 # loading._instance_processor doesn't use a "row processing" function
427 # to populate columns, instead it uses data in the "populators"
428 # dictionary. Normally, the DeferredColumnLoader.setup_query()
429 # sets up that data in the "memoized_populators" dictionary
430 # and "create_row_processor()" here is never invoked.
431
432 if (
433 context.refresh_state
434 and context.query._compile_options._only_load_props
435 and self.key in context.query._compile_options._only_load_props
436 ):
437 self.parent_property._get_strategy(
438 (("deferred", False), ("instrument", True))
439 ).create_row_processor(
440 context,
441 query_entity,
442 path,
443 loadopt,
444 mapper,
445 result,
446 adapter,
447 populators,
448 )
449
450 elif not self.is_class_level:
451 if self.raiseload:
452 set_deferred_for_local_state = (
453 self.parent_property._raise_column_loader
454 )
455 else:
456 set_deferred_for_local_state = (
457 self.parent_property._deferred_column_loader
458 )
459 populators["new"].append((self.key, set_deferred_for_local_state))
460 else:
461 populators["expire"].append((self.key, False))
462
463 def init_class_attribute(self, mapper):
464 self.is_class_level = True
465
466 _register_attribute(
467 self.parent_property,
468 mapper,
469 useobject=False,
470 compare_function=self.columns[0].type.compare_values,
471 callable_=self._load_for_state,
472 load_on_unexpire=False,
473 default_scalar_value=self.parent_property._default_scalar_value,
474 )
475
476 def setup_query(
477 self,
478 compile_state,
479 query_entity,
480 path,
481 loadopt,
482 adapter,
483 column_collection,
484 memoized_populators,
485 only_load_props=None,
486 **kw,
487 ):
488 if (
489 (
490 compile_state.compile_options._render_for_subquery
491 and self.parent_property._renders_in_subqueries
492 )
493 or (
494 loadopt
495 and set(self.columns).intersection(
496 self.parent._should_undefer_in_wildcard
497 )
498 )
499 or (
500 loadopt
501 and self.group
502 and loadopt.local_opts.get(
503 "undefer_group_%s" % self.group, False
504 )
505 )
506 or (only_load_props and self.key in only_load_props)
507 ):
508 self.parent_property._get_strategy(
509 (("deferred", False), ("instrument", True))
510 ).setup_query(
511 compile_state,
512 query_entity,
513 path,
514 loadopt,
515 adapter,
516 column_collection,
517 memoized_populators,
518 **kw,
519 )
520 elif self.is_class_level:
521 memoized_populators[self.parent_property] = _SET_DEFERRED_EXPIRED
522 elif not self.raiseload:
523 memoized_populators[self.parent_property] = _DEFER_FOR_STATE
524 else:
525 memoized_populators[self.parent_property] = _RAISE_FOR_STATE
526
527 def _load_for_state(self, state, passive):
528 if not state.key:
529 return LoaderCallableStatus.ATTR_EMPTY
530
531 if not passive & PassiveFlag.SQL_OK:
532 return LoaderCallableStatus.PASSIVE_NO_RESULT
533
534 localparent = state.manager.mapper
535
536 if self.group:
537 toload = [
538 p.key
539 for p in localparent.iterate_properties
540 if isinstance(p, StrategizedProperty)
541 and isinstance(p.strategy, _DeferredColumnLoader)
542 and p.group == self.group
543 ]
544 else:
545 toload = [self.key]
546
547 # narrow the keys down to just those which have no history
548 group = [k for k in toload if k in state.unmodified]
549
550 session = _state_session(state)
551 if session is None:
552 raise orm_exc.DetachedInstanceError(
553 "Parent instance %s is not bound to a Session; "
554 "deferred load operation of attribute '%s' cannot proceed"
555 % (orm_util.state_str(state), self.key)
556 )
557
558 if self.raiseload:
559 self._invoke_raise_load(state, passive, "raise")
560
561 loading._load_scalar_attributes(
562 state.mapper, state, set(group), PASSIVE_OFF
563 )
564
565 return LoaderCallableStatus.ATTR_WAS_SET
566
567 def _invoke_raise_load(self, state, passive, lazy):
568 raise sa_exc.InvalidRequestError(
569 "'%s' is not available due to raiseload=True" % (self,)
570 )
571
572
573class _LoadDeferredColumns:
574 """serializable loader object used by DeferredColumnLoader"""
575
576 def __init__(self, key: str, raiseload: bool = False):
577 self.key = key
578 self.raiseload = raiseload
579
580 def __call__(self, state, passive=attributes.PASSIVE_OFF):
581 key = self.key
582
583 localparent = state.manager.mapper
584 prop = localparent._props[key]
585 if self.raiseload:
586 strategy_key = (
587 ("deferred", True),
588 ("instrument", True),
589 ("raiseload", True),
590 )
591 else:
592 strategy_key = (("deferred", True), ("instrument", True))
593 strategy = prop._get_strategy(strategy_key)
594 return strategy._load_for_state(state, passive)
595
596
597class _AbstractRelationshipLoader(LoaderStrategy):
598 """LoaderStratgies which deal with related objects."""
599
600 __slots__ = "mapper", "target", "uselist", "entity"
601
602 def __init__(self, parent, strategy_key):
603 super().__init__(parent, strategy_key)
604 self.mapper = self.parent_property.mapper
605 self.entity = self.parent_property.entity
606 self.target = self.parent_property.target
607 self.uselist = self.parent_property.uselist
608
609 def _immediateload_create_row_processor(
610 self,
611 context,
612 query_entity,
613 path,
614 loadopt,
615 mapper,
616 result,
617 adapter,
618 populators,
619 ):
620 return self.parent_property._get_strategy(
621 (("lazy", "immediate"),)
622 ).create_row_processor(
623 context,
624 query_entity,
625 path,
626 loadopt,
627 mapper,
628 result,
629 adapter,
630 populators,
631 )
632
633
634@log.class_logger
635@relationships.RelationshipProperty.strategy_for(do_nothing=True)
636class _DoNothingLoader(LoaderStrategy):
637 """Relationship loader that makes no change to the object's state.
638
639 Compared to NoLoader, this loader does not initialize the
640 collection/attribute to empty/none; the usual default LazyLoader will
641 take effect.
642
643 """
644
645
646@log.class_logger
647@relationships.RelationshipProperty.strategy_for(lazy="noload")
648@relationships.RelationshipProperty.strategy_for(lazy=None)
649class _NoLoader(_AbstractRelationshipLoader):
650 """Provide loading behavior for a :class:`.Relationship`
651 with "lazy=None".
652
653 """
654
655 __slots__ = ()
656
657 @util.deprecated(
658 "2.1",
659 "The ``noload`` loader strategy is deprecated and will be removed "
660 "in a future release. This option "
661 "produces incorrect results by returning ``None`` for related "
662 "items.",
663 )
664 def init_class_attribute(self, mapper):
665 self.is_class_level = True
666
667 _register_attribute(
668 self.parent_property,
669 mapper,
670 useobject=True,
671 typecallable=self.parent_property.collection_class,
672 )
673
674 def create_row_processor(
675 self,
676 context,
677 query_entity,
678 path,
679 loadopt,
680 mapper,
681 result,
682 adapter,
683 populators,
684 ):
685 def invoke_no_load(state, dict_, row):
686 if self.uselist:
687 attributes.init_state_collection(state, dict_, self.key)
688 else:
689 dict_[self.key] = None
690
691 populators["new"].append((self.key, invoke_no_load))
692
693
694@log.class_logger
695@relationships.RelationshipProperty.strategy_for(lazy=True)
696@relationships.RelationshipProperty.strategy_for(lazy="select")
697@relationships.RelationshipProperty.strategy_for(lazy="raise")
698@relationships.RelationshipProperty.strategy_for(lazy="raise_on_sql")
699@relationships.RelationshipProperty.strategy_for(lazy="baked_select")
700class _LazyLoader(
701 _AbstractRelationshipLoader, util.MemoizedSlots, log.Identified
702):
703 """Provide loading behavior for a :class:`.Relationship`
704 with "lazy=True", that is loads when first accessed.
705
706 """
707
708 __slots__ = (
709 "_lazywhere",
710 "_rev_lazywhere",
711 "_lazyload_reverse_option",
712 "_order_by",
713 "use_get",
714 "is_aliased_class",
715 "_bind_to_col",
716 "_equated_columns",
717 "_rev_bind_to_col",
718 "_rev_equated_columns",
719 "_simple_lazy_clause",
720 "_raise_always",
721 "_raise_on_sql",
722 )
723
724 _lazywhere: ColumnElement[bool]
725 _bind_to_col: Dict[str, ColumnElement[Any]]
726 _rev_lazywhere: ColumnElement[bool]
727 _rev_bind_to_col: Dict[str, ColumnElement[Any]]
728
729 parent_property: RelationshipProperty[Any]
730
731 def __init__(
732 self, parent: RelationshipProperty[Any], strategy_key: Tuple[Any, ...]
733 ):
734 super().__init__(parent, strategy_key)
735 self._raise_always = self.strategy_opts["lazy"] == "raise"
736 self._raise_on_sql = self.strategy_opts["lazy"] == "raise_on_sql"
737
738 self.is_aliased_class = inspect(self.entity).is_aliased_class
739
740 join_condition = self.parent_property._join_condition
741 (
742 self._lazywhere,
743 self._bind_to_col,
744 self._equated_columns,
745 ) = join_condition.create_lazy_clause()
746
747 (
748 self._rev_lazywhere,
749 self._rev_bind_to_col,
750 self._rev_equated_columns,
751 ) = join_condition.create_lazy_clause(reverse_direction=True)
752
753 if self.parent_property.order_by:
754 self._order_by = util.to_list(self.parent_property.order_by)
755 else:
756 self._order_by = None
757
758 self.logger.info("%s lazy loading clause %s", self, self._lazywhere)
759
760 # determine if our "lazywhere" clause is the same as the mapper's
761 # get() clause. then we can just use mapper.get()
762 #
763 # TODO: the "not self.uselist" can be taken out entirely; a m2o
764 # load that populates for a list (very unusual, but is possible with
765 # the API) can still set for "None" and the attribute system will
766 # populate as an empty list.
767 self.use_get = (
768 not self.is_aliased_class
769 and not self.uselist
770 and self.entity._get_clause[0].compare(
771 self._lazywhere,
772 use_proxies=True,
773 compare_keys=False,
774 equivalents=self.mapper._equivalent_columns,
775 )
776 )
777
778 if self.use_get:
779 for col in list(self._equated_columns):
780 if col in self.mapper._equivalent_columns:
781 for c in self.mapper._equivalent_columns[col]:
782 self._equated_columns[c] = self._equated_columns[col]
783
784 self.logger.info(
785 "%s will use Session.get() to optimize instance loads", self
786 )
787
788 def init_class_attribute(self, mapper):
789 self.is_class_level = True
790
791 _legacy_inactive_history_style = (
792 self.parent_property._legacy_inactive_history_style
793 )
794
795 if self.parent_property.active_history:
796 active_history = True
797 _deferred_history = False
798
799 elif (
800 self.parent_property.direction is not interfaces.MANYTOONE
801 or not self.use_get
802 ):
803 if _legacy_inactive_history_style:
804 active_history = True
805 _deferred_history = False
806 else:
807 active_history = False
808 _deferred_history = True
809 else:
810 active_history = _deferred_history = False
811
812 _register_attribute(
813 self.parent_property,
814 mapper,
815 useobject=True,
816 callable_=self._load_for_state,
817 typecallable=self.parent_property.collection_class,
818 active_history=active_history,
819 _deferred_history=_deferred_history,
820 )
821
822 def _memoized_attr__simple_lazy_clause(self):
823 lazywhere = self._lazywhere
824
825 criterion, bind_to_col = (lazywhere, self._bind_to_col)
826
827 params = []
828
829 def visit_bindparam(bindparam):
830 bindparam.unique = False
831
832 visitors.traverse(criterion, {}, {"bindparam": visit_bindparam})
833
834 def visit_bindparam(bindparam):
835 if bindparam._identifying_key in bind_to_col:
836 params.append(
837 (
838 bindparam.key,
839 bind_to_col[bindparam._identifying_key],
840 None,
841 )
842 )
843 elif bindparam.callable is None:
844 params.append((bindparam.key, None, bindparam.value))
845
846 criterion = visitors.cloned_traverse(
847 criterion, {}, {"bindparam": visit_bindparam}
848 )
849
850 return criterion, params
851
852 def _generate_lazy_clause(self, state, passive):
853 criterion, param_keys = self._simple_lazy_clause
854
855 if state is None:
856 return sql_util.adapt_criterion_to_null(
857 criterion, [key for key, ident, value in param_keys]
858 )
859
860 mapper = self.parent_property.parent
861
862 o = state.obj() # strong ref
863 dict_ = attributes.instance_dict(o)
864
865 if passive & PassiveFlag.INIT_OK:
866 passive ^= PassiveFlag.INIT_OK
867
868 params = {}
869 for key, ident, value in param_keys:
870 if ident is not None:
871 if passive and passive & PassiveFlag.LOAD_AGAINST_COMMITTED:
872 value = mapper._get_committed_state_attr_by_column(
873 state, dict_, ident, passive
874 )
875 else:
876 value = mapper._get_state_attr_by_column(
877 state, dict_, ident, passive
878 )
879
880 params[key] = value
881
882 return criterion, params
883
884 def _invoke_raise_load(self, state, passive, lazy):
885 raise sa_exc.InvalidRequestError(
886 "'%s' is not available due to lazy='%s'" % (self, lazy)
887 )
888
889 def _load_for_state(
890 self,
891 state,
892 passive,
893 loadopt=None,
894 extra_criteria=(),
895 extra_options=(),
896 alternate_effective_path=None,
897 execution_options=util.EMPTY_DICT,
898 ):
899 if not state.key and (
900 (
901 not self.parent_property.load_on_pending
902 and not state._load_pending
903 )
904 or not state.session_id
905 ):
906 return LoaderCallableStatus.ATTR_EMPTY
907
908 pending = not state.key
909 primary_key_identity = None
910
911 use_get = self.use_get and (not loadopt or not loadopt._extra_criteria)
912
913 if (not passive & PassiveFlag.SQL_OK and not use_get) or (
914 not passive & attributes.NON_PERSISTENT_OK and pending
915 ):
916 return LoaderCallableStatus.PASSIVE_NO_RESULT
917
918 if (
919 # we were given lazy="raise"
920 self._raise_always
921 # the no_raise history-related flag was not passed
922 and not passive & PassiveFlag.NO_RAISE
923 and (
924 # if we are use_get and related_object_ok is disabled,
925 # which means we are at most looking in the identity map
926 # for history purposes or otherwise returning
927 # PASSIVE_NO_RESULT, don't raise. This is also a
928 # history-related flag
929 not use_get
930 or passive & PassiveFlag.RELATED_OBJECT_OK
931 )
932 ):
933 self._invoke_raise_load(state, passive, "raise")
934
935 session = _state_session(state)
936 if not session:
937 if passive & PassiveFlag.NO_RAISE:
938 return LoaderCallableStatus.PASSIVE_NO_RESULT
939
940 raise orm_exc.DetachedInstanceError(
941 "Parent instance %s is not bound to a Session; "
942 "lazy load operation of attribute '%s' cannot proceed"
943 % (orm_util.state_str(state), self.key)
944 )
945
946 # if we have a simple primary key load, check the
947 # identity map without generating a Query at all
948 if use_get:
949 primary_key_identity = self._get_ident_for_use_get(
950 session, state, passive
951 )
952 if LoaderCallableStatus.PASSIVE_NO_RESULT in primary_key_identity:
953 return LoaderCallableStatus.PASSIVE_NO_RESULT
954 elif LoaderCallableStatus.NEVER_SET in primary_key_identity:
955 return LoaderCallableStatus.NEVER_SET
956
957 # test for None alone in primary_key_identity based on
958 # allow_partial_pks preference. PASSIVE_NO_RESULT and NEVER_SET
959 # have already been tested above
960 if not self.mapper.allow_partial_pks:
961 if _none_only_set.intersection(primary_key_identity):
962 return None
963 else:
964 if _none_only_set.issuperset(primary_key_identity):
965 return None
966
967 if (
968 self.key in state.dict
969 and not passive & PassiveFlag.DEFERRED_HISTORY_LOAD
970 ):
971 return LoaderCallableStatus.ATTR_WAS_SET
972
973 # look for this identity in the identity map. Delegate to the
974 # Query class in use, as it may have special rules for how it
975 # does this, including how it decides what the correct
976 # identity_token would be for this identity.
977
978 instance = session._identity_lookup(
979 self.entity,
980 primary_key_identity,
981 passive=passive,
982 lazy_loaded_from=state,
983 )
984
985 if instance is not None:
986 if instance is LoaderCallableStatus.PASSIVE_CLASS_MISMATCH:
987 return None
988 else:
989 return instance
990 elif (
991 not passive & PassiveFlag.SQL_OK
992 or not passive & PassiveFlag.RELATED_OBJECT_OK
993 ):
994 return LoaderCallableStatus.PASSIVE_NO_RESULT
995
996 return self._emit_lazyload(
997 session,
998 state,
999 primary_key_identity,
1000 passive,
1001 loadopt,
1002 extra_criteria,
1003 extra_options,
1004 alternate_effective_path,
1005 execution_options,
1006 )
1007
1008 def _get_ident_for_use_get(self, session, state, passive):
1009 instance_mapper = state.manager.mapper
1010
1011 if passive & PassiveFlag.LOAD_AGAINST_COMMITTED:
1012 get_attr = instance_mapper._get_committed_state_attr_by_column
1013 else:
1014 get_attr = instance_mapper._get_state_attr_by_column
1015
1016 dict_ = state.dict
1017
1018 return [
1019 get_attr(state, dict_, self._equated_columns[pk], passive=passive)
1020 for pk in self.mapper.primary_key
1021 ]
1022
1023 @util.preload_module("sqlalchemy.orm.strategy_options")
1024 def _emit_lazyload(
1025 self,
1026 session,
1027 state,
1028 primary_key_identity,
1029 passive,
1030 loadopt,
1031 extra_criteria,
1032 extra_options,
1033 alternate_effective_path,
1034 execution_options,
1035 ):
1036 strategy_options = util.preloaded.orm_strategy_options
1037
1038 clauseelement = self.entity.__clause_element__()
1039 stmt = Select._create_raw_select(
1040 _raw_columns=[clauseelement],
1041 _propagate_attrs=clauseelement._propagate_attrs,
1042 _compile_options=_ORMCompileState.default_compile_options,
1043 )
1044 load_options = QueryContext.default_load_options
1045
1046 load_options += {
1047 "_invoke_all_eagers": False,
1048 "_lazy_loaded_from": state,
1049 }
1050
1051 if self.parent_property.secondary is not None:
1052 stmt = stmt.select_from(
1053 self.mapper, self.parent_property.secondary
1054 )
1055
1056 pending = not state.key
1057
1058 # don't autoflush on pending
1059 if pending or passive & attributes.NO_AUTOFLUSH:
1060 stmt._execution_options = util.immutabledict({"autoflush": False})
1061
1062 use_get = self.use_get
1063
1064 if state.load_options or (loadopt and loadopt._extra_criteria):
1065 if alternate_effective_path is None:
1066 effective_path = state.load_path[self.parent_property]
1067 else:
1068 effective_path = alternate_effective_path[self.parent_property]
1069
1070 opts = state.load_options
1071
1072 if loadopt and loadopt._extra_criteria:
1073 use_get = False
1074 opts += (
1075 orm_util.LoaderCriteriaOption(self.entity, extra_criteria),
1076 )
1077
1078 stmt._with_options = opts
1079 elif alternate_effective_path is None:
1080 # this path is used if there are not already any options
1081 # in the query, but an event may want to add them
1082 effective_path = state.mapper._path_registry[self.parent_property]
1083 else:
1084 # added by immediateloader
1085 effective_path = alternate_effective_path[self.parent_property]
1086
1087 if extra_options:
1088 stmt._with_options += extra_options
1089
1090 stmt._compile_options += {"_current_path": effective_path}
1091
1092 if use_get:
1093 if self._raise_on_sql and not passive & PassiveFlag.NO_RAISE:
1094 self._invoke_raise_load(state, passive, "raise_on_sql")
1095
1096 return loading._load_on_pk_identity(
1097 session,
1098 stmt,
1099 primary_key_identity,
1100 load_options=load_options,
1101 execution_options=execution_options,
1102 )
1103
1104 if self._order_by:
1105 stmt._order_by_clauses = self._order_by
1106
1107 def _lazyload_reverse(compile_context):
1108 for rev in self.parent_property._reverse_property:
1109 # reverse props that are MANYTOONE are loading *this*
1110 # object from get(), so don't need to eager out to those.
1111 if (
1112 rev.direction is interfaces.MANYTOONE
1113 and rev._use_get
1114 and not isinstance(rev.strategy, _LazyLoader)
1115 ):
1116 strategy_options.Load._construct_for_existing_path(
1117 compile_context.compile_options._current_path[
1118 rev.parent
1119 ]
1120 ).lazyload(rev).process_compile_state(compile_context)
1121
1122 stmt = stmt._add_compile_state_func(
1123 _lazyload_reverse, self.parent_property
1124 )
1125
1126 lazy_clause, params = self._generate_lazy_clause(state, passive)
1127
1128 if execution_options:
1129 execution_options = util.EMPTY_DICT.merge_with(
1130 execution_options, {"_sa_orm_load_options": load_options}
1131 )
1132 else:
1133 execution_options = {
1134 "_sa_orm_load_options": load_options,
1135 }
1136
1137 if (
1138 self.key in state.dict
1139 and not passive & PassiveFlag.DEFERRED_HISTORY_LOAD
1140 ):
1141 return LoaderCallableStatus.ATTR_WAS_SET
1142
1143 if pending:
1144 if util.has_intersection(orm_util._none_set, params.values()):
1145 return None
1146
1147 elif util.has_intersection(orm_util._never_set, params.values()):
1148 return None
1149
1150 if self._raise_on_sql and not passive & PassiveFlag.NO_RAISE:
1151 self._invoke_raise_load(state, passive, "raise_on_sql")
1152
1153 stmt._where_criteria = (lazy_clause,)
1154
1155 result = session.execute(
1156 stmt, params, execution_options=execution_options
1157 )
1158
1159 result = result.unique().scalars().all()
1160
1161 if self.uselist:
1162 return result
1163 else:
1164 l = len(result)
1165 if l:
1166 if l > 1:
1167 util.warn(
1168 "Multiple rows returned with "
1169 "uselist=False for lazily-loaded attribute '%s' "
1170 % self.parent_property
1171 )
1172
1173 return result[0]
1174 else:
1175 return None
1176
1177 def create_row_processor(
1178 self,
1179 context,
1180 query_entity,
1181 path,
1182 loadopt,
1183 mapper,
1184 result,
1185 adapter,
1186 populators,
1187 ):
1188 key = self.key
1189
1190 if (
1191 context.load_options._is_user_refresh
1192 and context.query._compile_options._only_load_props
1193 and self.key in context.query._compile_options._only_load_props
1194 ):
1195 return self._immediateload_create_row_processor(
1196 context,
1197 query_entity,
1198 path,
1199 loadopt,
1200 mapper,
1201 result,
1202 adapter,
1203 populators,
1204 )
1205
1206 if not self.is_class_level or (loadopt and loadopt._extra_criteria):
1207 # we are not the primary manager for this attribute
1208 # on this class - set up a
1209 # per-instance lazyloader, which will override the
1210 # class-level behavior.
1211 # this currently only happens when using a
1212 # "lazyload" option on a "no load"
1213 # attribute - "eager" attributes always have a
1214 # class-level lazyloader installed.
1215 set_lazy_callable = (
1216 InstanceState._instance_level_callable_processor
1217 )(
1218 mapper.class_manager,
1219 _LoadLazyAttribute(
1220 key,
1221 self,
1222 loadopt,
1223 (
1224 loadopt._generate_extra_criteria(context)
1225 if loadopt._extra_criteria
1226 else None
1227 ),
1228 ),
1229 key,
1230 )
1231
1232 populators["new"].append((self.key, set_lazy_callable))
1233 elif context.populate_existing or mapper.always_refresh:
1234
1235 def reset_for_lazy_callable(state, dict_, row):
1236 # we are the primary manager for this attribute on
1237 # this class - reset its
1238 # per-instance attribute state, so that the class-level
1239 # lazy loader is
1240 # executed when next referenced on this instance.
1241 # this is needed in
1242 # populate_existing() types of scenarios to reset
1243 # any existing state.
1244 state._reset(dict_, key)
1245
1246 populators["new"].append((self.key, reset_for_lazy_callable))
1247
1248
1249class _LoadLazyAttribute:
1250 """semi-serializable loader object used by LazyLoader
1251
1252 Historically, this object would be carried along with instances that
1253 needed to run lazyloaders, so it had to be serializable to support
1254 cached instances.
1255
1256 this is no longer a general requirement, and the case where this object
1257 is used is exactly the case where we can't really serialize easily,
1258 which is when extra criteria in the loader option is present.
1259
1260 We can't reliably serialize that as it refers to mapped entities and
1261 AliasedClass objects that are local to the current process, which would
1262 need to be matched up on deserialize e.g. the sqlalchemy.ext.serializer
1263 approach.
1264
1265 """
1266
1267 def __init__(self, key, initiating_strategy, loadopt, extra_criteria):
1268 self.key = key
1269 self.strategy_key = initiating_strategy.strategy_key
1270 self.loadopt = loadopt
1271 self.extra_criteria = extra_criteria
1272
1273 def __getstate__(self):
1274 if self.extra_criteria is not None:
1275 util.warn(
1276 "Can't reliably serialize a lazyload() option that "
1277 "contains additional criteria; please use eager loading "
1278 "for this case"
1279 )
1280 return {
1281 "key": self.key,
1282 "strategy_key": self.strategy_key,
1283 "loadopt": self.loadopt,
1284 "extra_criteria": (),
1285 }
1286
1287 def __call__(self, state, passive=attributes.PASSIVE_OFF):
1288 key = self.key
1289 instance_mapper = state.manager.mapper
1290 prop = instance_mapper._props[key]
1291 strategy = prop._strategies[self.strategy_key]
1292
1293 return strategy._load_for_state(
1294 state,
1295 passive,
1296 loadopt=self.loadopt,
1297 extra_criteria=self.extra_criteria,
1298 )
1299
1300
1301class _PostLoader(_AbstractRelationshipLoader):
1302 """A relationship loader that emits a second SELECT statement."""
1303
1304 __slots__ = ()
1305
1306 def _setup_for_recursion(self, context, path, loadopt, join_depth=None):
1307 effective_path = (
1308 context.compile_state.current_path or orm_util.PathRegistry.root
1309 ) + path
1310
1311 top_level_context = context._get_top_level_context()
1312 execution_options = util.immutabledict(
1313 {"sa_top_level_orm_context": top_level_context}
1314 )
1315
1316 if loadopt:
1317 recursion_depth = loadopt.local_opts.get("recursion_depth", None)
1318 unlimited_recursion = recursion_depth == -1
1319 else:
1320 recursion_depth = None
1321 unlimited_recursion = False
1322
1323 if recursion_depth is not None:
1324 if not self.parent_property._is_self_referential:
1325 raise sa_exc.InvalidRequestError(
1326 f"recursion_depth option on relationship "
1327 f"{self.parent_property} not valid for "
1328 "non-self-referential relationship"
1329 )
1330 recursion_depth = context.execution_options.get(
1331 f"_recursion_depth_{id(self)}", recursion_depth
1332 )
1333
1334 if not unlimited_recursion and recursion_depth < 0:
1335 return (
1336 effective_path,
1337 False,
1338 execution_options,
1339 recursion_depth,
1340 )
1341
1342 if not unlimited_recursion:
1343 execution_options = execution_options.union(
1344 {
1345 f"_recursion_depth_{id(self)}": recursion_depth - 1,
1346 }
1347 )
1348
1349 if loading._PostLoad.path_exists(
1350 context, effective_path, self.parent_property
1351 ):
1352 return effective_path, False, execution_options, recursion_depth
1353
1354 path_w_prop = path[self.parent_property]
1355 effective_path_w_prop = effective_path[self.parent_property]
1356
1357 if not path_w_prop.contains(context.attributes, "loader"):
1358 if join_depth:
1359 if effective_path_w_prop.length / 2 > join_depth:
1360 return (
1361 effective_path,
1362 False,
1363 execution_options,
1364 recursion_depth,
1365 )
1366 elif effective_path_w_prop.contains_mapper(self.mapper):
1367 return (
1368 effective_path,
1369 False,
1370 execution_options,
1371 recursion_depth,
1372 )
1373
1374 return effective_path, True, execution_options, recursion_depth
1375
1376
1377@relationships.RelationshipProperty.strategy_for(lazy="immediate")
1378class _ImmediateLoader(_PostLoader):
1379 __slots__ = ("join_depth",)
1380
1381 def __init__(self, parent, strategy_key):
1382 super().__init__(parent, strategy_key)
1383 self.join_depth = self.parent_property.join_depth
1384
1385 def init_class_attribute(self, mapper):
1386 self.parent_property._get_strategy(
1387 (("lazy", "select"),)
1388 ).init_class_attribute(mapper)
1389
1390 def create_row_processor(
1391 self,
1392 context,
1393 query_entity,
1394 path,
1395 loadopt,
1396 mapper,
1397 result,
1398 adapter,
1399 populators,
1400 ):
1401 if not context.compile_state.compile_options._enable_eagerloads:
1402 return
1403
1404 (
1405 effective_path,
1406 run_loader,
1407 execution_options,
1408 recursion_depth,
1409 ) = self._setup_for_recursion(context, path, loadopt, self.join_depth)
1410
1411 if not run_loader:
1412 # this will not emit SQL and will only emit for a many-to-one
1413 # "use get" load. the "_RELATED" part means it may return
1414 # instance even if its expired, since this is a mutually-recursive
1415 # load operation.
1416 flags = attributes.PASSIVE_NO_FETCH_RELATED | PassiveFlag.NO_RAISE
1417 else:
1418 flags = attributes.PASSIVE_OFF | PassiveFlag.NO_RAISE
1419
1420 loading._PostLoad.callable_for_path(
1421 context,
1422 effective_path,
1423 self.parent,
1424 self.parent_property,
1425 self._load_for_path,
1426 loadopt,
1427 flags,
1428 recursion_depth,
1429 execution_options,
1430 )
1431
1432 def _load_for_path(
1433 self,
1434 context,
1435 path,
1436 states,
1437 load_only,
1438 loadopt,
1439 flags,
1440 recursion_depth,
1441 execution_options,
1442 ):
1443 if recursion_depth:
1444 new_opt = Load(loadopt.path.entity)
1445 new_opt.context = (
1446 loadopt,
1447 loadopt._recurse(),
1448 )
1449 alternate_effective_path = path._truncate_recursive()
1450 extra_options = (new_opt,)
1451 else:
1452 alternate_effective_path = path
1453 extra_options = ()
1454
1455 key = self.key
1456 lazyloader = self.parent_property._get_strategy((("lazy", "select"),))
1457 for state, overwrite in states:
1458 dict_ = state.dict
1459
1460 if overwrite or key not in dict_:
1461 value = lazyloader._load_for_state(
1462 state,
1463 flags,
1464 extra_options=extra_options,
1465 alternate_effective_path=alternate_effective_path,
1466 execution_options=execution_options,
1467 )
1468 if value not in (
1469 ATTR_WAS_SET,
1470 LoaderCallableStatus.PASSIVE_NO_RESULT,
1471 ):
1472 state.get_impl(key).set_committed_value(
1473 state, dict_, value
1474 )
1475
1476
1477@log.class_logger
1478@relationships.RelationshipProperty.strategy_for(lazy="subquery")
1479class _SubqueryLoader(_PostLoader):
1480 __slots__ = ("join_depth",)
1481
1482 def __init__(self, parent, strategy_key):
1483 super().__init__(parent, strategy_key)
1484 self.join_depth = self.parent_property.join_depth
1485
1486 def init_class_attribute(self, mapper):
1487 self.parent_property._get_strategy(
1488 (("lazy", "select"),)
1489 ).init_class_attribute(mapper)
1490
1491 def _get_leftmost(
1492 self,
1493 orig_query_entity_index,
1494 subq_path,
1495 current_compile_state,
1496 is_root,
1497 ):
1498 given_subq_path = subq_path
1499 subq_path = subq_path.path
1500 subq_mapper = orm_util._class_to_mapper(subq_path[0])
1501
1502 # determine attributes of the leftmost mapper
1503 if (
1504 self.parent.isa(subq_mapper)
1505 and self.parent_property is subq_path[1]
1506 ):
1507 leftmost_mapper, leftmost_prop = self.parent, self.parent_property
1508 else:
1509 leftmost_mapper, leftmost_prop = subq_mapper, subq_path[1]
1510
1511 if is_root:
1512 # the subq_path is also coming from cached state, so when we start
1513 # building up this path, it has to also be converted to be in terms
1514 # of the current state. this is for the specific case of the entity
1515 # is an AliasedClass against a subquery that's not otherwise going
1516 # to adapt
1517 new_subq_path = current_compile_state._entities[
1518 orig_query_entity_index
1519 ].entity_zero._path_registry[leftmost_prop]
1520 additional = len(subq_path) - len(new_subq_path)
1521 if additional:
1522 new_subq_path += path_registry.PathRegistry.coerce(
1523 subq_path[-additional:]
1524 )
1525 else:
1526 new_subq_path = given_subq_path
1527
1528 leftmost_cols = leftmost_prop.local_columns
1529
1530 leftmost_attr = [
1531 getattr(
1532 new_subq_path.path[0].entity,
1533 leftmost_mapper._columntoproperty[c].key,
1534 )
1535 for c in leftmost_cols
1536 ]
1537
1538 return leftmost_mapper, leftmost_attr, leftmost_prop, new_subq_path
1539
1540 def _generate_from_original_query(
1541 self,
1542 orig_compile_state,
1543 orig_query,
1544 leftmost_mapper,
1545 leftmost_attr,
1546 leftmost_relationship,
1547 orig_entity,
1548 ):
1549 # reformat the original query
1550 # to look only for significant columns
1551 q = orig_query._clone().correlate(None)
1552
1553 # LEGACY: make a Query back from the select() !!
1554 # This suits at least two legacy cases:
1555 # 1. applications which expect before_compile() to be called
1556 # below when we run .subquery() on this query (Keystone)
1557 # 2. applications which are doing subqueryload with complex
1558 # from_self() queries, as query.subquery() / .statement
1559 # has to do the full compile context for multiply-nested
1560 # from_self() (Neutron) - see test_subqload_from_self
1561 # for demo.
1562 q2 = query.Query.__new__(query.Query)
1563 q2.__dict__.update(q.__dict__)
1564 q = q2
1565
1566 # set the query's "FROM" list explicitly to what the
1567 # FROM list would be in any case, as we will be limiting
1568 # the columns in the SELECT list which may no longer include
1569 # all entities mentioned in things like WHERE, JOIN, etc.
1570 if not q._from_obj:
1571 q._enable_assertions = False
1572 q.select_from.non_generative(
1573 q,
1574 *{
1575 ent["entity"]
1576 for ent in _column_descriptions(
1577 orig_query, compile_state=orig_compile_state
1578 )
1579 if ent["entity"] is not None
1580 },
1581 )
1582
1583 # select from the identity columns of the outer (specifically, these
1584 # are the 'local_cols' of the property). This will remove other
1585 # columns from the query that might suggest the right entity which is
1586 # why we do set select_from above. The attributes we have are
1587 # coerced and adapted using the original query's adapter, which is
1588 # needed only for the case of adapting a subclass column to
1589 # that of a polymorphic selectable, e.g. we have
1590 # Engineer.primary_language and the entity is Person. All other
1591 # adaptations, e.g. from_self, select_entity_from(), will occur
1592 # within the new query when it compiles, as the compile_state we are
1593 # using here is only a partial one. If the subqueryload is from a
1594 # with_polymorphic() or other aliased() object, left_attr will already
1595 # be the correct attributes so no adaptation is needed.
1596 target_cols = orig_compile_state._adapt_col_list(
1597 [
1598 sql.coercions.expect(sql.roles.ColumnsClauseRole, o)
1599 for o in leftmost_attr
1600 ],
1601 orig_compile_state._get_current_adapter(),
1602 )
1603 q._raw_columns = target_cols
1604
1605 distinct_target_key = leftmost_relationship.distinct_target_key
1606
1607 if distinct_target_key is True:
1608 q._distinct = True
1609 elif distinct_target_key is None:
1610 # if target_cols refer to a non-primary key or only
1611 # part of a composite primary key, set the q as distinct
1612 for t in {c.table for c in target_cols}:
1613 if not set(target_cols).issuperset(t.primary_key):
1614 q._distinct = True
1615 break
1616
1617 # don't need ORDER BY if no limit/offset
1618 if not q._has_row_limiting_clause:
1619 q._order_by_clauses = ()
1620
1621 if q._distinct is True and q._order_by_clauses:
1622 # the logic to automatically add the order by columns to the query
1623 # when distinct is True is deprecated in the query
1624 to_add = sql_util.expand_column_list_from_order_by(
1625 target_cols, q._order_by_clauses
1626 )
1627 if to_add:
1628 q._set_entities(target_cols + to_add)
1629
1630 # the original query now becomes a subquery
1631 # which we'll join onto.
1632 # LEGACY: as "q" is a Query, the before_compile() event is invoked
1633 # here.
1634 embed_q = q.set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL).subquery()
1635 left_alias = orm_util.AliasedClass(
1636 leftmost_mapper, embed_q, use_mapper_path=True
1637 )
1638 return left_alias
1639
1640 def _prep_for_joins(self, left_alias, subq_path):
1641 # figure out what's being joined. a.k.a. the fun part
1642 to_join = []
1643 pairs = list(subq_path.pairs())
1644
1645 for i, (mapper, prop) in enumerate(pairs):
1646 if i > 0:
1647 # look at the previous mapper in the chain -
1648 # if it is as or more specific than this prop's
1649 # mapper, use that instead.
1650 # note we have an assumption here that
1651 # the non-first element is always going to be a mapper,
1652 # not an AliasedClass
1653
1654 prev_mapper = pairs[i - 1][1].mapper
1655 to_append = prev_mapper if prev_mapper.isa(mapper) else mapper
1656 else:
1657 to_append = mapper
1658
1659 to_join.append((to_append, prop.key))
1660
1661 # determine the immediate parent class we are joining from,
1662 # which needs to be aliased.
1663
1664 if len(to_join) < 2:
1665 # in the case of a one level eager load, this is the
1666 # leftmost "left_alias".
1667 parent_alias = left_alias
1668 else:
1669 info = inspect(to_join[-1][0])
1670 if info.is_aliased_class:
1671 parent_alias = info.entity
1672 else:
1673 # alias a plain mapper as we may be
1674 # joining multiple times
1675 parent_alias = orm_util.AliasedClass(
1676 info.entity, use_mapper_path=True
1677 )
1678
1679 local_cols = self.parent_property.local_columns
1680
1681 local_attr = [
1682 getattr(parent_alias, self.parent._columntoproperty[c].key)
1683 for c in local_cols
1684 ]
1685 return to_join, local_attr, parent_alias
1686
1687 def _apply_joins(
1688 self, q, to_join, left_alias, parent_alias, effective_entity
1689 ):
1690 ltj = len(to_join)
1691 if ltj == 1:
1692 to_join = [
1693 getattr(left_alias, to_join[0][1]).of_type(effective_entity)
1694 ]
1695 elif ltj == 2:
1696 to_join = [
1697 getattr(left_alias, to_join[0][1]).of_type(parent_alias),
1698 getattr(parent_alias, to_join[-1][1]).of_type(
1699 effective_entity
1700 ),
1701 ]
1702 elif ltj > 2:
1703 middle = [
1704 (
1705 (
1706 orm_util.AliasedClass(item[0])
1707 if not inspect(item[0]).is_aliased_class
1708 else item[0].entity
1709 ),
1710 item[1],
1711 )
1712 for item in to_join[1:-1]
1713 ]
1714 inner = []
1715
1716 while middle:
1717 item = middle.pop(0)
1718 attr = getattr(item[0], item[1])
1719 if middle:
1720 attr = attr.of_type(middle[0][0])
1721 else:
1722 attr = attr.of_type(parent_alias)
1723
1724 inner.append(attr)
1725
1726 to_join = (
1727 [getattr(left_alias, to_join[0][1]).of_type(inner[0].parent)]
1728 + inner
1729 + [
1730 getattr(parent_alias, to_join[-1][1]).of_type(
1731 effective_entity
1732 )
1733 ]
1734 )
1735
1736 for attr in to_join:
1737 q = q.join(attr)
1738
1739 return q
1740
1741 def _setup_options(
1742 self,
1743 context,
1744 q,
1745 subq_path,
1746 rewritten_path,
1747 orig_query,
1748 effective_entity,
1749 loadopt,
1750 ):
1751 # note that because the subqueryload object
1752 # does not reuse the cached query, instead always making
1753 # use of the current invoked query, while we have two queries
1754 # here (orig and context.query), they are both non-cached
1755 # queries and we can transfer the options as is without
1756 # adjusting for new criteria. Some work on #6881 / #6889
1757 # brought this into question.
1758 new_options = orig_query._with_options
1759
1760 if loadopt and loadopt._extra_criteria:
1761 new_options += (
1762 orm_util.LoaderCriteriaOption(
1763 self.entity,
1764 loadopt._generate_extra_criteria(context),
1765 ),
1766 )
1767
1768 # propagate loader options etc. to the new query.
1769 # these will fire relative to subq_path.
1770 q = q._with_current_path(rewritten_path)
1771 q = q.options(*new_options)
1772
1773 return q
1774
1775 def _setup_outermost_orderby(self, q):
1776 if self.parent_property.order_by:
1777
1778 def _setup_outermost_orderby(compile_context):
1779 compile_context.eager_order_by += tuple(
1780 util.to_list(self.parent_property.order_by)
1781 )
1782
1783 q = q._add_compile_state_func(
1784 _setup_outermost_orderby, self.parent_property
1785 )
1786
1787 return q
1788
1789 class _SubqCollections:
1790 """Given a :class:`_query.Query` used to emit the "subquery load",
1791 provide a load interface that executes the query at the
1792 first moment a value is needed.
1793
1794 """
1795
1796 __slots__ = (
1797 "session",
1798 "execution_options",
1799 "load_options",
1800 "params",
1801 "subq",
1802 "_data",
1803 )
1804
1805 def __init__(self, context, subq):
1806 # avoid creating a cycle by storing context
1807 # even though that's preferable
1808 self.session = context.session
1809 self.execution_options = context.execution_options
1810 self.load_options = context.load_options
1811 self.params = context.params or {}
1812 self.subq = subq
1813 self._data = None
1814
1815 def get(self, key, default):
1816 if self._data is None:
1817 self._load()
1818 return self._data.get(key, default)
1819
1820 def _load(self):
1821 self._data = collections.defaultdict(list)
1822
1823 q = self.subq
1824 assert q.session is None
1825
1826 q = q.with_session(self.session)
1827
1828 if self.load_options._populate_existing:
1829 q = q.populate_existing()
1830 # to work with baked query, the parameters may have been
1831 # updated since this query was created, so take these into account
1832
1833 rows = list(q.params(self.params))
1834 for k, v in itertools.groupby(rows, lambda x: x[1:]):
1835 self._data[k].extend(vv[0] for vv in v)
1836
1837 def loader(self, state, dict_, row):
1838 if self._data is None:
1839 self._load()
1840
1841 def _setup_query_from_rowproc(
1842 self,
1843 context,
1844 query_entity,
1845 path,
1846 entity,
1847 loadopt,
1848 adapter,
1849 ):
1850 compile_state = context.compile_state
1851 if (
1852 not compile_state.compile_options._enable_eagerloads
1853 or compile_state.compile_options._for_refresh_state
1854 ):
1855 return
1856
1857 orig_query_entity_index = compile_state._entities.index(query_entity)
1858 context.loaders_require_buffering = True
1859
1860 path = path[self.parent_property]
1861
1862 # build up a path indicating the path from the leftmost
1863 # entity to the thing we're subquery loading.
1864 with_poly_entity = path.get(
1865 compile_state.attributes, "path_with_polymorphic", None
1866 )
1867 if with_poly_entity is not None:
1868 effective_entity = with_poly_entity
1869 else:
1870 effective_entity = self.entity
1871
1872 subq_path, rewritten_path = context.query._execution_options.get(
1873 ("subquery_paths", None),
1874 (orm_util.PathRegistry.root, orm_util.PathRegistry.root),
1875 )
1876 is_root = subq_path is orm_util.PathRegistry.root
1877 subq_path = subq_path + path
1878 rewritten_path = rewritten_path + path
1879
1880 # use the current query being invoked, not the compile state
1881 # one. this is so that we get the current parameters. however,
1882 # it means we can't use the existing compile state, we have to make
1883 # a new one. other approaches include possibly using the
1884 # compiled query but swapping the params, seems only marginally
1885 # less time spent but more complicated
1886 orig_query = context.query._execution_options.get(
1887 ("orig_query", _SubqueryLoader), context.query
1888 )
1889
1890 # make a new compile_state for the query that's probably cached, but
1891 # we're sort of undoing a bit of that caching :(
1892 compile_state_cls = _ORMCompileState._get_plugin_class_for_plugin(
1893 orig_query, "orm"
1894 )
1895
1896 if orig_query._is_lambda_element:
1897 if context.load_options._lazy_loaded_from is None:
1898 util.warn(
1899 'subqueryloader for "%s" must invoke lambda callable '
1900 "at %r in "
1901 "order to produce a new query, decreasing the efficiency "
1902 "of caching for this statement. Consider using "
1903 "selectinload() for more effective full-lambda caching"
1904 % (self, orig_query)
1905 )
1906 orig_query = orig_query._resolved
1907
1908 # this is the more "quick" version, however it's not clear how
1909 # much of this we need. in particular I can't get a test to
1910 # fail if the "set_base_alias" is missing and not sure why that is.
1911 orig_compile_state = compile_state_cls._create_entities_collection(
1912 orig_query, legacy=False
1913 )
1914
1915 (
1916 leftmost_mapper,
1917 leftmost_attr,
1918 leftmost_relationship,
1919 rewritten_path,
1920 ) = self._get_leftmost(
1921 orig_query_entity_index,
1922 rewritten_path,
1923 orig_compile_state,
1924 is_root,
1925 )
1926
1927 # generate a new Query from the original, then
1928 # produce a subquery from it.
1929 left_alias = self._generate_from_original_query(
1930 orig_compile_state,
1931 orig_query,
1932 leftmost_mapper,
1933 leftmost_attr,
1934 leftmost_relationship,
1935 entity,
1936 )
1937
1938 # generate another Query that will join the
1939 # left alias to the target relationships.
1940 # basically doing a longhand
1941 # "from_self()". (from_self() itself not quite industrial
1942 # strength enough for all contingencies...but very close)
1943
1944 q = query.Query(effective_entity)
1945
1946 q._execution_options = context.query._execution_options.merge_with(
1947 context.execution_options,
1948 {
1949 ("orig_query", _SubqueryLoader): orig_query,
1950 ("subquery_paths", None): (subq_path, rewritten_path),
1951 },
1952 )
1953
1954 q = q._set_enable_single_crit(False)
1955 to_join, local_attr, parent_alias = self._prep_for_joins(
1956 left_alias, subq_path
1957 )
1958
1959 q = q.add_columns(*local_attr)
1960 q = self._apply_joins(
1961 q, to_join, left_alias, parent_alias, effective_entity
1962 )
1963
1964 q = self._setup_options(
1965 context,
1966 q,
1967 subq_path,
1968 rewritten_path,
1969 orig_query,
1970 effective_entity,
1971 loadopt,
1972 )
1973 q = self._setup_outermost_orderby(q)
1974
1975 return q
1976
1977 def create_row_processor(
1978 self,
1979 context,
1980 query_entity,
1981 path,
1982 loadopt,
1983 mapper,
1984 result,
1985 adapter,
1986 populators,
1987 ):
1988 if (
1989 loadopt
1990 and context.compile_state.statement is not None
1991 and context.compile_state.statement.is_dml
1992 ):
1993 util.warn_deprecated(
1994 "The subqueryload loader option is not compatible with DML "
1995 "statements such as INSERT, UPDATE. Only SELECT may be used."
1996 "This warning will become an exception in a future release.",
1997 "2.0",
1998 )
1999
2000 if context.refresh_state:
2001 return self._immediateload_create_row_processor(
2002 context,
2003 query_entity,
2004 path,
2005 loadopt,
2006 mapper,
2007 result,
2008 adapter,
2009 populators,
2010 )
2011
2012 _, run_loader, _, _ = self._setup_for_recursion(
2013 context, path, loadopt, self.join_depth
2014 )
2015 if not run_loader:
2016 return
2017
2018 if not isinstance(context.compile_state, _ORMSelectCompileState):
2019 # issue 7505 - subqueryload() in 1.3 and previous would silently
2020 # degrade for from_statement() without warning. this behavior
2021 # is restored here
2022 return
2023
2024 if not self.parent.class_manager[self.key].impl.supports_population:
2025 raise sa_exc.InvalidRequestError(
2026 "'%s' does not support object "
2027 "population - eager loading cannot be applied." % self
2028 )
2029
2030 # a little dance here as the "path" is still something that only
2031 # semi-tracks the exact series of things we are loading, still not
2032 # telling us about with_polymorphic() and stuff like that when it's at
2033 # the root.. the initial MapperEntity is more accurate for this case.
2034 if len(path) == 1:
2035 if not orm_util._entity_isa(query_entity.entity_zero, self.parent):
2036 return
2037 elif not orm_util._entity_isa(
2038 path[-1], self.parent
2039 ) and not self.parent.isa(path[-1].mapper):
2040 # second check accommodates a polymorphic entity where
2041 # the path has been normalized to the base mapper but
2042 # self.parent is a subclass mapper. Fixes #13209.
2043 return
2044
2045 subq = self._setup_query_from_rowproc(
2046 context,
2047 query_entity,
2048 path,
2049 path[-1],
2050 loadopt,
2051 adapter,
2052 )
2053
2054 if subq is None:
2055 return
2056
2057 assert subq.session is None
2058
2059 path = path[self.parent_property]
2060
2061 local_cols = self.parent_property.local_columns
2062
2063 # cache the loaded collections in the context
2064 # so that inheriting mappers don't re-load when they
2065 # call upon create_row_processor again
2066 collections = path.get(context.attributes, "collections")
2067 if collections is None:
2068 collections = self._SubqCollections(context, subq)
2069 path.set(context.attributes, "collections", collections)
2070
2071 if adapter:
2072 local_cols = [adapter.columns[c] for c in local_cols]
2073
2074 if self.uselist:
2075 self._create_collection_loader(
2076 context, result, collections, local_cols, populators
2077 )
2078 else:
2079 self._create_scalar_loader(
2080 context, result, collections, local_cols, populators
2081 )
2082
2083 def _create_collection_loader(
2084 self, context, result, collections, local_cols, populators
2085 ):
2086 tuple_getter = result._tuple_getter(local_cols)
2087
2088 def load_collection_from_subq(state, dict_, row):
2089 collection = collections.get(tuple_getter(row), ())
2090 state.get_impl(self.key).set_committed_value(
2091 state, dict_, collection
2092 )
2093
2094 def load_collection_from_subq_existing_row(state, dict_, row):
2095 if self.key not in dict_:
2096 load_collection_from_subq(state, dict_, row)
2097
2098 populators["new"].append((self.key, load_collection_from_subq))
2099 populators["existing"].append(
2100 (self.key, load_collection_from_subq_existing_row)
2101 )
2102
2103 if context.invoke_all_eagers:
2104 populators["eager"].append((self.key, collections.loader))
2105
2106 def _create_scalar_loader(
2107 self, context, result, collections, local_cols, populators
2108 ):
2109 tuple_getter = result._tuple_getter(local_cols)
2110
2111 def load_scalar_from_subq(state, dict_, row):
2112 collection = collections.get(tuple_getter(row), (None,))
2113 if len(collection) > 1:
2114 util.warn(
2115 "Multiple rows returned with "
2116 "uselist=False for eagerly-loaded attribute '%s' " % self
2117 )
2118
2119 scalar = collection[0]
2120 state.get_impl(self.key).set_committed_value(state, dict_, scalar)
2121
2122 def load_scalar_from_subq_existing_row(state, dict_, row):
2123 if self.key not in dict_:
2124 load_scalar_from_subq(state, dict_, row)
2125
2126 populators["new"].append((self.key, load_scalar_from_subq))
2127 populators["existing"].append(
2128 (self.key, load_scalar_from_subq_existing_row)
2129 )
2130 if context.invoke_all_eagers:
2131 populators["eager"].append((self.key, collections.loader))
2132
2133
2134@log.class_logger
2135@relationships.RelationshipProperty.strategy_for(lazy="joined")
2136@relationships.RelationshipProperty.strategy_for(lazy=False)
2137class _JoinedLoader(_AbstractRelationshipLoader):
2138 """Provide loading behavior for a :class:`.Relationship`
2139 using joined eager loading.
2140
2141 """
2142
2143 __slots__ = "join_depth"
2144
2145 def __init__(self, parent, strategy_key):
2146 super().__init__(parent, strategy_key)
2147 self.join_depth = self.parent_property.join_depth
2148
2149 def init_class_attribute(self, mapper):
2150 self.parent_property._get_strategy(
2151 (("lazy", "select"),)
2152 ).init_class_attribute(mapper)
2153
2154 def setup_query(
2155 self,
2156 compile_state,
2157 query_entity,
2158 path,
2159 loadopt,
2160 adapter,
2161 column_collection=None,
2162 parentmapper=None,
2163 chained_from_outerjoin=False,
2164 **kwargs,
2165 ):
2166 """Add a left outer join to the statement that's being constructed."""
2167
2168 if not compile_state.compile_options._enable_eagerloads:
2169 return
2170 elif (
2171 loadopt
2172 and compile_state.statement is not None
2173 and compile_state.statement.is_dml
2174 ):
2175 util.warn_deprecated(
2176 "The joinedload loader option is not compatible with DML "
2177 "statements such as INSERT, UPDATE. Only SELECT may be used."
2178 "This warning will become an exception in a future release.",
2179 "2.0",
2180 )
2181 elif self.uselist:
2182 compile_state.multi_row_eager_loaders = True
2183
2184 path = path[self.parent_property]
2185
2186 user_defined_adapter = (
2187 self._init_user_defined_eager_proc(
2188 loadopt, compile_state, compile_state.attributes
2189 )
2190 if loadopt
2191 else False
2192 )
2193
2194 if user_defined_adapter is not False:
2195 # setup an adapter but dont create any JOIN, assume it's already
2196 # in the query
2197 (
2198 clauses,
2199 adapter,
2200 add_to_collection,
2201 ) = self._setup_query_on_user_defined_adapter(
2202 compile_state,
2203 query_entity,
2204 path,
2205 adapter,
2206 user_defined_adapter,
2207 )
2208
2209 # don't do "wrap" for multi-row, we want to wrap
2210 # limited/distinct SELECT,
2211 # because we want to put the JOIN on the outside.
2212
2213 else:
2214 # if not via query option, check for
2215 # a cycle
2216 if not path.contains(compile_state.attributes, "loader"):
2217 if self.join_depth:
2218 if path.length / 2 > self.join_depth:
2219 return
2220 elif path.contains_mapper(self.mapper):
2221 return
2222
2223 # add the JOIN and create an adapter
2224 (
2225 clauses,
2226 adapter,
2227 add_to_collection,
2228 chained_from_outerjoin,
2229 ) = self._generate_row_adapter(
2230 compile_state,
2231 query_entity,
2232 path,
2233 loadopt,
2234 adapter,
2235 column_collection,
2236 parentmapper,
2237 chained_from_outerjoin,
2238 )
2239
2240 # for multi-row, we want to wrap limited/distinct SELECT,
2241 # because we want to put the JOIN on the outside.
2242 compile_state.eager_adding_joins = True
2243
2244 with_poly_entity = path.get(
2245 compile_state.attributes, "path_with_polymorphic", None
2246 )
2247 if with_poly_entity is not None:
2248 with_polymorphic = inspect(
2249 with_poly_entity
2250 ).with_polymorphic_mappers
2251 else:
2252 with_polymorphic = None
2253
2254 path = path[self.entity]
2255
2256 loading._setup_entity_query(
2257 compile_state,
2258 self.mapper,
2259 query_entity,
2260 path,
2261 clauses,
2262 add_to_collection,
2263 with_polymorphic=with_polymorphic,
2264 parentmapper=self.mapper,
2265 chained_from_outerjoin=chained_from_outerjoin,
2266 )
2267
2268 has_nones = util.NONE_SET.intersection(compile_state.secondary_columns)
2269
2270 if has_nones:
2271 if with_poly_entity is not None:
2272 raise sa_exc.InvalidRequestError(
2273 "Detected unaliased columns when generating joined "
2274 "load. Make sure to use aliased=True or flat=True "
2275 "when using joined loading with with_polymorphic()."
2276 )
2277 else:
2278 compile_state.secondary_columns = [
2279 c for c in compile_state.secondary_columns if c is not None
2280 ]
2281
2282 def _init_user_defined_eager_proc(
2283 self, loadopt, compile_state, target_attributes
2284 ):
2285 # check if the opt applies at all
2286 if "eager_from_alias" not in loadopt.local_opts:
2287 # nope
2288 return False
2289
2290 path = loadopt.path.parent
2291
2292 # the option applies. check if the "user_defined_eager_row_processor"
2293 # has been built up.
2294 adapter = path.get(
2295 compile_state.attributes, "user_defined_eager_row_processor", False
2296 )
2297 if adapter is not False:
2298 # just return it
2299 return adapter
2300
2301 # otherwise figure it out.
2302 alias = loadopt.local_opts["eager_from_alias"]
2303 root_mapper, prop = path[-2:]
2304
2305 if alias is not None:
2306 if isinstance(alias, str):
2307 alias = prop.target.alias(alias)
2308 adapter = orm_util.ORMAdapter(
2309 orm_util._TraceAdaptRole.JOINEDLOAD_USER_DEFINED_ALIAS,
2310 prop.mapper,
2311 selectable=alias,
2312 equivalents=prop.mapper._equivalent_columns,
2313 limit_on_entity=False,
2314 )
2315 else:
2316 if path.contains(
2317 compile_state.attributes, "path_with_polymorphic"
2318 ):
2319 with_poly_entity = path.get(
2320 compile_state.attributes, "path_with_polymorphic"
2321 )
2322 adapter = orm_util.ORMAdapter(
2323 orm_util._TraceAdaptRole.JOINEDLOAD_PATH_WITH_POLYMORPHIC,
2324 with_poly_entity,
2325 equivalents=prop.mapper._equivalent_columns,
2326 )
2327 else:
2328 adapter = compile_state._polymorphic_adapters.get(
2329 prop.mapper, None
2330 )
2331 path.set(
2332 target_attributes,
2333 "user_defined_eager_row_processor",
2334 adapter,
2335 )
2336
2337 return adapter
2338
2339 def _setup_query_on_user_defined_adapter(
2340 self, context, entity, path, adapter, user_defined_adapter
2341 ):
2342 # apply some more wrapping to the "user defined adapter"
2343 # if we are setting up the query for SQL render.
2344 adapter = entity._get_entity_clauses(context)
2345
2346 if adapter and user_defined_adapter:
2347 user_defined_adapter = user_defined_adapter.wrap(adapter)
2348 path.set(
2349 context.attributes,
2350 "user_defined_eager_row_processor",
2351 user_defined_adapter,
2352 )
2353 elif adapter:
2354 user_defined_adapter = adapter
2355 path.set(
2356 context.attributes,
2357 "user_defined_eager_row_processor",
2358 user_defined_adapter,
2359 )
2360
2361 add_to_collection = context.primary_columns
2362 return user_defined_adapter, adapter, add_to_collection
2363
2364 def _generate_row_adapter(
2365 self,
2366 compile_state,
2367 entity,
2368 path,
2369 loadopt,
2370 adapter,
2371 column_collection,
2372 parentmapper,
2373 chained_from_outerjoin,
2374 ):
2375 with_poly_entity = path.get(
2376 compile_state.attributes, "path_with_polymorphic", None
2377 )
2378 if with_poly_entity:
2379 to_adapt = with_poly_entity
2380 else:
2381 insp = inspect(self.entity)
2382 if insp.is_aliased_class:
2383 alt_selectable = insp.selectable
2384 else:
2385 alt_selectable = None
2386
2387 to_adapt = orm_util.AliasedClass(
2388 self.mapper,
2389 alias=(
2390 alt_selectable._anonymous_fromclause(flat=True)
2391 if alt_selectable is not None
2392 else None
2393 ),
2394 flat=True,
2395 use_mapper_path=True,
2396 )
2397
2398 to_adapt_insp = inspect(to_adapt)
2399
2400 clauses = to_adapt_insp._memo(
2401 ("joinedloader_ormadapter", self),
2402 orm_util.ORMAdapter,
2403 orm_util._TraceAdaptRole.JOINEDLOAD_MEMOIZED_ADAPTER,
2404 to_adapt_insp,
2405 equivalents=self.mapper._equivalent_columns,
2406 adapt_required=True,
2407 allow_label_resolve=False,
2408 anonymize_labels=True,
2409 )
2410
2411 assert clauses.is_aliased_class
2412
2413 innerjoin = (
2414 loadopt.local_opts.get("innerjoin", self.parent_property.innerjoin)
2415 if loadopt is not None
2416 else self.parent_property.innerjoin
2417 )
2418
2419 if not innerjoin:
2420 # if this is an outer join, all non-nested eager joins from
2421 # this path must also be outer joins
2422 chained_from_outerjoin = True
2423
2424 compile_state.create_eager_joins.append(
2425 (
2426 self._create_eager_join,
2427 entity,
2428 path,
2429 adapter,
2430 parentmapper,
2431 clauses,
2432 innerjoin,
2433 chained_from_outerjoin,
2434 loadopt._extra_criteria if loadopt else (),
2435 )
2436 )
2437
2438 add_to_collection = compile_state.secondary_columns
2439 path.set(compile_state.attributes, "eager_row_processor", clauses)
2440
2441 return clauses, adapter, add_to_collection, chained_from_outerjoin
2442
2443 def _create_eager_join(
2444 self,
2445 compile_state,
2446 query_entity,
2447 path,
2448 adapter,
2449 parentmapper,
2450 clauses,
2451 innerjoin,
2452 chained_from_outerjoin,
2453 extra_criteria,
2454 ):
2455 if parentmapper is None:
2456 localparent = query_entity.mapper
2457 else:
2458 localparent = parentmapper
2459
2460 # whether or not the Query will wrap the selectable in a subquery,
2461 # and then attach eager load joins to that (i.e., in the case of
2462 # LIMIT/OFFSET etc.)
2463 should_nest_selectable = compile_state._should_nest_selectable
2464
2465 query_entity_key = None
2466
2467 if (
2468 query_entity not in compile_state.eager_joins
2469 and not should_nest_selectable
2470 and compile_state.from_clauses
2471 ):
2472 indexes = sql_util.find_left_clause_that_matches_given(
2473 compile_state.from_clauses, query_entity.selectable
2474 )
2475
2476 if len(indexes) > 1:
2477 # for the eager load case, I can't reproduce this right
2478 # now. For query.join() I can.
2479 raise sa_exc.InvalidRequestError(
2480 "Can't identify which query entity in which to joined "
2481 "eager load from. Please use an exact match when "
2482 "specifying the join path."
2483 )
2484
2485 if indexes:
2486 clause = compile_state.from_clauses[indexes[0]]
2487 # join to an existing FROM clause on the query.
2488 # key it to its list index in the eager_joins dict.
2489 # Query._compile_context will adapt as needed and
2490 # append to the FROM clause of the select().
2491 query_entity_key, default_towrap = indexes[0], clause
2492
2493 if query_entity_key is None:
2494 query_entity_key, default_towrap = (
2495 query_entity,
2496 query_entity.selectable,
2497 )
2498
2499 towrap = compile_state.eager_joins.setdefault(
2500 query_entity_key, default_towrap
2501 )
2502
2503 if adapter:
2504 if getattr(adapter, "is_aliased_class", False):
2505 # joining from an adapted entity. The adapted entity
2506 # might be a "with_polymorphic", so resolve that to our
2507 # specific mapper's entity before looking for our attribute
2508 # name on it.
2509 efm = adapter.aliased_insp._entity_for_mapper(
2510 localparent
2511 if localparent.isa(self.parent)
2512 else self.parent
2513 )
2514
2515 # look for our attribute on the adapted entity, else fall back
2516 # to our straight property
2517 onclause = getattr(efm.entity, self.key, self.parent_property)
2518 else:
2519 onclause = getattr(
2520 orm_util.AliasedClass(
2521 self.parent, adapter.selectable, use_mapper_path=True
2522 ),
2523 self.key,
2524 self.parent_property,
2525 )
2526
2527 else:
2528 onclause = self.parent_property
2529
2530 assert clauses.is_aliased_class
2531
2532 attach_on_outside = (
2533 not chained_from_outerjoin
2534 or not innerjoin
2535 or innerjoin == "unnested"
2536 or query_entity.entity_zero.represents_outer_join
2537 )
2538
2539 extra_join_criteria = extra_criteria
2540 additional_entity_criteria = compile_state.global_attributes.get(
2541 ("additional_entity_criteria", self.mapper), ()
2542 )
2543 if additional_entity_criteria:
2544 extra_join_criteria += tuple(
2545 ae._resolve_where_criteria(self.mapper)
2546 for ae in additional_entity_criteria
2547 if ae.propagate_to_loaders
2548 )
2549
2550 if attach_on_outside:
2551 # this is the "classic" eager join case.
2552 eagerjoin = orm_util._ORMJoin(
2553 towrap,
2554 clauses.aliased_insp,
2555 onclause,
2556 isouter=not innerjoin
2557 or query_entity.entity_zero.represents_outer_join
2558 or (chained_from_outerjoin and isinstance(towrap, sql.Join)),
2559 _left_memo=self.parent,
2560 _right_memo=path[self.mapper],
2561 _extra_criteria=extra_join_criteria,
2562 )
2563 else:
2564 # all other cases are innerjoin=='nested' approach
2565 eagerjoin = self._splice_nested_inner_join(
2566 path, path[-2], towrap, clauses, onclause, extra_join_criteria
2567 )
2568
2569 compile_state.eager_joins[query_entity_key] = eagerjoin
2570
2571 # send a hint to the Query as to where it may "splice" this join
2572 eagerjoin.stop_on = query_entity.selectable
2573
2574 if not parentmapper:
2575 # for parentclause that is the non-eager end of the join,
2576 # ensure all the parent cols in the primaryjoin are actually
2577 # in the
2578 # columns clause (i.e. are not deferred), so that aliasing applied
2579 # by the Query propagates those columns outward.
2580 # This has the effect
2581 # of "undefering" those columns.
2582 for col in sql_util._find_columns(
2583 self.parent_property.primaryjoin
2584 ):
2585 if localparent.persist_selectable.c.contains_column(col):
2586 if adapter:
2587 col = adapter.columns[col]
2588 compile_state._append_dedupe_col_collection(
2589 col, compile_state.primary_columns
2590 )
2591
2592 if self.parent_property.order_by:
2593 compile_state.eager_order_by += tuple(
2594 (eagerjoin._target_adapter.copy_and_process)(
2595 util.to_list(self.parent_property.order_by)
2596 )
2597 )
2598
2599 def _splice_nested_inner_join(
2600 self,
2601 path,
2602 entity_we_want_to_splice_onto,
2603 join_obj,
2604 clauses,
2605 onclause,
2606 extra_criteria,
2607 entity_inside_join_structure: Union[
2608 Mapper, None, Literal[False]
2609 ] = False,
2610 detected_existing_path: Optional[path_registry.PathRegistry] = None,
2611 ):
2612 # recursive fn to splice a nested join into an existing one.
2613 # entity_inside_join_structure=False means this is the outermost call,
2614 # and it should return a value. entity_inside_join_structure=<mapper>
2615 # indicates we've descended into a join and are looking at a FROM
2616 # clause representing this mapper; if this is not
2617 # entity_we_want_to_splice_onto then return None to end the recursive
2618 # branch
2619
2620 assert entity_we_want_to_splice_onto is path[-2]
2621
2622 if entity_inside_join_structure is False:
2623 assert isinstance(join_obj, orm_util._ORMJoin)
2624
2625 if isinstance(join_obj, sql.selectable.FromGrouping):
2626 # FromGrouping - continue descending into the structure
2627 return self._splice_nested_inner_join(
2628 path,
2629 entity_we_want_to_splice_onto,
2630 join_obj.element,
2631 clauses,
2632 onclause,
2633 extra_criteria,
2634 entity_inside_join_structure,
2635 )
2636 elif isinstance(join_obj, orm_util._ORMJoin):
2637 # _ORMJoin - continue descending into the structure
2638
2639 join_right_path = join_obj._right_memo
2640
2641 # see if right side of join is viable
2642 target_join = self._splice_nested_inner_join(
2643 path,
2644 entity_we_want_to_splice_onto,
2645 join_obj.right,
2646 clauses,
2647 onclause,
2648 extra_criteria,
2649 entity_inside_join_structure=(
2650 join_right_path[-1].mapper
2651 if join_right_path is not None
2652 else None
2653 ),
2654 )
2655
2656 if target_join is not None:
2657 # for a right splice, attempt to flatten out
2658 # a JOIN b JOIN c JOIN .. to avoid needless
2659 # parenthesis nesting
2660 if not join_obj.isouter and not target_join.isouter:
2661 eagerjoin = join_obj._splice_into_center(target_join)
2662 else:
2663 eagerjoin = orm_util._ORMJoin(
2664 join_obj.left,
2665 target_join,
2666 join_obj.onclause,
2667 isouter=join_obj.isouter,
2668 _left_memo=join_obj._left_memo,
2669 )
2670
2671 eagerjoin._target_adapter = target_join._target_adapter
2672 return eagerjoin
2673
2674 else:
2675 # see if left side of join is viable
2676 target_join = self._splice_nested_inner_join(
2677 path,
2678 entity_we_want_to_splice_onto,
2679 join_obj.left,
2680 clauses,
2681 onclause,
2682 extra_criteria,
2683 entity_inside_join_structure=join_obj._left_memo,
2684 detected_existing_path=join_right_path,
2685 )
2686
2687 if target_join is not None:
2688 eagerjoin = orm_util._ORMJoin(
2689 target_join,
2690 join_obj.right,
2691 join_obj.onclause,
2692 isouter=join_obj.isouter,
2693 _right_memo=join_obj._right_memo,
2694 )
2695 eagerjoin._target_adapter = target_join._target_adapter
2696 return eagerjoin
2697
2698 # neither side viable, return None, or fail if this was the top
2699 # most call
2700 if entity_inside_join_structure is False:
2701 assert (
2702 False
2703 ), "assertion failed attempting to produce joined eager loads"
2704 return None
2705
2706 # reached an endpoint (e.g. a table that's mapped, or an alias of that
2707 # table). determine if we can use this endpoint to splice onto
2708
2709 # is this the entity we want to splice onto in the first place?
2710 if not entity_we_want_to_splice_onto.isa(entity_inside_join_structure):
2711 return None
2712
2713 # path check. if we know the path how this join endpoint got here,
2714 # lets look at our path we are satisfying and see if we're in the
2715 # wrong place. This is specifically for when our entity may
2716 # appear more than once in the path, issue #11449
2717 # updated in issue #11965.
2718 if detected_existing_path and len(detected_existing_path) > 2:
2719 # this assertion is currently based on how this call is made,
2720 # where given a join_obj, the call will have these parameters as
2721 # entity_inside_join_structure=join_obj._left_memo
2722 # and entity_inside_join_structure=join_obj._right_memo.mapper
2723 assert detected_existing_path[-3] is entity_inside_join_structure
2724
2725 # from that, see if the path we are targeting matches the
2726 # "existing" path of this join all the way up to the midpoint
2727 # of this join object (e.g. the relationship).
2728 # if not, then this is not our target
2729 #
2730 # a test condition where this test is false looks like:
2731 #
2732 # desired splice: Node->kind->Kind
2733 # path of desired splice: NodeGroup->nodes->Node->kind
2734 # path we've located: NodeGroup->nodes->Node->common_node->Node
2735 #
2736 # above, because we want to splice kind->Kind onto
2737 # NodeGroup->nodes->Node, this is not our path because it actually
2738 # goes more steps than we want into self-referential
2739 # ->common_node->Node
2740 #
2741 # a test condition where this test is true looks like:
2742 #
2743 # desired splice: B->c2s->C2
2744 # path of desired splice: A->bs->B->c2s
2745 # path we've located: A->bs->B->c1s->C1
2746 #
2747 # above, we want to splice c2s->C2 onto B, and the located path
2748 # shows that the join ends with B->c1s->C1. so we will
2749 # add another join onto that, which would create a "branch" that
2750 # we might represent in a pseudopath as:
2751 #
2752 # B->c1s->C1
2753 # ->c2s->C2
2754 #
2755 # i.e. A JOIN B ON <bs> JOIN C1 ON <c1s>
2756 # JOIN C2 ON <c2s>
2757 #
2758
2759 if detected_existing_path[0:-2] != path.path[0:-1]:
2760 return None
2761
2762 return orm_util._ORMJoin(
2763 join_obj,
2764 clauses.aliased_insp,
2765 onclause,
2766 isouter=False,
2767 _left_memo=entity_inside_join_structure,
2768 _right_memo=path[path[-1].mapper],
2769 _extra_criteria=extra_criteria,
2770 )
2771
2772 def _create_eager_adapter(self, context, result, adapter, path, loadopt):
2773 compile_state = context.compile_state
2774
2775 user_defined_adapter = (
2776 self._init_user_defined_eager_proc(
2777 loadopt, compile_state, context.attributes
2778 )
2779 if loadopt
2780 else False
2781 )
2782
2783 if user_defined_adapter is not False:
2784 decorator = user_defined_adapter
2785 # user defined eagerloads are part of the "primary"
2786 # portion of the load.
2787 # the adapters applied to the Query should be honored.
2788 if compile_state.compound_eager_adapter and decorator:
2789 decorator = decorator.wrap(
2790 compile_state.compound_eager_adapter
2791 )
2792 elif compile_state.compound_eager_adapter:
2793 decorator = compile_state.compound_eager_adapter
2794 else:
2795 decorator = path.get(
2796 compile_state.attributes, "eager_row_processor"
2797 )
2798 if decorator is None:
2799 return False
2800
2801 if self.mapper._result_has_identity_key(result, decorator):
2802 return decorator
2803 else:
2804 # no identity key - don't return a row
2805 # processor, will cause a degrade to lazy
2806 return False
2807
2808 def create_row_processor(
2809 self,
2810 context,
2811 query_entity,
2812 path,
2813 loadopt,
2814 mapper,
2815 result,
2816 adapter,
2817 populators,
2818 ):
2819
2820 if not context.compile_state.compile_options._enable_eagerloads:
2821 return
2822
2823 if not self.parent.class_manager[self.key].impl.supports_population:
2824 raise sa_exc.InvalidRequestError(
2825 "'%s' does not support object "
2826 "population - eager loading cannot be applied." % self
2827 )
2828
2829 if self.uselist:
2830 context.loaders_require_uniquing = True
2831
2832 our_path = path[self.parent_property]
2833
2834 eager_adapter = self._create_eager_adapter(
2835 context, result, adapter, our_path, loadopt
2836 )
2837
2838 if eager_adapter is not False:
2839 key = self.key
2840
2841 _instance = loading._instance_processor(
2842 query_entity,
2843 self.mapper,
2844 context,
2845 result,
2846 our_path[self.entity],
2847 eager_adapter,
2848 )
2849
2850 if not self.uselist:
2851 self._create_scalar_loader(context, key, _instance, populators)
2852 else:
2853 self._create_collection_loader(
2854 context, key, _instance, populators
2855 )
2856 else:
2857 self.parent_property._get_strategy(
2858 (("lazy", "select"),)
2859 ).create_row_processor(
2860 context,
2861 query_entity,
2862 path,
2863 loadopt,
2864 mapper,
2865 result,
2866 adapter,
2867 populators,
2868 )
2869
2870 def _create_collection_loader(self, context, key, _instance, populators):
2871 def load_collection_from_joined_new_row(state, dict_, row):
2872 # note this must unconditionally clear out any existing collection.
2873 # an existing collection would be present only in the case of
2874 # populate_existing().
2875 collection = attributes.init_state_collection(state, dict_, key)
2876 result_list = util.UniqueAppender(
2877 collection, "append_without_event"
2878 )
2879 context.attributes[(state, key)] = result_list
2880 inst = _instance(row)
2881 if inst is not None:
2882 result_list.append(inst)
2883
2884 def load_collection_from_joined_existing_row(state, dict_, row):
2885 if (state, key) in context.attributes:
2886 result_list = context.attributes[(state, key)]
2887 else:
2888 # appender_key can be absent from context.attributes
2889 # with isnew=False when self-referential eager loading
2890 # is used; the same instance may be present in two
2891 # distinct sets of result columns
2892 collection = attributes.init_state_collection(
2893 state, dict_, key
2894 )
2895 result_list = util.UniqueAppender(
2896 collection, "append_without_event"
2897 )
2898 context.attributes[(state, key)] = result_list
2899 inst = _instance(row)
2900 if inst is not None:
2901 result_list.append(inst)
2902
2903 def load_collection_from_joined_exec(state, dict_, row):
2904 _instance(row)
2905
2906 populators["new"].append(
2907 (self.key, load_collection_from_joined_new_row)
2908 )
2909 populators["existing"].append(
2910 (self.key, load_collection_from_joined_existing_row)
2911 )
2912 if context.invoke_all_eagers:
2913 populators["eager"].append(
2914 (self.key, load_collection_from_joined_exec)
2915 )
2916
2917 def _create_scalar_loader(self, context, key, _instance, populators):
2918 def load_scalar_from_joined_new_row(state, dict_, row):
2919 # set a scalar object instance directly on the parent
2920 # object, bypassing InstrumentedAttribute event handlers.
2921 dict_[key] = _instance(row)
2922
2923 def load_scalar_from_joined_existing_row(state, dict_, row):
2924 # call _instance on the row, even though the object has
2925 # been created, so that we further descend into properties
2926 existing = _instance(row)
2927
2928 # conflicting value already loaded, this shouldn't happen
2929 if key in dict_:
2930 if existing is not dict_[key]:
2931 util.warn(
2932 "Multiple rows returned with "
2933 "uselist=False for eagerly-loaded attribute '%s' "
2934 % self
2935 )
2936 else:
2937 # this case is when one row has multiple loads of the
2938 # same entity (e.g. via aliasing), one has an attribute
2939 # that the other doesn't.
2940 dict_[key] = existing
2941
2942 def load_scalar_from_joined_exec(state, dict_, row):
2943 _instance(row)
2944
2945 populators["new"].append((self.key, load_scalar_from_joined_new_row))
2946 populators["existing"].append(
2947 (self.key, load_scalar_from_joined_existing_row)
2948 )
2949 if context.invoke_all_eagers:
2950 populators["eager"].append(
2951 (self.key, load_scalar_from_joined_exec)
2952 )
2953
2954
2955@log.class_logger
2956@relationships.RelationshipProperty.strategy_for(lazy="selectin")
2957class _SelectInLoader(_PostLoader, util.MemoizedSlots):
2958 __slots__ = (
2959 "join_depth",
2960 "omit_join",
2961 "_parent_alias",
2962 "_query_info",
2963 "_fallback_query_info",
2964 )
2965
2966 query_info = collections.namedtuple(
2967 "queryinfo",
2968 [
2969 "load_only_child",
2970 "load_with_join",
2971 "in_expr",
2972 "pk_cols",
2973 "zero_idx",
2974 "child_lookup_cols",
2975 ],
2976 )
2977
2978 _chunksize = 500
2979
2980 @classmethod
2981 def _set_chunksize(cls, loadopt) -> int:
2982 if loadopt is None or hasattr(loadopt, "local_opts") is None:
2983 return cls._chunksize
2984
2985 user_input = loadopt.local_opts.get("chunksize", None)
2986 if user_input is None:
2987 return cls._chunksize
2988 elif not isinstance(user_input, int) or user_input < 1:
2989 raise sa_exc.ArgumentError(
2990 f"'chunksize={user_input}' is not an appropriate input, "
2991 f"please use a positive non-zero integer."
2992 )
2993 return user_input
2994
2995 def __init__(self, parent, strategy_key):
2996 super().__init__(parent, strategy_key)
2997 self.join_depth = self.parent_property.join_depth
2998 is_m2o = self.parent_property.direction is interfaces.MANYTOONE
2999
3000 if self.parent_property.omit_join is not None:
3001 self.omit_join = self.parent_property.omit_join
3002 else:
3003 lazyloader = self.parent_property._get_strategy(
3004 (("lazy", "select"),)
3005 )
3006 if is_m2o:
3007 self.omit_join = lazyloader.use_get
3008 else:
3009 self.omit_join = self.parent._get_clause[0].compare(
3010 lazyloader._rev_lazywhere,
3011 use_proxies=True,
3012 compare_keys=False,
3013 equivalents=self.parent._equivalent_columns,
3014 )
3015
3016 if self.omit_join:
3017 if is_m2o:
3018 self._query_info = self._init_for_omit_join_m2o()
3019 self._fallback_query_info = self._init_for_join()
3020 else:
3021 self._query_info = self._init_for_omit_join()
3022 else:
3023 self._query_info = self._init_for_join()
3024
3025 def _init_for_omit_join(self):
3026 pk_to_fk = dict(
3027 self.parent_property._join_condition.local_remote_pairs
3028 )
3029 pk_to_fk.update(
3030 (equiv, pk_to_fk[k])
3031 for k in list(pk_to_fk)
3032 for equiv in self.parent._equivalent_columns.get(k, ())
3033 )
3034
3035 pk_cols = fk_cols = [
3036 pk_to_fk[col] for col in self.parent.primary_key if col in pk_to_fk
3037 ]
3038 if len(fk_cols) > 1:
3039 in_expr = sql.tuple_(*fk_cols)
3040 zero_idx = False
3041 else:
3042 in_expr = fk_cols[0]
3043 zero_idx = True
3044
3045 return self.query_info(False, False, in_expr, pk_cols, zero_idx, None)
3046
3047 def _init_for_omit_join_m2o(self):
3048 pk_cols = self.mapper.primary_key
3049 if len(pk_cols) > 1:
3050 in_expr = sql.tuple_(*pk_cols)
3051 zero_idx = False
3052 else:
3053 in_expr = pk_cols[0]
3054 zero_idx = True
3055
3056 lazyloader = self.parent_property._get_strategy((("lazy", "select"),))
3057 lookup_cols = [lazyloader._equated_columns[pk] for pk in pk_cols]
3058
3059 return self.query_info(
3060 True, False, in_expr, pk_cols, zero_idx, lookup_cols
3061 )
3062
3063 def _init_for_join(self):
3064 self._parent_alias = AliasedClass(self.parent.class_)
3065 pa_insp = inspect(self._parent_alias)
3066 pk_cols = [
3067 pa_insp._adapt_element(col) for col in self.parent.primary_key
3068 ]
3069 if len(pk_cols) > 1:
3070 in_expr = sql.tuple_(*pk_cols)
3071 zero_idx = False
3072 else:
3073 in_expr = pk_cols[0]
3074 zero_idx = True
3075 return self.query_info(False, True, in_expr, pk_cols, zero_idx, None)
3076
3077 def init_class_attribute(self, mapper):
3078 self.parent_property._get_strategy(
3079 (("lazy", "select"),)
3080 ).init_class_attribute(mapper)
3081
3082 def create_row_processor(
3083 self,
3084 context,
3085 query_entity,
3086 path,
3087 loadopt,
3088 mapper,
3089 result,
3090 adapter,
3091 populators,
3092 ):
3093 if context.refresh_state:
3094 return self._immediateload_create_row_processor(
3095 context,
3096 query_entity,
3097 path,
3098 loadopt,
3099 mapper,
3100 result,
3101 adapter,
3102 populators,
3103 )
3104
3105 (
3106 effective_path,
3107 run_loader,
3108 execution_options,
3109 recursion_depth,
3110 ) = self._setup_for_recursion(
3111 context, path, loadopt, join_depth=self.join_depth
3112 )
3113
3114 if not run_loader:
3115 return
3116
3117 if not context.compile_state.compile_options._enable_eagerloads:
3118 return
3119
3120 if not self.parent.class_manager[self.key].impl.supports_population:
3121 raise sa_exc.InvalidRequestError(
3122 "'%s' does not support object "
3123 "population - eager loading cannot be applied." % self
3124 )
3125
3126 # a little dance here as the "path" is still something that only
3127 # semi-tracks the exact series of things we are loading, still not
3128 # telling us about with_polymorphic() and stuff like that when it's at
3129 # the root.. the initial MapperEntity is more accurate for this case.
3130 if len(path) == 1:
3131 if not orm_util._entity_isa(query_entity.entity_zero, self.parent):
3132 return
3133 elif not orm_util._entity_isa(
3134 path[-1], self.parent
3135 ) and not self.parent.isa(path[-1].mapper):
3136 # second check accommodates a polymorphic entity where
3137 # the path has been normalized to the base mapper but
3138 # self.parent is a subclass mapper, e.g.
3139 # joinedload(A.b.of_type(poly)).selectinload(poly.Sub.rel)
3140 # Fixes #13209.
3141 return
3142
3143 selectin_path = effective_path
3144
3145 path_w_prop = path[self.parent_property]
3146
3147 # build up a path indicating the path from the leftmost
3148 # entity to the thing we're subquery loading.
3149 with_poly_entity = path_w_prop.get(
3150 context.attributes, "path_with_polymorphic", None
3151 )
3152 if with_poly_entity is not None:
3153 effective_entity = inspect(with_poly_entity)
3154 else:
3155 effective_entity = self.entity
3156
3157 loading._PostLoad.callable_for_path(
3158 context,
3159 selectin_path,
3160 self.parent,
3161 self.parent_property,
3162 self._load_for_path,
3163 effective_entity,
3164 loadopt,
3165 recursion_depth,
3166 execution_options,
3167 )
3168
3169 def _load_for_path(
3170 self,
3171 context,
3172 path,
3173 states,
3174 load_only,
3175 effective_entity,
3176 loadopt,
3177 recursion_depth,
3178 execution_options,
3179 ):
3180 if load_only and self.key not in load_only:
3181 return
3182
3183 query_info = self._query_info
3184
3185 if query_info.load_only_child:
3186 our_states = collections.defaultdict(list)
3187 none_states = []
3188
3189 mapper = self.parent
3190
3191 for state, overwrite in states:
3192 state_dict = state.dict
3193 related_ident = tuple(
3194 mapper._get_state_attr_by_column(
3195 state,
3196 state_dict,
3197 lk,
3198 passive=attributes.PASSIVE_NO_FETCH,
3199 )
3200 for lk in query_info.child_lookup_cols
3201 )
3202 # if the loaded parent objects do not have the foreign key
3203 # to the related item loaded, then degrade into the joined
3204 # version of selectinload
3205 if LoaderCallableStatus.PASSIVE_NO_RESULT in related_ident:
3206 query_info = self._fallback_query_info
3207 break
3208
3209 # organize states into lists keyed to particular foreign
3210 # key values.
3211 if None not in related_ident:
3212 our_states[related_ident].append(
3213 (state, state_dict, overwrite)
3214 )
3215 else:
3216 # For FK values that have None, add them to a
3217 # separate collection that will be populated separately
3218 none_states.append((state, state_dict, overwrite))
3219
3220 # note the above conditional may have changed query_info
3221 if not query_info.load_only_child:
3222 our_states = [
3223 (state.key[1], state, state.dict, overwrite)
3224 for state, overwrite in states
3225 ]
3226
3227 pk_cols = query_info.pk_cols
3228 in_expr = query_info.in_expr
3229
3230 if not query_info.load_with_join:
3231 # in "omit join" mode, the primary key column and the
3232 # "in" expression are in terms of the related entity. So
3233 # if the related entity is polymorphic or otherwise aliased,
3234 # we need to adapt our "pk_cols" and "in_expr" to that
3235 # entity. in non-"omit join" mode, these are against the
3236 # parent entity and do not need adaption.
3237 if effective_entity.is_aliased_class:
3238 pk_cols = [
3239 effective_entity._adapt_element(col) for col in pk_cols
3240 ]
3241 in_expr = effective_entity._adapt_element(in_expr)
3242
3243 bundle_ent = orm_util.Bundle("pk", *pk_cols)
3244 bundle_sql = bundle_ent.__clause_element__()
3245
3246 entity_sql = effective_entity.__clause_element__()
3247 q = Select._create_raw_select(
3248 _raw_columns=[bundle_sql, entity_sql],
3249 _compile_options=_ORMCompileState.default_compile_options,
3250 _propagate_attrs={
3251 "compile_state_plugin": "orm",
3252 "plugin_subject": effective_entity,
3253 },
3254 )
3255
3256 if not query_info.load_with_join:
3257 # the Bundle we have in the "omit_join" case is against raw, non
3258 # annotated columns, so to ensure the Query knows its primary
3259 # entity, we add it explicitly. If we made the Bundle against
3260 # annotated columns, we hit a performance issue in this specific
3261 # case, which is detailed in issue #4347.
3262 q = q.select_from(effective_entity)
3263 else:
3264 # in the non-omit_join case, the Bundle is against the annotated/
3265 # mapped column of the parent entity, but the #4347 issue does not
3266 # occur in this case.
3267 q = q.select_from(self._parent_alias).join(
3268 getattr(self._parent_alias, self.parent_property.key).of_type(
3269 effective_entity
3270 )
3271 )
3272
3273 q = q.filter(in_expr.in_(sql.bindparam("primary_keys")))
3274
3275 # a test which exercises what these comments talk about is
3276 # test_selectin_relations.py -> test_twolevel_selectin_w_polymorphic
3277 #
3278 # effective_entity above is given to us in terms of the cached
3279 # statement, namely this one:
3280 orig_query = context.compile_state.select_statement
3281
3282 # the actual statement that was requested is this one:
3283 # context_query = context.user_passed_query
3284 #
3285 # that's not the cached one, however. So while it is of the identical
3286 # structure, if it has entities like AliasedInsp, which we get from
3287 # aliased() or with_polymorphic(), the AliasedInsp will likely be a
3288 # different object identity each time, and will not match up
3289 # hashing-wise to the corresponding AliasedInsp that's in the
3290 # cached query, meaning it won't match on paths and loader lookups
3291 # and loaders like this one will be skipped if it is used in options.
3292 #
3293 # as it turns out, standard loader options like selectinload(),
3294 # lazyload() that have a path need
3295 # to come from the cached query so that the AliasedInsp etc. objects
3296 # that are in the query line up with the object that's in the path
3297 # of the strategy object. however other options like
3298 # with_loader_criteria() that doesn't have a path (has a fixed entity)
3299 # and needs to have access to the latest closure state in order to
3300 # be correct, we need to use the uncached one.
3301 #
3302 # as of #8399 we let the loader option itself figure out what it
3303 # wants to do given cached and uncached version of itself.
3304
3305 effective_path = path[self.parent_property]
3306
3307 if orig_query is context.user_passed_query:
3308 new_options = orig_query._with_options
3309 else:
3310 cached_options = orig_query._with_options
3311 uncached_options = context.user_passed_query._with_options
3312
3313 # propagate compile state options from the original query,
3314 # updating their "extra_criteria" as necessary.
3315 # note this will create a different cache key than
3316 # "orig" options if extra_criteria is present, because the copy
3317 # of extra_criteria will have different boundparam than that of
3318 # the QueryableAttribute in the path
3319 new_options = [
3320 orig_opt._adapt_cached_option_to_uncached_option(
3321 context, uncached_opt
3322 )
3323 for orig_opt, uncached_opt in zip(
3324 cached_options, uncached_options
3325 )
3326 ]
3327
3328 if loadopt and loadopt._extra_criteria:
3329 new_options += (
3330 orm_util.LoaderCriteriaOption(
3331 effective_entity,
3332 loadopt._generate_extra_criteria(context),
3333 ),
3334 )
3335
3336 if recursion_depth is not None:
3337 effective_path = effective_path._truncate_recursive()
3338
3339 q = q.options(*new_options)
3340
3341 q = q._update_compile_options({"_current_path": effective_path})
3342 if context.populate_existing:
3343 q = q.execution_options(populate_existing=True)
3344
3345 if self.parent_property.order_by:
3346 if not query_info.load_with_join:
3347 eager_order_by = self.parent_property.order_by
3348 if effective_entity.is_aliased_class:
3349 eager_order_by = [
3350 effective_entity._adapt_element(elem)
3351 for elem in eager_order_by
3352 ]
3353 q = q.order_by(*eager_order_by)
3354 else:
3355
3356 def _setup_outermost_orderby(compile_context):
3357 compile_context.eager_order_by += tuple(
3358 util.to_list(self.parent_property.order_by)
3359 )
3360
3361 q = q._add_compile_state_func(
3362 _setup_outermost_orderby, self.parent_property
3363 )
3364
3365 chunksize = self._set_chunksize(loadopt)
3366
3367 if query_info.load_only_child:
3368 self._load_via_child(
3369 our_states,
3370 none_states,
3371 query_info,
3372 q,
3373 context,
3374 execution_options,
3375 chunksize,
3376 )
3377 else:
3378 self._load_via_parent(
3379 our_states,
3380 query_info,
3381 q,
3382 context,
3383 execution_options,
3384 chunksize,
3385 )
3386
3387 def _load_via_child(
3388 self,
3389 our_states,
3390 none_states,
3391 query_info,
3392 q,
3393 context,
3394 execution_options,
3395 chunksize,
3396 ):
3397 uselist = self.uselist
3398
3399 # this sort is really for the benefit of the unit tests
3400 our_keys = sorted(our_states)
3401 while our_keys:
3402 chunk = our_keys[0:chunksize]
3403 our_keys = our_keys[chunksize:]
3404 data = {
3405 k: v
3406 for k, v in context.session.execute(
3407 q,
3408 params={
3409 "primary_keys": [
3410 key[0] if query_info.zero_idx else key
3411 for key in chunk
3412 ]
3413 },
3414 execution_options=execution_options,
3415 ).unique()
3416 }
3417
3418 for key in chunk:
3419 # for a real foreign key and no concurrent changes to the
3420 # DB while running this method, "key" is always present in
3421 # data. However, for primaryjoins without real foreign keys
3422 # a non-None primaryjoin condition may still refer to no
3423 # related object.
3424 related_obj = data.get(key, None)
3425 for state, dict_, overwrite in our_states[key]:
3426 if not overwrite and self.key in dict_:
3427 continue
3428
3429 state.get_impl(self.key).set_committed_value(
3430 state,
3431 dict_,
3432 related_obj if not uselist else [related_obj],
3433 )
3434 # populate none states with empty value / collection
3435 for state, dict_, overwrite in none_states:
3436 if not overwrite and self.key in dict_:
3437 continue
3438
3439 # note it's OK if this is a uselist=True attribute, the empty
3440 # collection will be populated
3441 state.get_impl(self.key).set_committed_value(state, dict_, None)
3442
3443 def _load_via_parent(
3444 self, our_states, query_info, q, context, execution_options, chunksize
3445 ):
3446 uselist = self.uselist
3447 _empty_result = () if uselist else None
3448
3449 while our_states:
3450 chunk = our_states[0:chunksize]
3451 our_states = our_states[chunksize:]
3452
3453 primary_keys = [
3454 key[0] if query_info.zero_idx else key
3455 for key, state, state_dict, overwrite in chunk
3456 ]
3457
3458 data = collections.defaultdict(list)
3459 for k, v in itertools.groupby(
3460 context.session.execute(
3461 q,
3462 params={"primary_keys": primary_keys},
3463 execution_options=execution_options,
3464 ).unique(),
3465 lambda x: x[0],
3466 ):
3467 data[k].extend(vv[1] for vv in v)
3468
3469 for key, state, state_dict, overwrite in chunk:
3470 if not overwrite and self.key in state_dict:
3471 continue
3472
3473 collection = data.get(key, _empty_result)
3474
3475 if not uselist and collection:
3476 if len(collection) > 1:
3477 util.warn(
3478 "Multiple rows returned with "
3479 "uselist=False for eagerly-loaded "
3480 "attribute '%s' " % self
3481 )
3482 state.get_impl(self.key).set_committed_value(
3483 state, state_dict, collection[0]
3484 )
3485 else:
3486 # note that empty tuple set on uselist=False sets the
3487 # value to None
3488 state.get_impl(self.key).set_committed_value(
3489 state, state_dict, collection
3490 )
3491
3492
3493def _single_parent_validator(desc, prop):
3494 def _do_check(state, value, oldvalue, initiator):
3495 if value is not None and initiator.key == prop.key:
3496 hasparent = initiator.hasparent(attributes.instance_state(value))
3497 if hasparent and oldvalue is not value:
3498 raise sa_exc.InvalidRequestError(
3499 "Instance %s is already associated with an instance "
3500 "of %s via its %s attribute, and is only allowed a "
3501 "single parent."
3502 % (orm_util.instance_str(value), state.class_, prop),
3503 code="bbf1",
3504 )
3505 return value
3506
3507 def append(state, value, initiator):
3508 return _do_check(state, value, None, initiator)
3509
3510 def set_(state, value, oldvalue, initiator):
3511 return _do_check(state, value, oldvalue, initiator)
3512
3513 event.listen(
3514 desc, "append", append, raw=True, retval=True, active_history=True
3515 )
3516 event.listen(desc, "set", set_, raw=True, retval=True, active_history=True)